使用数据流作为 PCollection 从 GCS 存储桶读取 Avro 文件<TableRow>

问题描述 投票:0回答:1

我想知道如何从 GCS 中读取 Avro 文件的内容作为 PCollection 我正在尝试这样:

public static PCollection<TableRow> avroFileReader(Pipeline pipeline,String inputAvroFile){
pipeline.apply("Read Avro from GCS ", AvroIO.__________________) #<-this part is my question
        .apply("Convert Avro to TableRow", ParDo-logic)
}

其中 inputAvroFile 是我从数据流选项中获取的存储桶路径

google-cloud-dataflow apache-beam
1个回答
0
投票

它会是这样的:

pipeline.apply("Read Avro from GCS ", AvroIO..parseGenericRecords(new SerializableFunction<GenericRecord, TableRow>() {
       public genRec2TableRow apply(GenericRecord record) {
         // You could produce and return TableRow here
         TableRow tr = BigQueryAvroUtils.convertGenericRecordToTableRow(record, record.getSchema());
         return tr;
       }
     }));

您不需要

ParDo
,您可以直接从解析函数生成
TableRow

参考:AvroIO 读取未知 schema 的记录

© www.soinside.com 2019 - 2024. All rights reserved.