这是我的代码,我的kafka主题中有固定数量的数据,大约15k。我希望批量读取数据并将输出写入文件
PCollection<KafkaRecord<byte[], byte[]>> messages = pipeline.apply("ReadFromKafka",
KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(CONSUMER_CONFIGS.getKafkaServer())
.withConsumerFactoryFn(new consumerFactory())
);
PCollection<KafkaRecord<byte[], byte[]>> windowedMessages = messages.apply(Window.<KafkaRecord<byte[], byte[]>>into(new GlobalWindows())
.triggering(AfterWatermark.pastEndOfWindow()) // Trigger once after watermark passes the end of the window
.discardingFiredPanes());
PCollection<Row> A= (ConvertToRowFn)
A.setCoder(RowCoder.of(schema2));
String outputPath = configs.getFolderPath() ;
PCollection<GenericRecord> records = A.apply("Convert Rows to GenericRecord",
MapElements.into(TypeDescriptor.of(GenericRecord.class))
.via(this::convertRowToGenericRecord));
records.setCoder(AvroCoder.of(GenericRecord.class, AvroSchema.getSchema()));
records.apply("Write Parquet",
FileIO.<GenericRecord> write()
.via(ParquetIO.sink(AvroSchema.getSchema()))
.to(outputPath)
.withSuffix(PARQUET));
pipeline.run().waitUntilFinish();
我正在阅读记录;解析逻辑也工作得很好。行到通用记录的转换也有效,通过日志验证了这一点。我只需要写入数据。我不明白为什么它没有被写入,也没有抛出错误日志,数据只是在没有日志语句的情况下被读取和读取。
由于数据的性质,我不想根据偏移量对数据进行加窗。
我希望根据分区的数量将数据写入多个文件,而且我也不希望等到读取所有记录后才开始处理。文件将存储在 s3 上。
每https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/FileIO.html#:~:text=the%20file%22%2C%20ex )%3B%0A%20%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20 %7D))%3B-,写入%20个文件,-write()%20and FileIO 按窗口/窗格进行写入,并为这些写入按键插入一个组。
为了解决这个问题,您需要插入允许写入完成的窗口,或者使用触发器提前触发 - https://beam.apache.org/documentation/programming-guide/#triggers