无法将处理后的数据写入Parquet

问题描述 投票:0回答:1

这是我的代码,我的kafka主题中有固定数量的数据,大约15k。我希望批量读取数据并将输出写入文件

PCollection<KafkaRecord<byte[], byte[]>> messages = pipeline.apply("ReadFromKafka",
                KafkaIO.<byte[], byte[]>read()
                        .withBootstrapServers(CONSUMER_CONFIGS.getKafkaServer())
                        .withConsumerFactoryFn(new consumerFactory())
                        );

        PCollection<KafkaRecord<byte[], byte[]>> windowedMessages = messages.apply(Window.<KafkaRecord<byte[], byte[]>>into(new GlobalWindows())
                .triggering(AfterWatermark.pastEndOfWindow()) // Trigger once after watermark passes the end of the window
                .discardingFiredPanes());
                        

        PCollection<Row> A= (ConvertToRowFn)

        A.setCoder(RowCoder.of(schema2)); 

        String outputPath = configs.getFolderPath() ;

        PCollection<GenericRecord> records = A.apply("Convert Rows to GenericRecord",   
                                                                        MapElements.into(TypeDescriptor.of(GenericRecord.class))
                                                                               .via(this::convertRowToGenericRecord));

        records.setCoder(AvroCoder.of(GenericRecord.class, AvroSchema.getSchema()));

        records.apply("Write Parquet",
                          FileIO.<GenericRecord> write()
                                .via(ParquetIO.sink(AvroSchema.getSchema())) 
                                .to(outputPath)
                                .withSuffix(PARQUET));


        pipeline.run().waitUntilFinish();

我正在阅读记录;解析逻辑也工作得很好。行到通用记录的转换也有效,通过日志验证了这一点。我只需要写入数据。我不明白为什么它没有被写入,也没有抛出错误日志,数据只是在没有日志语句的情况下被读取和读取。

由于数据的性质,我不想根据偏移量对数据进行加窗。

我希望根据分区的数量将数据写入多个文件,而且我也不希望等到读取所有记录后才开始处理。文件将存储在 s3 上。

apache-beam apache-beam-io apache-beam-kafkaio
1个回答
0
投票

https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/FileIO.html#:~:text=the%20file%22%2C%20ex )%3B%0A%20%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20 %7D))%3B-,写入%20个文件,-write()%20and FileIO 按窗口/窗格进行写入,并为这些写入按键插入一个组。

为了解决这个问题,您需要插入允许写入完成的窗口,或者使用触发器提前触发 - https://beam.apache.org/documentation/programming-guide/#triggers

© www.soinside.com 2019 - 2024. All rights reserved.