I'm building a PySpark Structured Streaming job that reads from a Kafka topic and processes the data in real time. I want to implement a graceful shutdown using signal handling, but I get an error when I try to stop the streaming query.
Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys
checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"
spark = SparkSession.builder \
    .appName("KafkaConsumer1") \
    .getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")
consumer_group_id = "consumer-group-1"
shutdown_requested = False
def shutdown_handler(signum, frame):
    global shutdown_requested
    print("Graceful shutdown initiated...")
    shutdown_requested = True
    query.stop()
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("assign", '{"sample-topic":[0]}') \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", consumer_group_id) \
    .load()
df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", f"{checkpoint_dir}/c1_consumer") \
    .start()
try:
    query.awaitTermination()
except Exception as e:
    print(f"Exception encountered: {e}")
When I press Ctrl+C to trigger the graceful shutdown, I get the following error:
Error:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=3>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/context.py", line 381, in signal_handler
    self.cancelAllJobs()
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/context.py", line 2446, in cancelAllJobs
    self._jsc.sc().cancelAllJobs()
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o27.sc

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Can someone help me understand why this error occurs and how to fix it?
The error you are seeing is most likely caused by the way your signal handling interacts with the PySpark driver. In your script you install handlers for SIGINT and SIGTERM that stop the running Spark streaming query. However, the PySpark driver already registers its own handlers for these signals (among others) so that it can clean up its own resources when a signal is received.
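You can confirm this yourself: once the SparkContext exists, the handler installed for SIGINT is no longer Python's default one. A quick check (the exact value printed depends on your PySpark version):

import signal
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SignalCheck").getOrCreate()

# After SparkContext initialization, PySpark has replaced the default SIGINT
# handler with its own (the same pyspark/context.py signal_handler that
# appears in your traceback).
print(signal.getsignal(signal.SIGINT))
print(signal.getsignal(signal.SIGINT) is signal.default_int_handler)  # False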
When you set your own signal handler, it overrides the one installed by PySpark. When the signal arrives, your handler is invoked and tries to stop the streaming query, which means sending a command over py4j to the JVM running the Spark application. At the same time, the PySpark driver is also trying to clean up its resources in response to the signal. The result is a reentrant call on the py4j connection's buffered stream, which is not supported, hence the RuntimeError.
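In code, the difference looks like this (a sketch reusing the names from your script; the crucial point is that the handler body must not make any py4j round trip):

import signal

# Problematic: stopping the query inside the handler sends a py4j command
# while the main thread may already be blocked in another py4j read.
def unsafe_handler(signum, frame):
    query.stop()  # JVM call from inside a signal handler -> reentrant read

# Safer: the handler only mutates Python state; the main thread notices the
# flag and stops the query outside the handler (see the complete polling
# example at the end of this answer).
def safe_handler(signum, frame):
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGINT, safe_handler)
signal.signal(signal.SIGTERM, safe_handler)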
To work around this, you can register a shutdown hook with the JVM running your Spark application so that the streaming query is stopped when the JVM shuts down. Here is an example of how to do that:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
from pyspark.java_gateway import ensure_callback_server_started
checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"
spark = SparkSession.builder \
    .appName("KafkaConsumer1") \
    .getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")
consumer_group_id = "consumer-group-1"
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("assign", '{"sample-topic":[0]}') \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", consumer_group_id) \
    .load()
df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", f"{checkpoint_dir}/c1_consumer") \
    .start()
# py4j cannot hand a plain Python function to a JVM method, so the hook is
# exposed as an object that implements java.lang.Runnable through py4j's
# callback mechanism. A plain JVM shutdown hook (java.lang.Runtime) is used
# here because it accepts a Runnable; Spark's internal ShutdownHookManager
# expects a Scala function, which py4j cannot build from a Python callable.
class StopQueryHook:
    def run(self):
        query.stop()

    class Java:
        implements = ["java.lang.Runnable"]

gateway = spark.sparkContext._gateway
# The py4j callback server must be running so the JVM can call back into
# Python when the hook fires.
ensure_callback_server_started(gateway)
hook_thread = gateway.jvm.java.lang.Thread(StopQueryHook())
gateway.jvm.java.lang.Runtime.getRuntime().addShutdownHook(hook_thread)
query.awaitTermination()
The script exposes a Python object to the JVM through py4j's callback mechanism and registers it as a JVM shutdown hook that stops the streaming query. The hook is invoked when the JVM shuts down, which happens when the PySpark driver exits in response to a signal.
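If you would rather avoid py4j callbacks entirely, a commonly used alternative is the flag-based pattern sketched earlier: the signal handler only sets a Python flag, and the main thread polls awaitTermination with a timeout, stopping the query itself once the flag is set. A minimal sketch, assuming the same spark and query objects as above (the one-second timeout is an arbitrary choice):

import signal

shutdown_requested = False

def shutdown_handler(signum, frame):
    # No py4j calls in here: just record that shutdown was requested.
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

# ... create `spark` and start `query` exactly as above ...

# awaitTermination(timeout) takes seconds and returns False if the query is
# still running when the timeout expires, so the loop wakes up regularly,
# checks the flag, and can stop the query from the main thread, outside the
# signal handler.
terminated = False
while not shutdown_requested and not terminated:
    terminated = query.awaitTermination(1)

if shutdown_requested and not terminated:
    query.stop()
    print("Streaming query stopped cleanly.")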