I launched an AWS EMR on EC2 cluster, but I am having trouble getting apache-beam's SparkRunner to work. I have a Python script that uses apache-beam. I have tried
aws emr add-steps
as well as ssh-ing into the master node and running spark-submit, and both times I get a
package not found error on apache-beam
Here is the install script I use:
#!/bin/bash
# Install apache-beam on the cluster nodes
pip install --upgrade pip
pip install setuptools --upgrade
pip install "apache_beam[aws]==2.61.0"
When I add the step or submit the job, it still says the package cannot be found.
I tried manually installing apache-beam as
root
on all 3 nodes, but when I run spark-submit it still says the package cannot be found.
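Concretely, the manual install on each node was roughly this (reconstructed from memory):

# Run as root on every node
sudo python3 -m pip install "apache_beam[aws]==2.61.0"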
I also tried using a docker image:
spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=user/image:latest \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.pyspark.python=python3 \
--conf spark.pyspark.driver.python=python3 \
--conf spark.executorEnv.PYTHONPATH="/usr/local/lib/python3.7/site-packages" \
beam_test_local.py \
--input_pattern_list="s3path" \
--runner=SparkRunner \
--spark_master=yarn \
--region=us-east-1
I checked and grepped all the logs for docker and for my username, but there is no sign at all of any docker image being pulled.
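From what I have read, on YARN the container image is normally selected through the YARN container runtime environment variables rather than the spark.kubernetes.* settings, so the submit would look something like the following (untested on my side, just my understanding):

spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=user/image:latest \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=user/image:latest \
beam_test_local.py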
aws emr add-steps \
--cluster-id j-id \
--steps Type=Spark,Name="SparkJob",ActionOnFailure=CONTINUE,Args="[
'--master', 'yarn',
'--deploy-mode', 'cluster',
'--conf', 'spark.archives=s3://s3bucket/pyspark_venv.tar.gz#environment',
'--conf', 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python',
'--conf', 'spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python',
'pythonfile'
]" \
--region us-east-1
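For completeness, the pyspark_venv.tar.gz archive referenced above was built roughly like this (reconstructed; the exact versions and the bucket name are placeholders):

# Build a relocatable virtualenv containing apache-beam and upload it to S3
python3 -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install "apache_beam[aws]==2.61.0" venv-pack
venv-pack -o pyspark_venv.tar.gz
aws s3 cp pyspark_venv.tar.gz s3://s3bucket/pyspark_venv.tar.gz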
This one gets past the package-not-found error, but it then complains with:
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1733453351.558696 7840 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/beam_test.py", line 55, in <module>
run()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/beam_test.py", line 49, in run
(p
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/pipeline.py", line 620, in __exit__
self.result = self.run()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/pipeline.py", line 594, in run
return self.runner.run_pipeline(self, self._options)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/runner.py", line 180, in run_pipeline
return self.run_portable_pipeline(
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 375, in run_portable_pipeline
job_service_handle = self.create_job_service(options)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 289, in create_job_service
return self.create_job_service_handle(server.start(), options)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 79, in start
self._endpoint = self._job_server.start()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 104, in start
cmd, endpoint = self.subprocess_cmd_and_endpoint()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 149, in subprocess_cmd_and_endpoint
jar_path = self.local_jar(self.path_to_jar(), self._jar_cache_dir)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/runners/portability/job_server.py", line 146, in local_jar
return subprocess_server.JavaJarServer.local_jar(url, jar_cache_dir)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1733453166709_0001/container_1733453166709_0001_02_000001/environment/lib64/python3.9/site-packages/apache_beam/utils/subprocess_server.py", line 382, in local_jar
os.makedirs(cache_dir)
File "/usr/lib64/python3.9/os.py", line 215, in makedirs
makedirs(head, exist_ok=exist_ok)
File "/usr/lib64/python3.9/os.py", line 215, in makedirs
makedirs(head, exist_ok=exist_ok)
File "/usr/lib64/python3.9/os.py", line 225, in makedirs
mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: '/home/.apache_beam'
I am pretty lost at this point. Any help would be appreciated.
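The only further idea I have is that HOME inside the YARN container apparently resolves to the unwritable /home, so maybe forcing it to a writable directory would help, e.g. by adding these confs to the step (untested guess):

--conf spark.yarn.appMasterEnv.HOME=/tmp
--conf spark.executorEnv.HOME=/tmp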
When I use spark-submit, I do not need to set the runner to SparkRunner, because spark-submit is a command-line tool dedicated to submitting Spark applications. With spark-submit, Spark itself handles the execution of the job on the cluster, so I do not need to specify a runner in my code.
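So, as I understand it, the submission itself can stay as simple as the following (a sketch reusing the same placeholder script and bucket names as above), with no --runner option passed to the pipeline:

spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.archives=s3://s3bucket/pyspark_venv.tar.gz#environment \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python \
beam_test_local.py --input_pattern_list="s3path"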