无法通过pyspark格式化kafka主题数据

问题描述 投票:0回答:1

我正在尝试使用以下代码通过

KSQL
将数据推送到Kafka主题:

CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X',1,'FOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',2,'BAR');

现在,当我尝试运行下面的 Pyspark 代码并尝试获取数据并在控制台上打印时,它会打印该值以及一些垃圾值

from pyspark.sql.session import SparkSession
spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test01") \
    .option("startingOffsets","earliest") \
    .load()

df.selectExpr("cast(value as string) as value").writeStream.outputMode("append").format("console").start()

Output

enter image description here

如何更好地格式化它?

pyspark apache-kafka
1个回答
0
投票

与我写的here非常相似 - 你正在尝试解析一个字符串 - 当你正在编写 avro 记录时。

在您的代码中指定:

CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

您可以看到您指定了 VALUE_FORMAT='AVRO'

这意味着您需要将消息作为 avro 记录读取 - 而不是字符串。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/test01.avsc", "r").read()

df.select(from_avro("value", jsonFormatSchema).alias("test01"))\
.select("test01.*")\
.writeStream\
.outputMode("append")
.format("console")\
.start()

spark.streams.awaitAnyTermination()

希望这有帮助!

© www.soinside.com 2019 - 2024. All rights reserved.