多个 Spark 结构化流作业使用相同 Kafka 主题的问题

问题描述 投票:0回答:1

我有两个单独的 Python 脚本(job1.py 和 job2.py),它们使用 Spark 结构化流来使用来自 Kafka 主题 test1 的数据。两个脚本都配置了相同的 Kafka 消费者组 (consumer-group-1),期望任何时候只有其中一个应该消费数据。但是,当我同时运行两个脚本并使用 bin/kafka-console- Producer.sh 将数据发送到 test1 时,两个作业似乎处理相同的数据。

job1.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys

checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"

spark = SparkSession.builder \
    .appName("KafkaConsumer1") \
    .getOrCreate()

spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")

shutdown_requested = False

def shutdown_handler(signum, frame):
    global shutdown_requested
    print("Graceful shutdown initiated...")
    shutdown_requested = True
    query.stop()  

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", "test1") \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "consumer-group-1") \
    .load()

df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", f"{checkpoint_dir}/wordcount_dta") \
    .start()

try:
    query.awaitTermination()
except Exception as e:
    print(f"Exception encountered: {e}")

job2.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys

checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"

spark = SparkSession.builder \
    .appName("KafkaConsumer2") \
    .getOrCreate()

spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")

shutdown_requested = False

def shutdown_handler(signum, frame):
    global shutdown_requested
    print("Graceful shutdown initiated...")
    shutdown_requested = True
    query.stop()  

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", "test1") \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "consumer-group-1") \
    .load()

df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", f"{checkpoint_dir}/wordcount") \
    .start()

try:
    query.awaitTermination()
except Exception as e:
    print(f"Exception encountered: {e}")

重现步骤:

同时启动 job1.py 和 job2.py。使用 bin/kafka-console- Producer.sh 向 Kafka 主题 test1 发送消息。

预期行为: 由于使用相同的消费者组 (consumer-group-1),因此只有一个 Spark 作业(job1.py 或 job2.py)应该消费来自 Kafka 主题的消息。

实际行为:两个 Spark 作业同时处理相同的消息,这不是共享消费者组的预期行为。

为什么两个 Spark 结构化流作业处理来自 Kafka 的相同数据,即使它们使用相同的消费者组?如何确保任何时候只有一个作业处理来自 Kafka 主题的消息,从而允许无缝更新,这样我就可以使用更新的查询启动新作业并停止旧作业,而不会中断或重复?

apache-spark pyspark apache-kafka-streams spark-structured-streaming spark-streaming-kafka
1个回答
0
投票

由于 Spark 结构化流的性质,两个作业最终可能会同时消耗主题的所有分区,尤其是当它们同时启动时。

做,

  1. 首先启动job1.py,让它建立Kafka消费者组并开始处理。
  2. 仅在 job1.py 成功启动并正在积极处理数据后才启动 job2.py。

这种顺序启动有助于 Kafka 在作业之间正确分配分区。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.