Unable to get Postgres data in the correct format via Kafka, the JDBC source connector, and PySpark


I have created a table in Postgres:

CREATE TABLE IF NOT EXISTS public.sample_a
(
    id text COLLATE pg_catalog."default" NOT NULL,
    is_active boolean NOT NULL,
    is_deleted boolean NOT NULL,
    created_by integer NOT NULL,
    created_at timestamp with time zone NOT NULL,
    created_ip character varying(30) COLLATE pg_catalog."default" NOT NULL,
    created_dept_id integer NOT NULL,
    updated_by integer,
    updated_at timestamp with time zone,
    updated_ip character varying(30) COLLATE pg_catalog."default",
    updated_dept_id integer,
    deleted_by integer,
    deleted_at timestamp with time zone,
    deleted_ip character varying(30) COLLATE pg_catalog."default",
    deleted_dept_id integer,
    sql_id bigint NOT NULL,
    ipa_no character varying(30) COLLATE pg_catalog."default" NOT NULL,
    pe_id bigint NOT NULL,
    uid character varying(30) COLLATE pg_catalog."default" NOT NULL,
    mr_no character varying(15) COLLATE pg_catalog."default" NOT NULL,
    site_id integer NOT NULL,
    entered_date date NOT NULL,
    CONSTRAINT pk_patient_dilation PRIMARY KEY (id)
);

I have inserted data as follows:

INSERT INTO sample_a (id, is_active, is_deleted, created_by, created_at, created_ip, created_dept_id, updated_by, updated_at, updated_ip, updated_dept_id, deleted_by, deleted_at, deleted_ip, deleted_dept_id, sql_id, ipa_no, pe_id, uid, mr_no, site_id, entered_date)
VALUES ('00037167-0894-4373-9a56-44c49d2285c9', TRUE, FALSE, 70516, '2024-10-05 08:12:25.069941+00','10.160.0.76', 4, 70516, '2024-10-05 09:25:55.218961+00', '10.84.0.1',4,NULL, NULL, NULL, NULL, 0,0,165587147,'22516767','P5942023',1,'10/5/24');

Now I have created the JDBC source connector configuration as follows:

{
  "name": "JdbcSourceConnectorConnector_0",
  "config": {
    "name": "JdbcSourceConnectorConnector_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "********",
    "table.whitelist": "sample_a",
    "mode": "bulk"
  }
}

So, when the data is pushed from the database to the Kafka topic, I can see it in a readable format in the Kafka Control Center tab. Since I am using bulk mode, the data keeps being loaded continuously.
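(For reference, the raw bytes sitting on the topic can also be inspected with a plain consumer; this is only a minimal sketch, assuming the kafka-python package is installed and the broker is reachable on localhost:9092.)

from kafka import KafkaConsumer

# Read a single record from the topic and print its raw value bytes,
# just to see what the connector's value converter actually wrote.
consumer = KafkaConsumer(
    "sample_a",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)

for message in consumer:
    print(message.value)  # raw bytes as produced by the connector
    break

consumer.close()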

My problem is that when I fetch the data through PySpark, it is not readable:

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col

spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sample_a") \
    .option("startingOffsets","latest") \
    .load()

df.selectExpr("cast(value as string) as value").writeStream.format("console").start()

spark.streams.awaitAnyTermination()

Output:

H00037167-0894-4373-9a56-44c49d2285c9?ڹ??d10.160.0.7??????d10.84.0.0????22516767P5942023¸  

So how can I access specific attributes? Do I need any deserializer class?

TIA。

postgresql apache-spark pyspark apache-kafka apache-kafka-connect
1 Answer

I think the value is not a string but an Avro record. That means you cannot read the byte array with a plain string cast the way you are doing:

cast(value as string) as value

Instead, you need to follow the Avro documentation and do something like the following:

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/sample_a_schema.avsc", "r").read()

df.select(from_avro("value", jsonFormatSchema).alias("sample_a")) \
    .select("sample_a.*") \
    .writeStream \
    .format("console") \
    .start()

spark.streams.awaitAnyTermination()
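One caveat worth adding (also untested): if the connector is publishing through Confluent's AvroConverter with a Schema Registry, which is the usual default in a Confluent Platform stack, every value starts with a 5-byte header (one magic byte plus a 4-byte schema ID) that Spark's from_avro does not understand, and from_avro also needs the org.apache.spark:spark-avro package on the classpath in addition to the Kafka package. A rough sketch under those assumptions, fetching the schema from a registry on localhost:8081 and stripping the header first:

import requests
from pyspark.sql.avro.functions import from_avro

# Pull the latest value schema for the topic from the Schema Registry
# (assumes the subject is named "sample_a-value" and the registry runs on localhost:8081).
jsonFormatSchema = requests.get(
    "http://localhost:8081/subjects/sample_a-value/versions/latest"
).json()["schema"]

# Drop the 5-byte Confluent wire-format prefix (magic byte + schema ID)
# before decoding the remaining bytes with from_avro.
payload = df.selectExpr("substring(value, 6, length(value) - 5) as avro_value")

payload.select(from_avro("avro_value", jsonFormatSchema).alias("sample_a")) \
    .select("sample_a.*") \
    .writeStream \
    .format("console") \
    .start()

spark.streams.awaitAnyTermination()

If the topic instead held plain JSON (for example, with the worker configured to use JsonConverter), a from_json call with a matching schema would do the job rather than from_avro.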

I have not tested it, but that seems to be the way...

Hope this helps!
