Apache Avro是一个主要用于Apache Hadoop的数据序列化框架。
Debezium 连接:java.lang.NoClassDefFoundError:com/google/common/base/Ticker
问题描述 我在 Debezium 和 Confluence Schema Registry 上使用 Avro 序列化时遇到了问题。我遵循 Debezium 的官方文档。正如文档中提到的,来自
使用 Avro Schema 中的数组生成 Kafka 消息
我使用记录数组创建了 avro 模式 [ { “类型”:“记录”, “名称”:“BbbAvro”, “字段”:[ { ...
无法使用 Avro(自定义 Serdes)序列化 Spring Cloud Streams
我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。下面是我的配置 @豆 公共双功能 我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。以下是我的配置 @Bean public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process() { return (walletCreated, placementCreated) -> placementCreated .selectKey((key, val) -> val.getClientId()) .join(walletCreated.selectKey((s, testPayload) -> testPayload.getClientId()), (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()), JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)), StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord())); } 两个主题都将输入流以产生相同结构的数据。以下是我定义的自定义 Serdes。 public class CustomSerdes { public static Serde<TestRecord> TestRecord() { return new TestRecordSerde(); } public static class TestRecordSerde extends Serdes.WrapperSerde<TestRecord> { public TestRecordSerde() { super(new TestRecordSerializer(), new TestRecordDeserializer()); } } public static class TestRecordSerializer implements Serializer<TestRecord> { private final KafkaAvroSerializer inner; public TestRecordSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, TestRecord data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class TestRecordDeserializer implements Deserializer<TestRecord> { private final KafkaAvroDeserializer inner; public TestRecordDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public TestRecord deserialize(String topic, byte[] data) { return (TestRecord) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } public static Serde<ModifiedTestPayload> ModifiedTestPayload() { return new ModifiedTestPayloadSerde(); } public static class ModifiedTestPayloadSerde extends Serdes.WrapperSerde<ModifiedTestPayload> { public ModifiedTestPayloadSerde() { super(new ModifiedTestPayloadSerializer(), new ModifiedTestPayloadDeserializer()); } } public static class ModifiedTestPayloadSerializer implements Serializer<ModifiedTestPayload> { private final KafkaAvroSerializer inner; public ModifiedTestPayloadSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, ModifiedTestPayload data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class ModifiedTestPayloadDeserializer implements Deserializer<ModifiedTestPayload> { private final KafkaAvroDeserializer inner; public ModifiedTestPayloadDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public ModifiedTestPayload deserialize(String topic, byte[] data) { return (ModifiedTestPayload) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } } 和我的 application.yml 下面 spring: kafka: streams: application-id: test-app cloud: function: definition: process stream: kafka: streams: bindings: process-in-0: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-in-1: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-out-0: producer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: ai.wownettests.kf.kafkastreamtestone.CustomSerdes$ModifiedTestPayloadSerde configuration: schema.registry.url: http://localhost:8081 binder: brokers: localhost:9092 configuration: schema.registry.url: http://localhost:8081 specific.avro.reader: true default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde bindings: process-in-0: destination: testkf.placements.created process-in-1: destination: testkf.wallet.created process-out-0: destination: testkf.client.profile_completed logging: level: root: info 当我启动我的应用程序时,我收到如下错误(我有一些关于该应用程序尝试处理的 kafka 主题的数据) org.apache.kafka.streams.errors.StreamsException: Unable to serialize record. ProducerRecord(topic=[test-app-KSTREAM-KEY-SELECT-0000000003-repartition], partition=[null], timestamp=[1706526131230] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:231) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na] Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient 通过创建自定义 serde 的 bean 修复了这个问题 @Bean public Serde<TestRecord> testRecordSerde() { Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); var r = CustomSerdes.TestRecord(); r.configure(config, false); return r; }
无法在我的 Maven 构建中获取 org.apache.flink.formats 包
我一直在尝试通过maven构建我的apache flink项目,但由于某种原因我遇到了编译错误。值得注意的是“org.apache.flink.formats 包不存在”...
我没有任何kafka和相关技术的经验,所以我需要你的帮助。 我面临以下问题: 在我当前的项目中,我们使用融合平台(kafka + schema 注册表)。对于
Debezium 2.0 汇合 AvroConverter 无法工作
自从 debezium 2.0 决定删除 io.confluence.connect.avro.AvroConverter 我尝试直接注入它,但它一直给我以下错误: 这是我的简化部署: apiVersi...
针对嵌套记录 AVRO 架构的 GCP Pub/Sub 消息验证失败
我有一个相当复杂的 AVRO 模式,它通过了验证,但发送给它的消息不知何故与我定义的 AVRO 模式不兼容。 从那时起,我尝试缩小范围...
Debezium 连接:java.lang.NoClassDefFoundError:io/confluence/connect/schema/AbstractDataConfig
我正在尝试按照这些帖子中的步骤执行类似的操作,但它对我不起作用,我正在使用 bezium 2.5 Final。 Debezium 2.0 汇合 AvroConverter 无法工作 Debezium 连接:...
json_encode 比 PHP 中的 Avro 编码器更快(基准测试)
所以我一直在阅读有关 Avro 编码如何比 JSON 更快的内容,但是当我运行自己的测试时,我得到了完全不同的结果。在我的测试中,JSON 没有什么异常,
我正在使用 docker 运行 kafka 和来自 https://github.com/confluenceinc/cp-all-in-one 的其他服务 在我的测试项目中使用 kafka、avro 和 schemaRegistry 的融合 nuget 包。 如果它去
如何通过模式自动检测将 Parquet/AVRO 加载到 Snowflake 中的多个列中?
当尝试将 Parquet/AVRO 文件加载到 Snowflake 表中时,出现错误: PARQUET 文件格式可以生成一且仅一列类型变体或对象或数组。如果您愿意,请使用 CSV 文件格式
不断发展的 Avro 架构(其中记录名称发生更改)和使用别名不起作用
所以我有一个 avro 模式,我想发展它并使用 topicRecordName 策略。我正在使用融合模式注册表。我想让它向后兼容,并且有必要更改
我正在使用 Avro C++ 库。我有一个使用“解码”函数和字节流作为源获得的二进制对象。我需要将其转换为 json 字符串、对象模式...
我正在尝试读取 pyspark 中的 avro 文件,但遇到错误: 我的机器上的 Spark 版本:3.5.0 我的机器上的 python 版本: 我已经使用以下参数启动了 pyspark 会话: pyspark --packa...
嗨,我在 Kafka 和 .Net 工作。我正在尝试生成如下消息。我已经为架构注册表和生产者创建了配置,如下所示。我正在使用证书连接到架构注册表...
如何在 Gradle 构建过程中从 Avro 模式生成 Java 类?
对于 Maven,有一个官方的 Avro 插件可以从 Avro 模式生成 Java 类。 然而,Gradle 不存在官方插件。 有 davidmc24/gradle-avro-plugin,但不再了
我使用其 avro 模式对字典进行 JSON 编码,使其成为字符串化 JSON: 导入 json 从日期时间导入日期时间 进口法斯塔夫罗 消息={ “名称”:“任何”, ...
有时为字段提供上下文信息很有用。 其他协议(例如 protobuf)通过它们所谓的 Option 来支持注释,该选项允许进行以下自定义(uuid 字段
如何使用 Avro SchemaBuilder 类设置 null 默认值?
我正在以编程方式创建架构,这是一个要求,例如使用 SchemaBuilder: Schema s = SchemaBuilder.record(collectionName).namespace("com.company.something").fields()...
PoJo 到 Avro 序列化抛出 KryoException:java.lang.UnsupportedOperationException
我的单元测试在 Flink 1.11.2 和 parquet-avro 1.10.0 下运行正常,一旦我使用 parquet-avro 1.12.0 升级到 1.12.0,我的单元测试将抛出异常 com.esotericsoftware.kryo.KryoException:java.lang.