Graceful shutdown of a PySpark Structured Streaming job raises Py4JNetworkError

Problem description

I'm working on a PySpark Structured Streaming job that reads data from a Kafka topic and processes it in real time. I want to implement a graceful shutdown using signal handling, but I run into an error when I try to stop the streaming query.

Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys

checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"

spark = SparkSession.builder \
  .appName("KafkaConsumer1") \
  .getOrCreate()

spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")

consumer_group_id = "consumer-group-1"

shutdown_requested = False

# Signal handler: flag the shutdown request and stop the streaming query
def shutdown_handler(signum, frame):
  global shutdown_requested
  print("Graceful shutdown initiated...")
  shutdown_requested = True
  query.stop()

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

# Read the assigned partition of sample-topic from Kafka, starting at the latest offsets
df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("assign", '{"sample-topic":[0]}') \
  .option("startingOffsets", "latest") \
  .option("kafka.group.id", consumer_group_id) \
  .load()

df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))

# Write the parsed messages to the console, checkpointing under c1_consumer
query = df.writeStream \
  .outputMode("append") \
  .format("console") \
  .option("checkpointLocation", f"{checkpoint_dir}/c1_consumer") \
  .start()

try:
  query.awaitTermination()
except Exception as e:
  print(f"Exception encountered: {e}")



When I press Ctrl + C to trigger the graceful shutdown, I get the following error:

Error:

root: Exception while sending command.
Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=3>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/context.py", line 381, in signal_handler
    self.cancelAllJobs()
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/context.py", line 2446, in cancelAllJobs
    self._jsc.sc().cancelAllJobs()
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o27.sc

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/spark_job/Documents/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Can someone help me understand why this error occurs and how to fix it?

apache-spark pyspark apache-kafka-streams spark-structured-streaming py4j
1 Answer

The error you're seeing is most likely because the PySpark driver doesn't support the way you're trying to handle signals. In your script you handle the SIGINT and SIGTERM signals yourself and stop the running streaming query from the handler. However, the PySpark driver already installs handlers for these signals (and others) so that it can clean up its own resources when a signal is received.
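The traceback above shows exactly this: PySpark's own signal_handler (defined in pyspark/context.py) is invoked and calls cancelAllJobs(). As a quick, purely illustrative check, you can inspect which SIGINT handler is installed right after the SparkSession is created:

import signal

# With the SparkSession (and its SparkContext) already created, this should print
# the handler PySpark installed for SIGINT (the signal_handler from
# pyspark/context.py seen in the traceback), not Python's default handler.
print(signal.getsignal(signal.SIGINT))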

When you install your own signal handler, it overrides the one set by PySpark. When a signal arrives, your handler is invoked and tries to stop the streaming query, which means sending a command to the JVM that runs the Spark application. At the same time, however, the PySpark driver is also trying to clean up its resources in response to the signal. The result is an unsupported reentrant call on the Py4J connection, hence the RuntimeError.
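One common way to avoid the reentrant call, sketched below purely as an illustration (it uses the query variable from the script above), is to do no Py4J work inside the handler at all: the handler only sets a flag, and the query is stopped from the main thread once awaitTermination(timeout) returns. The answer's own fix, using a JVM shutdown hook, follows below.

import signal

shutdown_requested = False

def shutdown_handler(signum, frame):
  # Do not call into the JVM from the handler; just record the request
  global shutdown_requested
  shutdown_requested = True

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

# Poll on the main thread: awaitTermination(timeout) returns True once the
# query has terminated and False when the timeout (in seconds) elapses.
while not shutdown_requested:
  if query.awaitTermination(timeout=5):
    break

if shutdown_requested:
  query.stop()

Because nothing in the handler touches the Py4J connection, the socket read that awaitTermination is blocked on is not re-entered.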

To work around this, you can add a shutdown hook to the JVM running your Spark application so that the streaming query is stopped while the JVM is shutting down. Here is an example of how to do that:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
from py4j.java_gateway import java_import

checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"

spark = SparkSession.builder \
  .appName("KafkaConsumer1") \
  .getOrCreate()

spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")

consumer_group_id = "consumer-group-1"

df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("assign", '{"sample-topic":[0]}') \
  .option("startingOffsets", "latest") \
  .option("kafka.group.id", consumer_group_id) \
  .load()

df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))

query = df.writeStream \
  .outputMode("append") \
  .format("console") \
  .option("checkpointLocation", f"{checkpoint_dir}/c1_consumer") \
  .start()

# Make the StreamingQuery class visible in the Py4J JVM view
java_import(spark._jvm, 'org.apache.spark.sql.streaming.StreamingQuery')

def stop_query():
  query.stop()

# Register stop_query with Spark's ShutdownHookManager so it is invoked when the JVM shuts down
shutdownHookManager = spark._jvm.org.apache.spark.util.ShutdownHookManager
shutdownHookManager.addShutdownHook(stop_query)

query.awaitTermination()

The script uses PySpark's Java gateway to import the class from the JVM and adds a shutdown hook that stops the streaming query. The shutdown hook is invoked when the JVM shuts down, which happens when the PySpark driver exits in response to the signal.
