I have created a table in Postgres:
CREATE TABLE IF NOT EXISTS public.sample_a
(
    id text COLLATE pg_catalog."default" NOT NULL,
    is_active boolean NOT NULL,
    is_deleted boolean NOT NULL,
    created_by integer NOT NULL,
    created_at timestamp with time zone NOT NULL,
    created_ip character varying(30) COLLATE pg_catalog."default" NOT NULL,
    created_dept_id integer NOT NULL,
    updated_by integer,
    updated_at timestamp with time zone,
    updated_ip character varying(30) COLLATE pg_catalog."default",
    updated_dept_id integer,
    deleted_by integer,
    deleted_at timestamp with time zone,
    deleted_ip character varying(30) COLLATE pg_catalog."default",
    deleted_dept_id integer,
    sql_id bigint NOT NULL,
    ipa_no character varying(30) COLLATE pg_catalog."default" NOT NULL,
    pe_id bigint NOT NULL,
    uid character varying(30) COLLATE pg_catalog."default" NOT NULL,
    mr_no character varying(15) COLLATE pg_catalog."default" NOT NULL,
    site_id integer NOT NULL,
    entered_date date NOT NULL,
    CONSTRAINT pk_patient_dilation PRIMARY KEY (id)
);
I have inserted a row as follows:
INSERT INTO sample_a (id, is_active, is_deleted, created_by, created_at, created_ip, created_dept_id, updated_by, updated_at, updated_ip, updated_dept_id, deleted_by, deleted_at, deleted_ip, deleted_dept_id, sql_id, ipa_no, pe_id, uid, mr_no, site_id, entered_date)
VALUES ('00037167-0894-4373-9a56-44c49d2285c9', TRUE, FALSE, 70516, '2024-10-05 08:12:25.069941+00', '10.160.0.76', 4, 70516, '2024-10-05 09:25:55.218961+00', '10.84.0.1', 4, NULL, NULL, NULL, NULL, 0, '0', 165587147, '22516767', 'P5942023', 1, '10/5/24');
Now, I have created a JDBC source connector with the following configuration:
{
    "name": "JdbcSourceConnectorConnector_0",
    "config": {
        "name": "JdbcSourceConnectorConnector_0",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/",
        "connection.user": "postgres",
        "connection.password": "********",
        "table.whitelist": "sample_a",
        "mode": "bulk"
    }
}
When the data is pushed from the database into the Kafka topic, I can see it in a readable format in the topic view of Kafka Control Center. Since I am using bulk mode, the data keeps getting loaded continuously.
My problem is that when I consume the data through PySpark, it is not readable:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sample_a") \
    .option("startingOffsets", "latest") \
    .load()
df.selectExpr("cast(value as string) as value").writeStream.format("console").start()
spark.streams.awaitAnyTermination()
Output:
H00037167-0894-4373-9a56-44c49d2285c9?ڹ??d10.160.0.7??????d10.84.0.0????22516767P5942023¸
So, can I access specific attributes of the record? Do I need any deserializer class?
TIA.
I think the value type is not a string but an Avro record. That means you cannot read the byte array with a plain string cast the way you do now:
cast(value as string) as value
Instead, you need to follow the Spark Avro documentation and do something like this:
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/sample_a_schema.avsc", "r").read()
df.select(from_avro("value", jsonFormatSchema).alias("sample_a")) \
    .select("sample_a.*") \
    .writeStream \
    .format("console") \
    .start()
spark.streams.awaitAnyTermination()
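Note that from_avro lives in the external spark-avro module, so that package has to be on the classpath next to the Kafka connector. A minimal sketch of the session builder, assuming the same Spark 3.3.0 / Scala 2.12 versions as in your snippet:

from pyspark.sql.session import SparkSession

# spark-sql-kafka provides the "kafka" source, spark-avro provides from_avro/to_avro
spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
            "org.apache.spark:spark-avro_2.12:3.3.0") \
    .getOrCreate()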
I have not tested this myself, but it should be something along those lines...
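One caveat I cannot verify from here: if the connector publishes with Confluent's AvroConverter and Schema Registry (the usual setup when Control Center shows decoded rows), every message value starts with a 5-byte header (one magic byte plus a 4-byte schema id) before the Avro payload, and plain from_avro will most likely fail on it. A hedged sketch of stripping that header and pulling the schema from the registry; the localhost:8081 URL and the default sample_a-value subject name are assumptions about your setup:

import requests
from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro

# fetch the writer schema that the connector registered for the topic's value
jsonFormatSchema = requests.get(
    "http://localhost:8081/subjects/sample_a-value/versions/latest"
).json()["schema"]

# drop the Confluent wire-format header (1 magic byte + 4-byte schema id)
payload = df.withColumn("value", expr("substring(value, 6, length(value) - 5)"))

payload.select(from_avro("value", jsonFormatSchema).alias("sample_a")) \
    .select("sample_a.*") \
    .writeStream \
    .format("console") \
    .start()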
Hope this helps!