我有一台气流机器,其版本为apache-airflow == 1.10.5。我知道如何运行自动创建集群的dag并运行该步骤并终止集群。使用气流UI中的连接,我能够实现这一目标。但是要在现有的aws emr群集上运行dag,我无法知道我需要在连接中传递哪些参数。
AIRFLOW UI->管理员->连接->创建的Conn ID(EMR Default1),conn类型为Elastic Mapreduce。
[2019-10-14 12:12:40,919] {taskinstance.py:1051} ERROR - Parameter validation failed:
Missing required parameter in input: "Instances"
Traceback (most recent call last):
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
result = task_copy.execute(context=context)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/operators/emr_create_job_flow_operator.py", line 68, in execute
response = emr.create_job_flow(self.job_flow_overrides)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/hooks/emr_hook.py", line 55, in create_job_flow
response = self.get_conn().run_job_flow(**config)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
api_params, operation_model, context=request_context)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 621, in _convert_to_request_dict
api_params, operation_model)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Instances"
[2019-10-14 12:12:40,920] {taskinstance.py:1082} INFO - Marking task as FAILED.
在第一种情况下,也可以通过扩展SparkSubmitOperator
运算符来实现它,而不是使用UI动态创建/终止集群。启动EMR群集后,您可以将* .xml(例如core-site.xml)文件从EMR主服务器复制到气流节点上的某个位置,然后在气流中的“火花提交”任务中指向这些文件。至少我们那天在产品中做到这一点。为了进行逻辑上的扩展,如果您打算重用现有集群,那么您所需要做的就是知道这些* .xml文件已存储在何处。然后,其余部分将相同。您只需要在触发任务时引用这些文件即可。
更多详细信息
我不知道任何这样的文档,所以我只能建议您根据我所掌握的知识探索以下内容:
我们需要编写一个自定义插件来提交Spark。作为此自定义插件模块的一部分,让我们定义一个CustomSparkSubmitOperator
类。它需要扩展BaseOperator
。您可以找到许多有关在气流中编写自定义插件的文章。 This可能是一个不错的起点。 Here,您可以查看BaseOperator
的更多详细信息。
在BaseOperator
中,您将找到一个名为pre_execute
的方法。在此方法内执行以下操作是一个可行的选择:
a。等待集群启动。如果传递了cluster-id,则可以使用boto3轻松实现。
b。群集启动后,获取EMR主节点的ip,并将与/etc/hadoop/conf/*-site.xml
相匹配的内容复制到您的气流节点。这可以通过python中的子进程调用来实现。
一旦有了xml文件,在execute
方法中,只需使用SparkSubmitHook
提交您的spark-job。您需要确保气流节点上的火花二进制文件正在使用此路径进行火花提交。
如果需要,您可以使用post_execute
方法清理群集。