无法将写入的kafka主题中的数据推送到Postgres表

问题描述 投票:0回答:1

我正在尝试将写入 Kafka 主题的数据加载到

Postgres
表中。我可以看到该主题每秒都在接收新消息,而且数据看起来不错。

但是,当我使用以下 JDBC 接收器配置时,它无法加载 Postgres 表中的数据:

{
  "name": "JdbcSinkConnectorConnector_0",
  "config": {
    "name": "JdbcSinkConnectorConnector_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "sample_b",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "********",
    "insert.mode": "insert",
    "table.name.format": "${topic}",
    "auto.create": "true"
  }
}

将数据写入 Kafka 主题的 Pyspark 代码:

transformed_df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream.outputMode(outputMode='Append').format('kafka') \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sample_b") \
.option("checkpointLocation", "src/checkpointLoc") \
.start() \
.awaitTermination()

当我检查该主题的 Kafka 控制中心选项卡时

sample_b
它提供了以下格式的数据:

{
  "id": "00037167-0894-4373-9a56-44c49d2285c9",
  "is_active": true,
  "is_deleted": false,
  "created_by": 70516,
  "created_at": "2024-10-05T13:42:25.069+05:30",
  "created_ip": "10.160.0.76",
  "created_dept_id": 4,
  "updated_by": 70516,
  "updated_at": "2024-10-05T14:55:55.218+05:30",
  "updated_ip": "10.84.0.1",
  "updated_dept_id": 4,
  "sql_id": 0,
  "ipa_no": "0",
  "pe_id": 165587147,
  "uid": "22516767",
  "mr_no": "P5942023",
  "site_id": 1,
  "entered_date": "2024-10-05"
}

现在,我尝试通过另一个名为

test01
的 Kafka 主题推送数据。我通过 KSQL 创建了主题,并将数据推送到其中,并使用相同的 JDBC 接收器配置,我能够将数据推送到 Postgres 表,没有任何问题。

 CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
    WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO')
 INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('V',4,'EOO');

test01 的数据在 Kafka Control Center 选项卡中如下所示:

{
  "COL1": {
    "int": 4
  },
  "COL2": {
    "string": "EOO"
  }
}

我可以看到两个主题之间的架构差异。那么我需要在

sample_b
主题编写中进行哪些具体更改才能匹配
test01
有效负载格式?

我在

Kafka Connect
日志中没有看到任何错误。

apache-spark pyspark apache-kafka apache-kafka-connect
1个回答
0
投票

SampleB 主题是 JSON,通过使用

to_json
Spark 函数。

test01 主题,是 Avro,通过在 ksqlDB 中使用

VALUE_FORMAT=AVRO


与任何 Kafka 应用程序一样,序列化格式必须在主题两端匹配...Avro 反序列化器不会接受 JSON,反之亦然

无法加载Postgres表中的数据

Connect 服务器具有日志和

/status
API。看看有没有错误。

我怀疑您将 Kafka 连接配置为读取 Avro 数据而不是 JSON

© www.soinside.com 2019 - 2024. All rights reserved.