Google Dataflow作业使用Apache Beam的KafkaIO库与AvroIO和Windowed Writes将输出写入Google Cloud Storage存储桶中的“.avro”文件。但是,它默认为Streaming作为生产数据上的处理作业类型。
是否可以使用批处理使用KafkaIO在Dataflow中使用Kafka主题中的数据。此数据流作业不需要近实时处理(流式传输)。有没有办法将传入的记录插入到BigQuery表中,而不需要流式插入成本来启用批处理类型处理。
运行频率较低的批处理可以工作,从而减少内存,vCPU和计算成本。
按照:https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
KafkaIO源返回无限制的Kafka记录集合作为PCollection>。
这是否意味着Kafka是无限制的源无法以批处理模式运行?
测试.withMaxNumRecords(1000000)条件以批处理模式运行作业。但是,要在实时传入数据中运行作业,我需要删除此条件。
我已尝试使用窗口并将流模式选项标记设置为false,但未成功,如下面的代码所示。
// not streaming mode
options.setStreaming(false);
...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers("IPADDRESS:9092")
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactory())
// .withMaxNumRecords(1000000)
.withoutMetadata()
).apply(Values.<String>create())
.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));
...
//convert to Avro GenericRecord
.apply("AvroToGCS", AvroIO.writeGenericRecords(AVRO_SCHEMA)
.withWindowedWrites()
.withNumShards(1)
.to("gs://BUCKET/FOLDER/")
.withSuffix(".avro"));
该代码导致流式作业类型包含4个vCPU和1个工作程序,用于处理1.8百万条记录的9分钟。在此之后,我不得不停止工作(排水)以防止成本。
在数据流中对传入数据执行批处理,是否可以收集将其作为avro文件写入的批记录,并继续这样做,直到偏移量达到最新状态。
任何示例或示例代码都非常感谢。
无限制的源无法以批处理模式运行。这是设计使然,因为批处理管道期望读取有限数量的数据并在完成处理时终止。
但是,您可以通过约束它所读取的记录数来将无界源转换为有界源。注意:无法保证读取哪些记录。
流式传输管道始终处于启动状态,因此可以读取实时数据。批处理管道用于读取存储数据的积压。
批处理管道对读取实时数据的响应不会很好,它将在您启动管道然后终止时读取任何数据。