我正在尝试使用以下代码通过
KSQL
将数据推送到Kafka主题:
CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X',1,'FOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',2,'BAR');
现在,当我尝试运行下面的 Pyspark 代码并尝试获取数据并在控制台上打印时,它会打印该值以及一些垃圾值
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.appName("Kafka_Test") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test01") \
.option("startingOffsets","earliest") \
.load()
df.selectExpr("cast(value as string) as value").writeStream.outputMode("append").format("console").start()
Output
:
如何更好地格式化它?
与我写的here非常相似 - 你正在尝试解析一个字符串 - 当你正在编写 avro 记录时。
在您的代码中指定:
CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
您可以看到您指定了 VALUE_FORMAT='AVRO'
这意味着您需要将消息作为 avro 记录读取 - 而不是字符串。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/test01.avsc", "r").read()
df.select(from_avro("value", jsonFormatSchema).alias("test01"))\
.select("test01.*")\
.writeStream\
.outputMode("append")
.format("console")\
.start()
spark.streams.awaitAnyTermination()
希望这有帮助!