我正在尝试将写入 Kafka 主题的数据加载到
Postgres
表中。我可以看到该主题每秒都在接收新消息,而且数据看起来不错。
但是,当我使用以下 JDBC 接收器配置时,它无法加载 Postgres 表中的数据:
{
"name": "JdbcSinkConnectorConnector_0",
"config": {
"name": "JdbcSinkConnectorConnector_0",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "sample_b",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "********",
"insert.mode": "insert",
"table.name.format": "${topic}",
"auto.create": "true"
}
}
将数据写入 Kafka 主题的 Pyspark 代码:
transformed_df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream.outputMode(outputMode='Append').format('kafka') \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sample_b") \
.option("checkpointLocation", "src/checkpointLoc") \
.start() \
.awaitTermination()
当我检查该主题的 Kafka 控制中心选项卡时
sample_b
它提供了以下格式的数据:
{
"id": "00037167-0894-4373-9a56-44c49d2285c9",
"is_active": true,
"is_deleted": false,
"created_by": 70516,
"created_at": "2024-10-05T13:42:25.069+05:30",
"created_ip": "10.160.0.76",
"created_dept_id": 4,
"updated_by": 70516,
"updated_at": "2024-10-05T14:55:55.218+05:30",
"updated_ip": "10.84.0.1",
"updated_dept_id": 4,
"sql_id": 0,
"ipa_no": "0",
"pe_id": 165587147,
"uid": "22516767",
"mr_no": "P5942023",
"site_id": 1,
"entered_date": "2024-10-05"
}
现在,我尝试通过另一个名为
test01
的 Kafka 主题推送数据。我通过 KSQL 创建了主题,并将数据推送到其中,并使用相同的 JDBC 接收器配置,我能够将数据推送到 Postgres 表,没有任何问题。
CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO')
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('V',4,'EOO');
test01 的数据在 Kafka Control Center 选项卡中如下所示:
{
"COL1": {
"int": 4
},
"COL2": {
"string": "EOO"
}
}
我可以看到两个主题之间的架构差异。那么我需要在
sample_b
主题编写中进行哪些具体更改才能匹配 test01
有效负载格式?
我在
Kafka Connect
日志中没有看到任何错误。
SampleB 主题是 JSON,通过使用
to_json
Spark 函数。
test01 主题,是 Avro,通过在 ksqlDB 中使用
VALUE_FORMAT=AVRO
。
与任何 Kafka 应用程序一样,序列化格式必须在主题两端匹配...Avro 反序列化器不会接受 JSON,反之亦然
无法加载Postgres表中的数据
Connect 服务器具有日志和
/status
API。看看有没有错误。
我怀疑您将 Kafka 连接配置为读取 Avro 数据而不是 JSON