I have a Beam pipeline that consumes streaming events and processes them through multiple stages (PTransforms). Please see the code below,
pipeline.apply("Read Data from Stream", StreamReader.read())
.apply("Decode event and extract relevant fields", ParDo.of(new DecodeExtractFields()))
.apply("Deduplicate process", ParDo.of(new Deduplication()))
.apply("Conversion, Mapping and Persisting", ParDo.of(new DataTransformer()))
.apply("Build Kafka Message", ParDo.of(new PrepareMessage()))
.apply("Publish", ParDo.of(new PublishMessage()))
.apply("Commit offset", ParDo.of(new CommitOffset()));
The streaming events are read with KafkaIO, and the StreamReader.read() method is implemented like this,
public static KafkaIO.Read<String, String> read() {
return KafkaIO.<String, String>read()
.withBootstrapServers(Constants.BOOTSTRAP_SERVER)
.withTopics(Constants.KAFKA_TOPICS)
.withConsumerConfigUpdates(Constants.CONSUMER_PROPERTIES)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class);
}
After the streaming events/messages have been read through KafkaIO, we can commit the offsets.
What I need to do is commit the offsets manually inside the final Commit offset PTransform, once all the preceding PTransforms have executed.
My question is: how do I commit the offsets manually? I would appreciate any resources/sample code you can share.
There is the Read.commitOffsetsInFinalize() method, which is supposed to commit offsets when a checkpoint is finalized, and the AUTO_COMMIT consumer config option, which lets the Kafka consumer auto-commit the records it has read. In your case, however, neither will work; you need to do it manually by grouping the offsets of the same topic/partition/window and creating a new Kafka client instance in your CommitOffset DoFn that commits those offsets. You need to group the offsets per partition, otherwise you may run into race conditions when offsets for the same partition are committed from different workers.
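For completeness, enabling the checkpoint-time commits mentioned above is a one-line change, e.g. appending .commitOffsetsInFinalize() to the KafkaIO.read() builder in StreamReader.read(). For the manual route, here is a minimal sketch of what such a CommitOffset DoFn could look like. It assumes an upstream step has already reduced each window to the highest processed offset per topic/partition; the class name, element type and trimmed-down consumer properties are illustrative assumptions rather than code from the original pipeline, and a coder (e.g. SerializableCoder) would still have to be set for the element type.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch only: commits the highest offset (+1) per topic/partition for a window.
public class CommitOffsetFn extends DoFn<Map<TopicPartition, Long>, Void> {

    private final String bootstrapServers;
    private final String groupId;

    public CommitOffsetFn(String bootstrapServers, String groupId) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
    }

    @ProcessElement
    public void processElement(@Element Map<TopicPartition, Long> maxOffsets) {
        // Kafka expects the offset of the *next* record to read, hence offset + 1.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        maxOffsets.forEach((tp, offset) -> toCommit.put(tp, new OffsetAndMetadata(offset + 1)));

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            // assign() rather than subscribe(): this consumer exists only to commit,
            // so it does not need to take part in group-managed partition assignment.
            consumer.assign(toCommit.keySet());
            consumer.commitSync(toCommit);
        }
    }
}

The per-partition grouping is what this approach relies on to avoid concurrent commits for the same partition within a window; across windows, commit ordering still depends on the runner.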
When I tried this and attempted to commit the offsets manually, I got the following error,
Offset commit failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
The full code is here,
package orbi.kafkabeam;

import autovalue.shaded.com.google.common.collect.ImmutableMap;
import orbi.kafkabeam.models.GCSWriteTypeObject;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Validation.Required;
import org.joda.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.joda.time.LocalDateTime;

import java.util.*;
public class KafkaToGCS {

    public interface Options extends GcpOptions {
        @Description("Kafka bootstrap servers")
        @Default.String("localhost:9092")
        String getBootstrapServers();
        void setBootstrapServers(String value);

        @Description("Kafka topic")
        @Required
        String getKafkaTopic();
        void setKafkaTopic(String value);

        @Description("Output GCS path")
        @Required
        String getOutputGCSPath();
        void setOutputGCSPath(String value);
    }
    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args)
                .as(Options.class);

        Pipeline pipeline = Pipeline.create(options);

        String bootstrapServers = options.getBootstrapServers();
        String kafkaTopic = options.getKafkaTopic();
        String getGCSOutputPath = options.getOutputGCSPath();
        String groupName = "g-prod-kafka-to-gcs-cg";
        List<String> topics = Arrays.asList("mongo.prescription-store.prescription", "mysql.appointment.transaction");

        // Consume messages from Kafka
        PCollection<KafkaRecord<String, String>> kafkaMessages = pipeline.apply(
                KafkaIO.<String, String>read()
                        .withBootstrapServers(bootstrapServers)
                        .withTopics(topics)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                                .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
                                .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                                .put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10")
                                .put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000")
                                .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "180000")
                                .build())
                //.withoutMetadata() // If you want metadata, use `withMetadata()`
        );

        PCollection<KV<String, KafkaRecord<String, String>>> keyedKafkaMessages = kafkaMessages.apply("MakeKeyedValuesForWindowing",
                ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, KafkaRecord<String, String>>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        KafkaRecord<String, String> kafkaRecord = c.element();
                        assert kafkaRecord != null;
                        c.output(KV.of(kafkaRecord.getTopic(), kafkaRecord));
                    }
                }));

        // Apply fixed windowing
        PCollection<KV<String, KafkaRecord<String, String>>> windowedMessages = keyedKafkaMessages.apply(
                Window.<KV<String, KafkaRecord<String, String>>>into(FixedWindows.of(Duration.standardMinutes(5)))
        );

        // Aggregation on Windowing to group the data
        PCollection<KV<String, GCSWriteTypeObject>> windowedAggregatedMessages = windowedMessages.apply("GroupByWindow", GroupByKey.create())
                .apply("FormatData", ParDo.of(new DoFn<KV<String, Iterable<KafkaRecord<String, String>>>, KV<String, GCSWriteTypeObject>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        Iterable<KafkaRecord<String, String>> elements = c.element().getValue();
                        String topic = "";
                        Map<String, Map<Integer, Long>> outerMap = new HashMap<>();
                        StringBuilder combinedData = new StringBuilder();
                        assert elements != null;
                        for (KafkaRecord<String, String> element : elements) {
                            topic = element.getTopic();
                            if (!outerMap.containsKey(topic)) {
                                Map<Integer, Long> innerMap = new HashMap<>();
                                innerMap.put(element.getPartition(), element.getOffset());
                                outerMap.put(topic, innerMap);
                            } else {
                                if (outerMap.get(topic).containsKey(element.getPartition())) {
                                    if (element.getOffset() > outerMap.get(topic).get(element.getPartition())) {
                                        Map<Integer, Long> innerMap = outerMap.get(topic);
                                        innerMap.put(element.getPartition(), element.getOffset());
                                        outerMap.put(topic, innerMap);
                                    }
                                } else {
                                    Map<Integer, Long> innerMap = new HashMap<>();
                                    innerMap.put(element.getPartition(), element.getOffset());
                                    outerMap.put(topic, innerMap);
                                }
                            }
                            combinedData.append(element.getKV().getValue()).append("\n");
                        }

                        // Format or combine the data as needed
                        System.out.print("OuterMap");
                        System.out.println(List.of(outerMap));

                        LocalDateTime writingMoment = LocalDateTime.now();
                        int year = writingMoment.getYear();
                        int month = writingMoment.getMonthOfYear();
                        int day = writingMoment.getDayOfMonth();

                        c.output(KV.of(topic, new GCSWriteTypeObject(combinedData.toString(), outerMap, String.format("%d/%02d/%02d/%s/", year, month, day, topic))));
                    }
                }));

        // Write Dynamic messages to GCS
        windowedAggregatedMessages.apply("WriteDynamicToGCS", FileIO.<String, KV<String, GCSWriteTypeObject>>writeDynamic()
                .by(element -> {
                    assert element != null;
                    return Objects.requireNonNull(element.getValue()).getPrefix();
                })
                .withDestinationCoder(AvroCoder.of(String.class))
                .via(
                        Contextful.fn((SerializableFunction<KV<String, GCSWriteTypeObject>, String>) input -> {
                            assert input != null;
                            return Objects.requireNonNull(input.getValue()).getPayload();
                        }),
                        TextIO.sink()
                )
                .to(getGCSOutputPath)
                .withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
                .withNumShards(1)
        );

        // Commit offsets after successful processing
        windowedAggregatedMessages.apply("CommitOffsets", ParDo.of(new DoFn<KV<String, GCSWriteTypeObject>, Void>() {
            @ProcessElement
            public void processElement(@Element KV<String, GCSWriteTypeObject> element, OutputReceiver<Void> receiver) {
                // Manually commit offset
                GCSWriteTypeObject gcsWriteTypeObject = element.getValue();
                assert gcsWriteTypeObject != null;
                gcsWriteTypeObject.getOffsetMap().forEach((topic, partitionMap) -> {
                    System.out.println("Topic: " + topic + ", PartitionMap: " + partitionMap);
                    partitionMap.forEach((partition, offset) -> {
                        System.out.println("Partition: " + partition + ", Offset: " + offset);

                        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                        currentOffsets.put(
                                new TopicPartition(topic, partition),
                                new OffsetAndMetadata(offset + 1));

                        Properties props = new Properties();
                        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName); // Use appropriate group id
                        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
                        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
                        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "180000");

                        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
                            consumer.subscribe(Collections.singletonList(topic));
                            try {
                                consumer.commitSync(currentOffsets); // Attempt to commit the offset
                                System.out.println("Offset committed successfully.");
                            } catch (CommitFailedException e) {
                                System.err.println("Offset commit failed: " + e.getMessage());
                                // Handle the exception (e.g., retry, alert, etc.)
                            }
                        } catch (Exception e) {
                            System.err.println("Kafka consumer error: " + e.getMessage());
                            // Handle other exceptions related to Kafka consumer
                        }
                    });
                });
            }
        }));

        pipeline.run();
    }
}
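A note on the CommitFailedException above (a guess, not a confirmed diagnosis): the consumer created in the CommitOffsets DoFn calls subscribe() but never poll(), so it never becomes an active member of the consumer group, which is consistent with the error text "the consumer is not part of an active group for auto partition assignment". One change worth trying is to assign the partition explicitly instead of subscribing, so the commit-only consumer stays out of group management; the snippet below reuses props, topic, partition and currentOffsets from the listing above.

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
    // assign() instead of subscribe(): no group membership is needed just to commit.
    consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
    consumer.commitSync(currentOffsets);
    System.out.println("Offset committed successfully.");
} catch (Exception e) {
    System.err.println("Kafka consumer error: " + e.getMessage());
}

Alternatively, the offsets could be committed without a consumer at all via AdminClient.alterConsumerGroupOffsets(groupName, currentOffsets). Either way, treat this as something to experiment with rather than a verified fix.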