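I want to read a few rows from a Kafka topic and create an Avro file.
Part of my code already works: it reads from the Kafka topic and prints the records to the console.
I would like to know how to write the generic records to an Avro file using AvroIO.
Below is the code that reads from the Kafka topic and prints to the console: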
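import java.io.File;
import java.io.IOException;

import com.google.common.collect.ImmutableMap; // assumed to be Guava's ImmutableMap
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.joda.time.Duration;

public class BeamConsumer {
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    // Avro schema intended for the (currently disabled) AvroIO write step.
    Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

    // Kafka source: keys and values are deserialized via the Confluent Schema Registry.
    PTransform<PBegin, PCollection<KafkaRecord<GenericRecord, GenericRecord>>> input =
        KafkaIO.<GenericRecord, GenericRecord>read()
            .withBootstrapServers("${kafkaserveraddress}")
            // Use withTopics(List<String>) to read from multiple topics.
            .withTopic("my-topic")
            .withKeyDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    "${schemaregistryaddress}", "schemaregistrysubjectkey"))
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    "${schemaregistryaddress}", "schemaregistrysubjectvalue"))
            .withConsumerConfigUpdates(
                ImmutableMap.of(
                    ConsumerConfig.GROUP_ID_CONFIG,
                    "my-group-id",
                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                    "SSL",
                    SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                    "/truststore.jks",
                    SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
                    "******",
                    SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                    "/keystore.jks",
                    SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
                    "*******",
                    SslConfigs.SSL_KEY_PASSWORD_CONFIG,
                    "*******",
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                    "latest",
                    ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
                    600000));

    pipeline
        .apply(input)
        .apply(
            "ExtractRecord",
            ParDo.of(
                new DoFn<
                    KafkaRecord<GenericRecord, GenericRecord>,
                    KafkaRecord<GenericRecord, GenericRecord>>() {
                  @DoFn.ProcessElement
                  public void processElement(ProcessContext c) {
                    // c.element() is already typed, so no cast is needed.
                    KafkaRecord<GenericRecord, GenericRecord> record = c.element();
                    KV<GenericRecord, GenericRecord> log = record.getKV();
                    System.out.println("Key Obtained: " + log.getKey());
                    System.out.println("Value Obtained: " + log.getValue().toString());
                    // Pass the whole KafkaRecord downstream unchanged.
                    c.output(record);
                  }
                }));
    // Attempted Avro write step, currently commented out:
    // .apply("WriteToAvro",
    //     AvroIO.writeGenericRecords(schema).to("/Users/mjain34/code/avroutils/src/main/resources/file.avro"));

    PipelineResult run = pipeline.run();
    // Wait for at most 1000 seconds before returning.
    run.waitUntilFinish(Duration.standardSeconds(1000));
  }
}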
Note: I have changed the configuration values here to hide private information.
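
For reference, here is a minimal sketch of how the commented-out write step might be made to work. It is not a confirmed solution. Two things in the code above look relevant: the `ExtractRecord` step emits `KafkaRecord` elements, while `AvroIO.writeGenericRecords(schema)` expects a `PCollection<GenericRecord>`, and a Kafka source is unbounded, so the write probably needs windowing plus an explicit shard count. The class name `AvroSink`, the method `writeValues`, the one-minute window, and the single shard are all hypothetical choices for illustration. The import paths assume a recent Beam release where Avro support lives in `beam-sdks-java-extensions-avro`; in older releases the same classes are `org.apache.beam.sdk.io.AvroIO` and `org.apache.beam.sdk.coders.AvroCoder`.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
// Assumption: recent Beam release with the separate Avro extension module.
// Older releases: org.apache.beam.sdk.coders.AvroCoder and org.apache.beam.sdk.io.AvroIO.
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

/** Hypothetical helper, not part of the original code. */
public class AvroSink {

  /**
   * Extracts the value of each Kafka record and writes it to Avro files.
   * outputPrefix is treated by AvroIO as a filename prefix, not a final file name.
   */
  static PDone writeValues(
      PCollection<KafkaRecord<GenericRecord, GenericRecord>> records,
      Schema schema,
      String outputPrefix) {
    return records
        // Keep only the value; the key and Kafka metadata are dropped here.
        .apply(
            "ExtractValue",
            MapElements.via(
                new SimpleFunction<KafkaRecord<GenericRecord, GenericRecord>, GenericRecord>() {
                  @Override
                  public GenericRecord apply(KafkaRecord<GenericRecord, GenericRecord> record) {
                    return record.getKV().getValue();
                  }
                }))
        // GenericRecord has no default coder, so one is set from the schema.
        .setCoder(AvroCoder.of(schema))
        // The Kafka source is unbounded: group elements into fixed windows
        // (one minute is an arbitrary illustrative choice).
        .apply(
            "WindowForWrite",
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(
            "WriteToAvro",
            AvroIO.writeGenericRecords(schema)
                .to(outputPrefix)
                .withSuffix(".avro")
                .withWindowedWrites()
                // One shard per window is an arbitrary illustrative choice.
                .withNumShards(1));
  }
}
```

With this helper, the result of the `ExtractRecord` step could be assigned to a `PCollection<KafkaRecord<GenericRecord, GenericRecord>>` variable and passed to `AvroSink.writeValues(records, schema, "/some/output/prefix")` before `pipeline.run()`. If only a few rows are needed, `KafkaIO.Read` also offers `withMaxNumRecords(long)`, which turns the read into a bounded one. In that case the windowing and `withWindowedWrites()` may not be necessary.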