使用Google Dataflow在批处理模式下使用KafkaIO

问题描述 投票:1回答:1

Google Dataflow作业使用Apache Beam的KafkaIO库与AvroIO和Windowed Writes将输出写入Google Cloud Storage存储桶中的“.avro”文件。但是,它默认为Streaming作为生产数据上的处理作业类型。

是否可以使用批处理使用KafkaIO在Dataflow中使用Kafka主题中的数据。此数据流作业不需要近实时处理(流式传输)。有没有办法将传入的记录插入到BigQuery表中,而不需要流式插入成本来启用批处理类型处理。

运行频率较低的批处理可以工作,从而减少内存,vCPU和计算成本。

按照:https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html

KafkaIO源返回无限制的Kafka记录集合作为PCollection>。

这是否意味着Kafka是无限制的源无法以批处理模式运行?

测试.withMaxNumRecords(1000000)条件以批处理模式运行作业。但是,要在实时传入数据中运行作业,我需要删除此条件。

我已尝试使用窗口并将流模式选项标记设置为false,但未成功,如下面的代码所示。


// not streaming mode
options.setStreaming(false);

...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
               .withBootstrapServers("IPADDRESS:9092")
               .withTopic(topic)                                
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(StringDeserializer.class)                
               .updateConsumerProperties(props)
               .withConsumerFactoryFn(new ConsumerFactory())                            
//             .withMaxNumRecords(1000000)
               .withoutMetadata() 
       ).apply(Values.<String>create())
       .apply(Window.into(FixedWindows.of(Duration.standardDays(1))));


...
//convert to Avro GenericRecord

.apply("AvroToGCS", AvroIO.writeGenericRecords(AVRO_SCHEMA)
.withWindowedWrites()
.withNumShards(1)
.to("gs://BUCKET/FOLDER/")
.withSuffix(".avro"));

该代码导致流式作业类型包含4个vCPU和1个工作程序,用于处理1.8百万条记录的9分钟。在此之后,我不得不停止工作(排水)以防止成本。

在数据流中对传入数据执行批处理,是否可以收集将其作为avro文件写入的批记录,并继续这样做,直到偏移量达到最新状态。

任何示例或示例代码都非常感谢。

java google-cloud-platform apache-kafka google-cloud-dataflow apache-beam
1个回答
1
投票

无限制的源无法以批处理模式运行。这是设计使然,因为批处理管道期望读取有限数量的数据并在完成处理时终止。

但是,您可以通过约束它所读取的记录数来将无界源转换为有界源。注意:无法保证读取哪些记录。

流式传输管道始终处于启动状态,因此可以读取实时数据。批处理管道用于读取存储数据的积压。

批处理管道对读取实时数据的响应不会很好,它将在您启动管道然后终止时读取任何数据。

© www.soinside.com 2019 - 2024. All rights reserved.