Confluence Replicator 未将标头传递到 Converter#fromConnectData

问题描述 投票:0回答:1

tl;博士;如何实现使用标头的 Kafka 转换器?
(使用 Confluence Replicator 时)

我制作了一个自定义的 Kafka Connect Converter,据我了解,

toConnectData
在反序列化消息时使用。

界面中有 2 个函数,第二个包含 Headers 并提到它是 Connect 系统要调用的函数,而第一个存在是为了向后兼容。

两个接口:

    byte[] fromConnectData(String topic, Schema schema, Object value);
    byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value)

接口参考: https://github.com/apache/kafka/blob/1eb7644349cb07139d6a3c1ad1986979647cac99/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L52-L68

实际上,我找到了第一个来代替 - 对于我的用例,我需要标头来执行该功能。

转换器实现示例

package com.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class ExampleConverter implements Converter {

    ...

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        throw new RuntimeException("headers not supplied, these are required in order to decrypt");
    }

    @Override
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
    }

}

我使用 Confluence 的 Connect 容器映像运行此转换器

confluentinc/cp-enterprise-replicator:7.7.0

我收到以下错误 - 这清楚地表明它正在调用旧的(已弃用?)函数,没有标题:

java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
    at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
    at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
    at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

寻求建议——我做错了什么吗?

java apache-kafka apache-kafka-connect confluent-platform
1个回答
0
投票

Confluence 的响应 — Confluence Replicator 不支持此功能

感谢您联系 Confluence。 你是对的,转换器 接口有 2 个实现 fromConnectData() 或 连接数据():

value); byte[] fromConnectData(String topic, Headers headers, Schema
schema, Object value)

toConnectData(String topic, byte[] value) toConnectData(String topic,
Headers headers, byte[] value)

Replicator 调用 fromConnectData(),然后调用 toConnectData(),无需 使用 headers 这就是为什么 toConnectData(String topic, Headers headers, byte[] value) 未被调用。如果您想执行自定义转换 在标题上,您可以使用 set

header.converter=com.example.ExampleConverter
代替。

© www.soinside.com 2019 - 2024. All rights reserved.