tl;博士;如何实现使用标头的 Kafka 转换器?
(使用 Confluence Replicator 时)
我制作了一个自定义的 Kafka Connect Converter,据我了解,
toConnectData
在反序列化消息时使用。
界面中有 2 个函数,第二个包含 Headers 并提到它是 Connect 系统要调用的函数,而第一个存在是为了向后兼容。
两个接口:
byte[] fromConnectData(String topic, Schema schema, Object value);
byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value)
实际上,我找到了第一个来代替 - 对于我的用例,我需要标头来执行该功能。
转换器实现示例
package com.example;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
public class ExampleConverter implements Converter {
...
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
throw new RuntimeException("headers not supplied, these are required in order to decrypt");
}
@Override
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
}
}
我使用 Confluence 的 Connect 容器映像运行此转换器
confluentinc/cp-enterprise-replicator:7.7.0
我收到以下错误 - 这清楚地表明它正在调用旧的(已弃用?)函数,没有标题:
java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
寻求建议——我做错了什么吗?
Confluence 的响应 — Confluence Replicator 不支持此功能
感谢您联系 Confluence。 你是对的,转换器 接口有 2 个实现 fromConnectData() 或 连接数据():
value); byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) toConnectData(String topic, byte[] value) toConnectData(String topic, Headers headers, byte[] value)
Replicator 调用 fromConnectData(),然后调用 toConnectData(),无需 使用 headers 这就是为什么 toConnectData(String topic, Headers headers, byte[] value) 未被调用。如果您想执行自定义转换 在标题上,您可以使用 set
代替。header.converter=com.example.ExampleConverter