我有两个单独的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流来使用来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组 (consumer-group-1),期望任何时候只有其中一个应该消费数据。但是,当我同时运行两个脚本并使用 bin/kafka-console- Producer.sh 将数据发送到 test1 时,两个作业似乎处理相同的数据。
job1.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys
checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"
spark = SparkSession.builder \
.appName("KafkaConsumer1") \
.getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")
shutdown_requested = False
def shutdown_handler(signum, frame):
global shutdown_requested
print("Graceful shutdown initiated...")
shutdown_requested = True
query.stop()
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", "test1") \
.option("startingOffsets", "latest") \
.option("kafka.group.id", "consumer-group-1") \
.load()
df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))
query = df.writeStream \
.outputMode("append") \
.format("console") \
.option("checkpointLocation", f"{checkpoint_dir}/wordcount_dta") \
.start()
try:
query.awaitTermination()
except Exception as e:
print(f"Exception encountered: {e}")
job2.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys
checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"
spark = SparkSession.builder \
.appName("KafkaConsumer2") \
.getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")
shutdown_requested = False
def shutdown_handler(signum, frame):
global shutdown_requested
print("Graceful shutdown initiated...")
shutdown_requested = True
query.stop()
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", "test1") \
.option("startingOffsets", "latest") \
.option("kafka.group.id", "consumer-group-1") \
.load()
df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))
query = df.writeStream \
.outputMode("append") \
.format("console") \
.option("checkpointLocation", f"{checkpoint_dir}/wordcount") \
.start()
try:
query.awaitTermination()
except Exception as e:
print(f"Exception encountered: {e}")
重现步骤:
同时启动 job1.py 和 job2.py。使用 bin/kafka-console- Producer.sh 向 Kafka 主题 test1 发送消息。
预期行为: 由于使用相同的消费者组 (consumer-group-1),因此只有一个 Spark 作业(job1.py 或 job2.py)应该消费来自 Kafka 主题的消息。
实际行为:两个 Spark 作业同时处理相同的消息,这不是共享消费者组的预期行为。
为什么两个 Spark 结构化流作业处理来自 Kafka 的相同数据,即使它们使用相同的消费者组?如何确保任何时候只有一个作业处理来自 Kafka 主题的消息,从而允许无缝更新,这样我就可以使用更新的查询启动新作业并停止旧作业,而不会中断或重复?
由于 Spark 结构化流的性质,两个作业最终可能会同时消耗主题的所有分区,尤其是当它们同时启动时。
做,
这种顺序启动有助于 Kafka 在作业之间正确分配分区。