Thank you for your support.
I'm currently experimenting with Apache Beam to learn as much as I can about how it works. I've run into a problem with the serialization of com.google.api.services.bigquery.model.TableSchema. I've read that TableSchema cannot be serialized, and I'm exploring possible workarounds for this. So far I have this superclass:
public class TransformBasic extends DoFn<String, TableRow> {

    private static final Logger LOG = LoggerFactory.getLogger(TransformBasic.class);

    protected final TupleTag<TableRow> outputTag;
    protected final TupleTag<TableRow> failureTag;
    // transient: com.google.api.services.bigquery.model.TableSchema is not Serializable
    private transient com.google.api.services.bigquery.model.TableSchema tableSchema;
    private String whoAmI;
    private String header;

    public TransformBasic(String whoAmI, String header, TupleTag<TableRow> outputTag, TupleTag<TableRow> failureTag) {
        this.whoAmI = whoAmI;
        this.header = header;
        this.outputTag = outputTag;
        this.failureTag = failureTag;
    }

    public TransformBasic() {
        this.outputTag = null;
        this.failureTag = null;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        tableSchema = getTableSchemaByName();
        // CSV line
        String line = c.element();
        // BigQuery row
        TableRow row = new TableRow();
        // CSV parser
        CSVParser csvParser = new CSVParserBuilder()
                .withSeparator(',')
                .build();
        try {
            // Skip the line if it is the header
            if (!line.equalsIgnoreCase(header)) {
                String[] csvColumns = csvParser.parseLine(line);
                for (int i = 0; i < csvColumns.length; i++) {
                    TableFieldSchema bigqueryColumn = tableSchema.getFields().get(i);
                    String newBigqueryColumn = replaceCharacters(bigqueryColumn.getName());
                    row.set(newBigqueryColumn, csvColumns[i]);
                }
                c.output(outputTag, row);
            }
        } catch (Exception e) {
            LOG.error("FAILURE in " + whoAmI.toUpperCase(), e);
            Failure failure = new Failure(
                    LocalDate.now().toString(),
                    whoAmI,
                    line,
                    e.toString());
            c.output(failureTag, failure.getAsTableRow());
        }
    }

    //****************************
    // HERE IS THE SERIALIZATION WORKAROUND SOLUTION (NOT WORKING)
    //****************************
    protected com.google.api.services.bigquery.model.TableSchema getTableSchemaByName() {
        // Use the whoAmI variable to determine the schema
        // (equalsIgnoreCase: the subclasses pass lower-case ids such as "l")
        if ("L".equalsIgnoreCase(whoAmI)) {
            return TableSchema.getTableSchemaL();
        } else if ("P".equalsIgnoreCase(whoAmI)) {
            return TableSchema.getTableSchemaP();
        } else {
            // Handle other cases or throw an exception
            throw new RuntimeException("Unknown class: " + whoAmI);
        }
    }
}
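As a side note, the usual Beam pattern for a non-serializable member like this is to keep the field transient and rebuild it once per worker in a @Setup method, rather than reassigning it on every element in @ProcessElement. A minimal sketch of that pattern (LazySchemaDoFn and buildSchema() are invented names, not my actual code):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

public abstract class LazySchemaDoFn extends DoFn<String, TableRow> {

    // transient: the schema is never shipped inside the serialized DoFn
    private transient com.google.api.services.bigquery.model.TableSchema tableSchema;

    // Runs after the DoFn has been deserialized on the worker, so the schema
    // is rebuilt locally and TableSchema itself never needs to be Serializable.
    @Setup
    public void setup() {
        tableSchema = buildSchema();
    }

    // Subclasses decide which schema to build, e.g. via getTableSchemaByName()
    protected abstract com.google.api.services.bigquery.model.TableSchema buildSchema();

    protected com.google.api.services.bigquery.model.TableSchema schema() {
        return tableSchema;
    }
}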
Then I create the subclass:
public class TransformL extends TransformBasic {

    public static final TupleTag<TableRow> OUTPUT_TAGS_L = new TupleTag<TableRow>() {};
    public static final TupleTag<TableRow> FAILURE_TAGS_L = new TupleTag<TableRow>() {};
    private static final String WHO_AM_I = "l";
    private static final String HEADER_L = "metric,scen,sec,year,value,id";

    public TransformL() {
        super(WHO_AM_I, HEADER_L, OUTPUT_TAGS_L, FAILURE_TAGS_L);
    }
}
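For completeness, the matching "P" subclass referenced by getTableSchemaByName() follows the same shape; a sketch with a placeholder header, since I only showed the "L" variant here:

public class TransformP extends TransformBasic {

    public static final TupleTag<TableRow> OUTPUT_TAGS_P = new TupleTag<TableRow>() {};
    public static final TupleTag<TableRow> FAILURE_TAGS_P = new TupleTag<TableRow>() {};
    private static final String WHO_AM_I = "p";
    private static final String HEADER_P = "col1,col2,col3"; // placeholder header

    public TransformP() {
        super(WHO_AM_I, HEADER_P, OUTPUT_TAGS_P, FAILURE_TAGS_P);
    }
}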
The table schema I use is:
public class TableSchema {

    public static com.google.api.services.bigquery.model.TableSchema getTableSchemaL() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("metric"));
        fields.add(new TableFieldSchema().setName("scen"));
        fields.add(new TableFieldSchema().setName("sec"));
        fields.add(new TableFieldSchema().setName("year"));
        fields.add(new TableFieldSchema().setName("value"));
        fields.add(new TableFieldSchema().setName("id"));
        return new com.google.api.services.bigquery.model.TableSchema().setFields(fields);
    }

    // getTableSchemaP() is analogous and omitted for brevity
}
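For context, a schema like this is typically handed straight to the BigQuery sink, which keeps it internally in a serializable (JSON) form, so the non-serializable TableSchema is not a problem at that point. A minimal sketch (the table reference is a placeholder):

// "my-project:my_dataset.l_table" is a placeholder table reference
outputCollection.apply("WriteL", BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.l_table")
        .withSchema(TableSchema.getTableSchemaL())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));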
I am testing this implementation like this:
public class TransformLitUwTest {

    private static final Logger LOG = LoggerFactory.getLogger(TransformLitUwTest.class);

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testTransformL() {
        // Create sample input data
        String fakeInput = "metric,scen,sec,year,value,id\n" +
                "metric1,scen1,10,2022,100,id1\n" +
                "metric2,scen2,20,2022,200,id2";

        PCollection<String> inputCollection = pipeline
                .apply("CreateFakeInput", org.apache.beam.sdk.transforms.Create.of(fakeInput));

        // Apply the TransformL transformation
        PCollectionTuple outputTuple = inputCollection.apply("TransformL",
                ParDo.of(new TransformL())
                        .withOutputTags(TransformL.OUTPUT_TAGS_L,
                                TupleTagList.of(TransformL.FAILURE_TAGS_L)));

        // Get the PCollection with successful outputs
        PCollection<TableRow> outputCollection = outputTuple.get(TransformL.OUTPUT_TAGS_L);

        // Log each element of the PCollection
        outputCollection.apply(ParDo.of(new DoFn<TableRow, Void>() {
            @ProcessElement
            public void processElement(@Element TableRow c) {
                // Print each TableRow to the console
                LOG.info("TABLE ROW -------------------------------------> " + c.toString());
            }
        }));

        // Assertion logic omitted here; see the PAssert sketch below

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
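For the omitted assertion, PAssert is the standard tool in Beam tests; a minimal sketch that would sit inside testTransformL() before pipeline.run() (the expected count of 2 assumes the header line is skipped):

// Needs org.apache.beam.sdk.testing.PAssert and org.apache.beam.sdk.transforms.Count
PAssert.that(outputCollection.apply("CountRows", Count.globally()))
        .containsInAnyOrder(2L);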
For clarity I removed some code, but I'll include the full code if necessary.
The initial workaround involved creating a class in the parent that would fetch the TableSchema and use it in a parent method. However, this approach still doesn't work, and debugging hasn't made it clear enough where the change needs to happen. I'd like to keep the single-superclass design, even though it isn't strictly necessary for the table schemas I have, because I also want to exercise the inheritance approach. Is there something I'm missing?
I did some more research on serialization and fixed the error that was appearing in the console. Essentially, I also needed to account for serialization in the test class: given how I implemented the classes and the various pipelines, the tableSchema also has to be passed at the test-class level. So I changed the class as follows:
public class TransformLitUwTest implements Serializable {
// the rest of the implementation goes here
}
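The underlying reason is that the anonymous DoFn used for logging captures a reference to the enclosing test class, and Beam serializes the whole DoFn together with that enclosing instance. An alternative that avoids making the test class Serializable is a static nested DoFn, sketched here (LogRowFn is an invented name):

// A static nested class holds no hidden reference to the enclosing test,
// so only the DoFn itself needs to be serializable.
static class LogRowFn extends DoFn<TableRow, Void> {

    private static final Logger LOG = LoggerFactory.getLogger(LogRowFn.class);

    @ProcessElement
    public void processElement(@Element TableRow row) {
        LOG.info("TABLE ROW -------------------------------------> " + row);
    }
}

// Usage in the test:
// outputCollection.apply("LogRows", ParDo.of(new LogRowFn()));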
During the implementation I also discovered another error at this point:
// Apply the TransformL transformation
PCollectionTuple outputTuple = inputCollection.apply("TransformL", ParDo.of(new TransformL())
        .withOutputTags(TransformL.OUTPUT_TAGS_L, TupleTagList.of(TransformL.FAILURE_TAGS_L)));
In this case, because the DoFn of TransformL() takes a String and outputs a TableRow, I had to read the "fake CSV" as an array of strings, one per line, like this:
PCollection<String> inputCollection = pipeline
.apply("CreateFakeInput", org.apache.beam.sdk.transforms.Create.of(Arrays.asList(fakeInput.split("\n"))));
I hope this information helps anyone facing serialization problems and gives them some guidance on how to solve them.