I have a business requirement to run Spark via SageMaker Processing, and I need to run distributed code that uses pandas, numpy, gensim and sklearn. I built a .zip of all the installed packages, put it on S3, and passed that file as an input parameter to my PySpark processor's run method. The problem is that Spark does not recognize gensim and sklearn. I can load pandas and numpy without any issue, but for gensim and sklearn I get the errors:
No module named sklearn
No module named gensim
I tried to solve this in four steps.

Step 1: pin the dependencies in a requirements.txt:
numpy==1.23.5
pandas==1.5.3
scikit-learn==1.2.2
pyarrow==11.0.0
gensim==4.3.1
Step 2: install the packages locally, zip them, and upload the archive to S3:

cd /home/ec2-user/SageMaker/python_dependencies
pip install -r requirements.txt -t ./packages
zip -r dependencies.zip ./packages
aws s3 cp dependencies.zip s3://data-science/code/dependencies/
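Before uploading, it may be worth checking what actually sits at the top level of the archive: zip -r dependencies.zip ./packages nests everything under a packages/ prefix, and Python's zipimport only finds packages located at the root of the sys.path entry. A quick local check (just a sketch, assuming dependencies.zip is in the current directory):

import zipfile

# List the top-level entries of the archive; with "zip -r dependencies.zip ./packages"
# everything ends up under a single "packages/" prefix rather than at the zip root.
with zipfile.ZipFile("dependencies.zip") as zf:
    top_level = sorted({name.split("/")[0] for name in zf.namelist()})
    print(top_level)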
Step 3: define the PySparkProcessor and run the Spark job:

from sagemaker.spark.processing import PySparkProcessor

# Define the PySparkProcessor
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=default_instance_count,  # adjust the instance count as needed
    instance_type=default_instance,  # adjust the instance type as needed
    max_runtime_in_seconds=1200,
)
# Set the input bucket:
input_bucket = "data-science"
# Define the number of records wanted:
number = "100" # Change this as needed
# Run the Spark job:
spark_processor.run(
    submit_app="process.py",
    arguments=[input_bucket, number],
    submit_py_files=["s3://data-science/code/dependencies/dependencies.zip"],
    spark_event_logs_s3_uri="s3://data-science/spark_event_logs",
    logs=False,
)
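For context, the values in arguments are handed to process.py as ordinary command-line arguments, so inside the script they are read from sys.argv (the variable names below are only illustrative):

import sys

# arguments=[input_bucket, number] arrive as argv[1] and argv[2]
input_bucket = sys.argv[1]
number_of_records = int(sys.argv[2])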
Step 4: inside process.py, initialize Spark, register the dependencies zip, and import the packages:

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Initialize the Spark session:
spark = SparkSession.builder \
    .appName("Spark Processing Job") \
    .getOrCreate()
print("Spark session initialized with optimized configuration.")
# Get the Spark context and add the dependencies zip so its contents can be imported
sc = SparkContext.getOrCreate()
sc.addPyFile(local_dependencies_path)  # path to dependencies.zip
print("Py file added")
# Import all python dependencies:
try:
    import pandas as pd
    print(f"Pandas version: {pd.__version__}")

    import numpy as np
    print(f"Numpy version: {np.__version__}")

    from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
    print("Sklearn metrics loaded")

    from gensim.models.doc2vec import Doc2Vec, TaggedDocument
    print("Gensim models loaded")
except ImportError as e:
    # Log the error and terminate the job
    print(f"Dependency loading error: {e}")
    raise SystemExit(f"Job terminated due to missing dependencies: {e}")
My simple guess is that they are just not installed in the runtime environment Spark actually runs in.
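To test that guess, a small diagnostic could be dropped into process.py to check whether the modules resolve on the executors as well as on the driver. This is only a sketch of the idea:

from pyspark.sql import SparkSession

def can_import(name):
    # find_spec returns None when the module is not visible on that node's sys.path
    import importlib.util
    return name, importlib.util.find_spec(name) is not None

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

modules = ["pandas", "numpy", "sklearn", "gensim"]
print("driver:", [can_import(m) for m in modules])
print("executors:", sc.parallelize(modules, len(modules)).map(can_import).collect())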