Apache Beam: writing Kafka records to an Avro file


I want to read a few records from a Kafka topic and create an Avro file from them.

Part of my code already works: it reads from the Kafka topic and prints the records to the console.

I would like to know how to use AvroIO to write the generic records (GenericRecord) to an Avro file.

The code below reads from the Kafka topic and prints to the console:




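import java.io.File;
import java.io.IOException;

// Guava's ImmutableMap is assumed here; the 9-pair ImmutableMap.of call below needs Guava 31.0 or later.
import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.joda.time.Duration;

public class BeamConsumer {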

    public static void main(String[] args) throws IOException {

        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        Schema schema =
                new Schema.Parser()
                        .parse(new File("schema.avsc"));

        PTransform<PBegin, PCollection<KafkaRecord<GenericRecord, GenericRecord>>> input =
                KafkaIO.<GenericRecord, GenericRecord>read()
                        .withBootstrapServers(
                                "${kafkaserveraddress}")
                        // Use withTopics(List<String>) to read from multiple topics.
                        .withTopic("my-topic")
                        .withKeyDeserializer(
                                ConfluentSchemaRegistryDeserializerProvider.of(
                                        "${schemaregistryaddress}",
                                        "schemaregistrysubjectkey"))
                        .withValueDeserializer(
                                ConfluentSchemaRegistryDeserializerProvider.of(
                                        "${schemaregistryaddress}",
                                        "schemaregistrysubjectvalue"))
                        .withConsumerConfigUpdates(
                                ImmutableMap.of(
                                        ConsumerConfig.GROUP_ID_CONFIG,
                                        "my-group-id",
                                        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                        "SSL",
                                        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                                        "/truststore.jks",
                                        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
                                        "******",
                                        SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                                        "/keystore.jks",
                                        SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
                                        "*******",
                                        SslConfigs.SSL_KEY_PASSWORD_CONFIG,
                                        "*******",
                                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                                        "latest",
                                        ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
                                        600000));

        pipeline
                .apply(input)
                .apply(
                        "ExtractRecord",
                        ParDo.of(
                                new DoFn<
                                        KafkaRecord<GenericRecord, GenericRecord>,
                                        KafkaRecord<GenericRecord, GenericRecord>>() {
                                    @DoFn.ProcessElement
                                    public void processElement(ProcessContext c) {
                                        // c.element() already has the input type, so no cast is needed.
                                        KafkaRecord<GenericRecord, GenericRecord> record = c.element();
                                        KV<GenericRecord, GenericRecord> log = record.getKV();
                                        System.out.println("Key Obtained: " + log.getKey());
                                        System.out.println("Value Obtained: " + log.getValue().toString());
                                        c.output(record);
                                    }
                                }));
        // Attempted write step, currently commented out:
        //   .apply("WriteToAvro",
        //       AvroIO.writeGenericRecords(schema)
        //           .to("/Users/mjain34/code/avroutils/src/main/resources/file.avro"));
        PipelineResult run = pipeline.run();
        run.waitUntilFinish(Duration.standardSeconds(1000));
    }
}

Note: I have changed the configuration values here to hide private information.
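
One possible way to add the write step is sketched below; it is an illustration under stated assumptions, not a tested answer for this exact setup. AvroIO.writeGenericRecords(schema) expects a PCollection<GenericRecord>, whereas the pipeline above carries KafkaRecord elements, so the sketch first extracts each record's value and sets an AvroCoder on the result. Because a Kafka read is unbounded, it also applies a window and uses withWindowedWrites() with a fixed shard count. The class name KafkaValuesToAvro, its method names, the one-minute window, and the single shard are hypothetical choices. The imports assume Beam 2.46 or later with the beam-sdks-java-extensions-avro module; on older releases the equivalent classes are org.apache.beam.sdk.io.AvroIO and org.apache.beam.sdk.coders.AvroCoder.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical helper class; the names are illustrative and not part of Beam.
public class KafkaValuesToAvro {

    /** Emits only the value of each Kafka record; the key and metadata are dropped. */
    static class ExtractValueFn
            extends DoFn<KafkaRecord<GenericRecord, GenericRecord>, GenericRecord> {
        @ProcessElement
        public void processElement(
                @Element KafkaRecord<GenericRecord, GenericRecord> record,
                OutputReceiver<GenericRecord> out) {
            GenericRecord value = record.getKV().getValue();
            // Skip records without a value (for example tombstones); a PCollection cannot hold null here.
            if (value != null) {
                out.output(value);
            }
        }
    }

    /** Turns Kafka records into plain GenericRecords and sets an explicit Avro coder. */
    static PCollection<GenericRecord> extractValues(
            PCollection<KafkaRecord<GenericRecord, GenericRecord>> records, Schema schema) {
        return records
                .apply("ExtractValue", ParDo.of(new ExtractValueFn()))
                // GenericRecord has no default coder, so one is set from the schema.
                .setCoder(AvroCoder.of(schema));
    }

    /** Windows the records and writes one Avro file per window under outputPrefix. */
    static void writeWindowed(
            PCollection<GenericRecord> values, Schema schema, String outputPrefix) {
        values
                // Assumed window size of one minute; unbounded input must be windowed before a file write.
                .apply("Window",
                        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply("WriteToAvro",
                        AvroIO.writeGenericRecords(schema)
                                .to(outputPrefix)
                                .withSuffix(".avro")
                                .withWindowedWrites()
                                // Assumed shard count; windowed writes need it to be set explicitly.
                                .withNumShards(1));
    }
}
```

In the pipeline above, the call could look like KafkaValuesToAvro.writeWindowed(KafkaValuesToAvro.extractValues(pipeline.apply(input), schema), schema, "/path/to/output/part"), where the output prefix is a placeholder. Each window then produces its own file under that prefix rather than a single file.avro, and the sketch assumes schema.avsc matches the value schema registered in the schema registry.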
