Streaming from MSK to RDS PostgreSQL using an MSK connector

Question

I've been going around in circles on this for days. I'm using kafkajs to send data to Kafka. Every time I produce a message, I assign a UUID to message.key and set message.value to an event like the following, which I then stringify:

// the producer is written in typescript
const event = {
    eventtype: "event1",
    eventversion: "1.0.1",
    sourceurl: "https://some-url.com/source"
};
// stringified because the kafkajs producer only accepts `string` or `Buffer`
const stringifiedEvent = JSON.stringify(event);
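
For context, the surrounding produce call looks roughly like this (a simplified sketch: the client id is an assumption, while the broker and topic match the configuration shown below):

// Simplified sketch of the produce call; the clientId is an assumption.
// Runs inside an async context.
import { Kafka } from "kafkajs";
import { randomUUID } from "crypto";

const kafka = new Kafka({ clientId: "event-producer", brokers: ["localhost:9092"] });
const producer = kafka.producer();

await producer.connect();
await producer.send({
    topic: "topic1",
    messages: [
        // UUID key, stringified event value
        { key: randomUUID(), value: stringifiedEvent }
    ]
});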

I start my standalone Connect JDBC sink connector with the following configuration:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/eventservice
connection.password=postgres
connection.user=postgres

auto.create=true
auto.evolve=true
topics=topic1
tasks.max=1
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
value.converter.schema.registry.url=http://schema-registry:8081 

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1

When I start the connector with connect-standalone worker.properties connect-standalone.properties, it starts up and connects to PostgreSQL without any problems. However, when I produce an event, it fails with the following error message:

WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted. Error: Sink connector 'local-jdbc-sink-
connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records 
with a non-null Struct value and non-null Struct schema, but found record at (topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema. 
(org.apache.kafka.connect.runtime.WorkerSinkTask:609)

With this stack trace:

org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with 
'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and 
non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254) 
with a HashMap value and null value schema.
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
    at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

I've been going back and forth trying to get it to read my messages, but I'm not sure what's going wrong. Fixing one thing just produces a different error, and the fix for that error brings back the previous one. What is the correct configuration? How do I fix this?

jdbc apache-kafka apache-kafka-connect kafkajs
1 Answer

The error means that the JsonConverter, with schemas.enable=false, deserialized your message value into a plain map with no schema, while the JDBC sink connector requires a Struct with an attached schema. To read and parse the JSON data and write it to PostgreSQL, you need to pass a schema along with each message. First, change the structure of your Kafka message value:

{
  "schema": { "..." },
  "payload": { "..." }
}
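
In TypeScript terms, this schema-plus-payload envelope can be typed roughly as follows (a sketch based on the schema and payload examples below; the interface names are just for illustration):

// Rough typing of the schema-plus-payload envelope (illustrative only)
interface ConnectField {
    type: string;      // e.g. "string", "int64", "boolean"
    optional: boolean;
    field: string;     // the field/column name
}

interface ConnectSchema {
    type: "struct";
    name?: string;
    fields: ConnectField[];
}

interface ConnectEnvelope<P> {
    schema: ConnectSchema;
    payload: P;
}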

The schema is a JSON schema that defines what the payload will contain. For example, the top-level schema field will contain something like the following:

{
  "type": "struct",
  "name": "schema-name",
  "fields": [
    { "type": "string", "optional" false, "field": "source" },
    { "type": "string", "optional" false, "field": "message" }
  ]
}

The top-level payload field will contain something like the following:

{
  "source": "https://example.com/",
  "message": "establishing connection..."
}

You can then pass this to your kafkajs producer:

producer.send({
  topic: "topic1",
  messages: [
    {
      key: "key",
      // kafkajs only accepts string or Buffer values, so the envelope is stringified
      value: JSON.stringify({
        schema: {
          type: "struct",
          name: "schema-name",
          fields: [
            { type: "string", optional: false, field: "source" },
            { type: "string", optional: false, field: "message" }
          ]
        },
        payload: {
          source: "https://example.com/",
          message: "establishing connection..."
        }
      })
    }
  ]
});
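
If you produce several event types, a small helper (hypothetical, not part of kafkajs) keeps the envelope consistent and handles the stringification in one place:

// Hypothetical helper that wraps a payload in the schema envelope and stringifies it
function wrapWithSchema(
    name: string,
    fields: { type: string; optional: boolean; field: string }[],
    payload: Record<string, unknown>
): string {
    return JSON.stringify({
        schema: { type: "struct", name, fields },
        payload
    });
}

// usage: produces the same stringified envelope as the example above
const value = wrapWithSchema(
    "schema-name",
    [
        { type: "string", optional: false, field: "source" },
        { type: "string", optional: false, field: "message" }
    ],
    { source: "https://example.com/", message: "establishing connection..." }
);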

Now that the message is set up, you need to make these changes to your worker.properties file:

# key converters are just for strings, because the Kafka message key is a string
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

# values are in JSON, and a schema is passed, so "schemas.enable" must be "true"
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

And you need to make these changes to your connect-standalone.properties file:

# this must be insert, because "upsert" requires that a 
# primary key be provided by the message, either by the Kafka 
# message value or key
insert.mode=insert

# pk.mode=none because you are not writing a primary key 
# using the sink connector - each record generates a serial PK value
pk.mode=none

# delete.enabled=false because we are not treating null fields as deletes
delete.enabled=false

With these changes in place, your configuration files will look like this:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://<host>:<port>/<database>
connection.password=<password>
connection.user=<user>
auto.create=true
auto.evolve=true
topics=<topics>
tasks.max=1
insert.mode=insert
delete.enabled=false
pk.mode=none
consumer.auto.offset.reset=latest
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=<brokers>
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
consumer.auto.offset.reset=latest

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1

This will let your producer send to your cluster, and the sink connector will be able to read and parse your JSON values and write them to Postgres. For reference, your database table will look something like this:

CREATE TABLE IF NOT EXISTS table1(
  id SERIAL PRIMARY KEY,
  source varchar(132),
  message text
);
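
To confirm rows are landing, you can query the table from Node with node-postgres (used here purely for illustration; any SQL client works, and the connection details below reuse the values from the question's sink config):

// Quick check with node-postgres; connection details are taken from the sink config above.
// Runs inside an async context.
import { Client } from "pg";

const client = new Client({
    connectionString: "postgresql://postgres:postgres@postgres:5432/eventservice"
});

await client.connect();
const result = await client.query("SELECT * FROM table1 ORDER BY id DESC LIMIT 5");
console.log(result.rows);
await client.end();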