我使用 pyspark 创建了一个 Spark 流作业,它使用 readStream 从 kafka 主题读取数据,并使用 writeStream 写入 Oracle 数据库中的表。 该作业可以成功地实时读取数据并写入Oracle表,我尝试在运行Spark Streaming作业时生成消息,它能够立即读取数据并将其写入Oracle表。 我的问题是我无法找到从 kafka 消费的 Spark 作业的消费者组 话题。我使用这个命令查找它:
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server myserver --list
但是什么都没有!!而且它也没有显示在kafka UI中!!
这是 Spark 作业的代码:
topic = ""
schema_registry_url = ""
streaming_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "")\
.option("subscribe", topic) \
.option("startingOffsets", "earliest") \
.option("partition", "0") \
.option("groupIdPrefix", "IamTheConsumer")\
.load()
parsed_df=streaming_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
parsed_df.writeStream \
.outputMode("append")\
.foreachBatch(SaveToOracle)\ ##there is a function for this that does its job correctly
.start() \
.awaitTermination()
我本来希望在 kafka UI 或 /opt/kafka/bin/kafka-consumer-groups.sh 中找到这个消费者,但我没有找到任何相关信息。 *考虑到它将数据写入目标表中,因此消耗了作业。
原因可能是spark Streaming作业配置了
enable.auto.commit = false
,请再次检查。
如果enable.auto.commit配置属性为false,那么即使你阅读了消息,你仍然无法在下面的列表中看到你的消费者。
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server myserver --list
如果
enable.auto.commit = true
那么 kafka 将定期接收来自 Spark 流作业的已提交偏移量。