使用此处中的图片来描述我的问题。
如上,python UDF函数是在worker节点的python进程中执行的。 在 Executor(JVM) 进程启动之前,我创建了一个管道并向其写入消息,如下所示:
import os, sys
print ("-----------")
r, w = os.pipe()
os.write(w, b"Hello, world\n")
os.close(w)
os.environ["PIPE_READ_FD"] = str(r)
在Executor(JVM)进程启动后,我尝试获取读取文件描述符和读取消息,这可以正常工作:
pipe_read_fd_str = os.environ.get("PIPE_READ_FD", "-1")
print("--------- pipe_read_fd_str=" + pipe_read_fd_str)
r = int(pipe_read_fd_str)
r = os.fdopen(r)
print("-------- Read text:", r.read())
r.close()
但是如果我在 Python 进程中运行的 UDF 函数中执行相同的工作,我得到了
[Errno 9] 错误的文件描述符
问题: Executor(JVM)进程启动的Python进程是否关闭了读描述符? 我检查了源代码,没有找到任何线索,有人可以分享一些关于此的信息吗?
当您尝试从 PySpark UDF 中的管道读取时,您会遇到 [Errno 9] Bad 文件描述符错误。发生这种情况是因为在主 Python 进程中使用 os.pipe() 创建的文件描述符在 UDF 中不可访问。
Spark 执行器是工作节点上的独立进程。创建 UDF 时,Python 代码将在 Spark 执行器生成的新 Python 进程中执行。文件描述符不被子进程继承。这意味着在主进程中创建的文件描述符在UDF的进程中不存在。