pyspark addPyFile 添加 .py 文件的 zip,但仍然找不到模块

问题描述 投票:0回答:3

使用

addPyFiles()
似乎没有将所需的文件添加到spark作业节点(对于spark来说是新手,所以这里可能缺少一些基本的使用知识)。

尝试使用 pyspark 运行脚本,并看到错误,指出找不到某些可导入的模块。以前从未使用过spark,但其他帖子(来自有问题的包https://github.com/cerndb/dist-keras/issues/36#issuecomment-378918484https://stackoverflow.com/a/39779271/ 8236733)建议压缩模块并通过

sparkContext.addPyFiles(mymodulefiles.zip)
添加到 Spark 作业,但仍然出现错误。相关代码片段是...

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *

(我在这里导入的包可以在 https://github.com/cerndb/dist-keras 找到),

conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)  #master='yarn-client'
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

if using_spark_2:
    from pyspark.sql import SparkSession

    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
    sc.sparkContext.addPyFile("/home/me/Downloads/distkeras.zip") # see https://github.com/cerndb/dist-keras/issues/36#issuecomment-378918484 and https://forums.databricks.com/answers/10207/view.html
    print sc.version

(distkeras.zip 是该目录的压缩文件:https://github.com/cerndb/dist-keras/tree/master/distkeras),以及

transformer = OneHotTransformer(output_dim=nb_classes, input_col="label_index", output_col="label")
dataset = transformer.transform(dataset)

"""throwing error...
.....
  File "/opt/mapr/spark/spark-2.1.0/python/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj)
ImportError: No module named distkeras.utils

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
.....
"""

从文档和示例中我可以找到(http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext.addPyFilehttps://forums.databricks .com/questions/10193/the-proper-way-to-add-in-dependency-py-files.html),上面的代码似乎应该对我有用(同样,以前从未使用过 Spark)。有人知道我在这里做错了什么吗?可以发布更多对调试有用的信息吗?

apache-spark pyspark
3个回答
30
投票

已解决问题。诚然,解决方案并不完全与火花相关,但为了可能有类似问题的其他人而留下问题,因为给定的错误消息并没有从一开始就完全清楚我的错误。

TLDR:确保加载的 zip 文件的包内容(因此它们应该在每个目录中包含 __init.py__。)的结构和命名方式符合您的代码所需的方式。


我试图通过 zip 加载到 Spark 上下文中的包的形式为

mypkg
    file1.py
    file2.py
    subpkg1
        file11.py
    subpkg2
        file21.py

运行时我的拉链

less mypkg.zip
,显示了

file1.py file2.py subpkg1 subpkg2

所以这里有两件事是错误的。

  1. 没有压缩顶级目录。这是编码期望使用的主包
  2. 没有压缩较低级别的目录。

解决了

zip -r mypkg.zip mypkg

更具体地说,必须制作 2 个 zip 文件

  1. 对于 dist-keras 包:

    cd dist-keras; zip -r distkeras.zip distkeras

参见https://github.com/cerndb/dist-keras/tree/master/distkeras

  1. 对于 distkeras 使用的 keras 软件包(未跨集群安装):

    cd keras; zip -r keras.zip keras

参见https://github.com/keras-team/keras/tree/master/keras

所以声明 Spark 会话看起来像

conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)  #master='yarn-client'
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    from pyspark.sql import SparkSession

    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
    sc.sparkContext.addPyFile("/home/me/projects/keras-projects/exploring-keras/keras-dist_test/dist-keras/distkeras.zip")
    sc.sparkContext.addPyFile("/home/me/projects/keras-projects/exploring-keras/keras-dist_test/keras/keras.zip")
    print sc.version

6
投票

如果您的模块如下

我的模块
- init.py
-spark1.py
-spark2.py

不要进入 myModule 文件夹并添加到 zip 中。您提到的这个错误。

相反,请转到 myModule 文件夹之外。 右键单击并将 myModule 文件夹添加到 zip 并指定另一个名称。

这个想法是,当 Spark 提取你的 zip 时,应该存在具有相同名称和层次结构的 myModule 文件夹


0
投票

上述解决方案都不适合我,我仍然遇到 ModuleCannotFind 错误。 对于其他开发人员,我是这样解决的: 假设我有一个如下代码文件夹

mypkg
    __init__.py # you MUST have this in your module folder root.
    file1.py
    file2.py # note in file2.py calling method from file1 should be from mypke.file1 import func_from_file1

我所做的只是使用

addFile

sc.addFile("your_path_to_mypkg_parent/mypkg/",  recursive=True)

然后(如果你使用jupyter notebook和sparkmagic

%%spark
访问该模块,则需要

%%spark
from mypkg import file1
from mypke import file2 as f2

foo = f2.func_from_file2(xx)
© www.soinside.com 2019 - 2024. All rights reserved.