我可以使用独立 API 读取在 Amazon S3 中创建的增量表,但无法创建增量表并向其中插入数据。在下面的 Delta Lake 链接中,提到使用 Zappy reader 和 writer,这是虚构的,仅供参考。
我尝试使用 avro parquet writer,但在获取 AddFile 对象所需的所有数据时遇到了问题,您能否分享可在 scala 中使用的 writer 的任何示例以及如何将元数据提交到增量表?
https://docs.delta.io/latest/delta-standalone.html#-azure-blob-storage
ZappyDataFrame correctedSaleIdToTotalCost = ...;
ZappyDataFrame invalidSales = ZappyReader.readParquet(filteredFiles);
ZappyDataFrame correctedSales = invalidSales.join(correctedSaleIdToTotalCost, "id")
ZappyWriteResult dataWriteResult = ZappyWritter.writeParquet("/data/sales", correctedSales);
“请注意,此示例使用虚构的非 Spark 引擎 Zappy 来写入实际的 Parquet 数据,因为 Delta Standalone 不提供任何数据写入 API。相反,Delta Standalone Writer 允许您在创建元数据后将元数据提交到 Delta 日志。已写入您的数据”
使用的依赖项 - pom.xml
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.12</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
我尝试使用 avro parquet writer,但在获取 AddFile 对象所需的所有数据时遇到了问题,您能否分享可在 scala 中使用的 writer 的任何示例以及如何将元数据提交到增量表?
可以使用 Delta Standalone 创建表,但如何操作根本不明显。这是一个简短的 Java 程序,展示了如何创建新表:
package myproject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.types.StringType;
import io.delta.standalone.types.StructField;
import io.delta.standalone.types.StructType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
public class DeltaExample {
public static void main(final String[] args) throws Exception {
Schema schema = SchemaBuilder
.record("MyRecord")
.namespace("mynamespace")
.fields()
.requiredString("myfield")
.endRecord();
ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path("delta/file.parquet"))
.withSchema(schema)
.build();
GenericRecord record = new GenericData.Record(schema);
record.put("myfield", "myvalue");
writer.write(record);
long size = writer.getDataSize();
writer.close();
List<Action> actions = List.of(new AddFile("file.parquet", new HashMap<String, String>(), size, System.currentTimeMillis(), true, null, null));
DeltaLog log = DeltaLog.forTable(new Configuration(), "delta");
OptimisticTransaction txn = log.startTransaction();
Metadata metaData = txn.metadata()
.copyBuilder()
.partitionColumns(new ArrayList<String>())
.schema(new StructType()
.add(new StructField("myfield", new StringType(), true))).build();
txn.updateMetadata(metaData);
txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "myproject");
}
}