I've been going around in circles on this for a few days. I'm using kafkajs to send data to Kafka. Every time I produce a message, I assign a UUID to the message.key value and set message.value to an event like this before stringifying it:
// the producer is written in typescript
const event = {
eventtype: "event1",
eventversion: "1.0.1",
sourceurl: "https://some-url.com/source"
};
// stringified because the kafkajs producer only accepts `string` or `Buffer`
const stringifiedEvent = JSON.stringify(event);
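For completeness, the produce call itself looks roughly like this (the client id, broker address, and topic name are simplified stand-ins for my actual setup):
// producer.ts - simplified version of how the event above is sent
import { Kafka } from "kafkajs";
import { randomUUID } from "crypto";

const kafka = new Kafka({ clientId: "event-service", brokers: ["localhost:9092"] });
const producer = kafka.producer();

const event = {
  eventtype: "event1",
  eventversion: "1.0.1",
  sourceurl: "https://some-url.com/source"
};

async function sendEvent(): Promise<void> {
  await producer.connect();
  await producer.send({
    topic: "topic1",
    messages: [
      {
        key: randomUUID(),            // a fresh UUID for every message
        value: JSON.stringify(event)  // kafkajs only accepts string or Buffer values
      }
    ]
  });
  await producer.disconnect();
}

sendEvent().catch(console.error);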
I start my standalone Connect JDBC sink connector with the following configuration:
# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/eventservice
connection.password=postgres
connection.user=postgres
auto.create=true
auto.evolve=true
topics=topic1
tasks.max=1
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
value.converter.schema.registry.url=http://schema-registry:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
When I start the connector with connect-standalone worker.properties connect-standalone.properties, it starts up and connects to PostgreSQL without a problem. However, when I produce an event, it fails with the following error message:
WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted. Error: Sink connector 'local-jdbc-sink-
connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records
with a non-null Struct value and non-null Struct schema, but found record at (topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema.
(org.apache.kafka.connect.runtime.WorkerSinkTask:609)
And this stack trace:
org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with
'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and
non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254)
with a HashMap value and null value schema.
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I've been going back and forth trying to get it to read my messages, but I'm not sure what's going wrong. Fixing one error just causes another, and fixing the new one brings back the previous error. What is the correct configuration? How do I fix this?
To read and parse JSON data and then write it to PostgreSQL, you need to pass a schema. First, you need to change the structure of your Kafka message value:
{
  "schema": { ... },
  "payload": { ... }
}
The schema is a JSON schema that defines what the payload will contain. For example, the top-level schema field will contain the following:
{
  "type": "struct",
  "name": "schema-name",
  "fields": [
    { "type": "string", "optional": false, "field": "source" },
    { "type": "string", "optional": false, "field": "message" }
  ]
}
The top-level payload field will contain something like the following:
{
  "source": "https://example.com/",
  "message": "establishing connection..."
}
You can then pass this to your kafkajs producer:
producer.send({
  topic: "topic1",
  messages: [
    {
      key: "key",
      // the value must still be stringified, since kafkajs only accepts string or Buffer values
      value: JSON.stringify({
        schema: {
          type: "struct",
          name: "schema-name",
          fields: [
            { type: "string", optional: false, field: "source" },
            { type: "string", optional: false, field: "message" }
          ]
        },
        payload: {
          source: "https://example.com/",
          message: "establishing connection..."
        }
      })
    }
  ]
});
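Applying the same pattern to the event from your question, a small hypothetical helper can build the envelope for you (the AppEvent type, the helper name, and the schema name are placeholders; the field names come from your event):
// hypothetical helper: wraps an event in the { schema, payload } envelope
// that the JsonConverter expects once value.converter.schemas.enable=true
interface AppEvent {
  eventtype: string;
  eventversion: string;
  sourceurl: string;
}

function toConnectEnvelope(event: AppEvent): string {
  return JSON.stringify({
    schema: {
      type: "struct",
      name: "app-event",   // placeholder schema name
      fields: [
        { type: "string", optional: false, field: "eventtype" },
        { type: "string", optional: false, field: "eventversion" },
        { type: "string", optional: false, field: "sourceurl" }
      ]
    },
    payload: event
  });
}

// usage: pass the result as the message value in your existing producer.send call
const value = toConnectEnvelope({
  eventtype: "event1",
  eventversion: "1.0.1",
  sourceurl: "https://some-url.com/source"
});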
Now that the message is configured, you need to make these changes to your worker.properties file:
# key converters are just for strings, because the Kafka message key is a string
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
# values are in JSON, and a schema is passed, so "schemas.enable" must be "true"
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
And you need to make these changes to your connect-standalone.properties file:
# this must be insert, because "upsert" requires that a
# primary key be provided by the message, either by the Kafka
# message value or key
insert.mode=insert
# pk.mode=none because you are not writing a primary key through
# the sink connector - the database generates a serial PK value for each record
pk.mode=none
# delete.enabled=false because null record values are not treated as deletes
delete.enabled=false
With these changes made, your configuration files will look like this:
# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://<host>:<port>/<database>
connection.password=<password>
connection.user=<user>
auto.create=true
auto.evolve=true
topics=<topics>
tasks.max=1
insert.mode=insert
delete.enabled=false
pk.mode=none
consumer.auto.offset.reset=latest
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
bootstrap.servers=<brokers>
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
consumer.auto.offset.reset=latest
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
This will allow your producer to send to your cluster, and the sink connector will be able to read and parse your JSON values and write them to Postgres. For reference, your database table will look like this:
CREATE TABLE IF NOT EXISTS table1(
id SERIAL PRIMARY KEY,
source varchar(132),
message text
);
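If records still get rejected after these changes, it can help to check what actually lands on the topic. A minimal kafkajs consumer sketch like the following (client id, group id, and broker address are placeholders, and it assumes kafkajs v2) prints the raw keys and values so you can confirm every value carries both the schema and payload fields:
// inspect-topic.ts - quick sketch to verify the { schema, payload } envelope on the topic
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "envelope-check", brokers: ["localhost:9092"] });
const consumer = kafka.consumer({ groupId: "envelope-check" });

async function inspectTopic(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topics: ["topic1"], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // every value should be a JSON object with top-level "schema" and "payload"
      console.log(message.key?.toString(), message.value?.toString());
    }
  });
}

inspectTopic().catch(console.error);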