I am trying to run a simple SSHExecuteOperator in Airflow.
Here is my .py file:
import airflow
from airflow import DAG
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 3
}

dag = DAG('Nas_Hdfs', description='Simple tutorial DAG',
          schedule_interval=None, default_args=default_args,
          catchup=False)

# SSH connection stored in Airflow under the connection id '101'
sshHook = SSHHook(conn_id='101')
sshHook.no_host_key_check = True

t2 = SSHExecuteOperator(task_id="NAS_TO_HDFS_FILE_COPY",
                        bash_command="hostname ",
                        ssh_hook=sshHook,
                        dag=dag
                        )

t2
I am getting the following error:
ERROR - Failed to create remote temp file
Here is the full log:
INFO - Subtask: --------------------------------------------------------------------------------
INFO - Subtask: Starting attempt 1 of 4
INFO - Subtask: --------------------------------------------------------------------------------
INFO - Subtask:
INFO - Subtask: [2018-05-28 08:54:22,812] {models.py:1342} INFO - Executing <Task(SSHExecuteOperator): NAS_TO_HDFS_FILE_COPY> on 2018-05-28 08:54:12.876538
INFO - Subtask: [2018-05-28 08:54:23,303] {models.py:1417} ERROR - Failed to create remote temp file
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
INFO - Subtask: result = task_copy.execute(context=context)
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/contrib/operators/ssh_execute_operator.py", line 128, in execute
INFO - Subtask: self.task_id) as remote_file_path:
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/contrib/operators/ssh_execute_operator.py", line 64, in __enter__
INFO - Subtask: raise AirflowException("Failed to create remote temp file")
INFO - Subtask: AirflowException: Failed to create remote temp file
INFO - Subtask: [2018-05-28 08:54:23,304] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
INFO - Subtask: [2018-05-28 08:54:23,342] {models.py:1462} ERROR - Failed to create remote temp file
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/opt/miniconda3/bin/airflow", line 28, in <module>
INFO - Subtask: args.func(args)
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/bin/cli.py", line 422, in run
INFO - Subtask: pool=args.pool,
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
INFO - Subtask: result = func(*args, **kwargs)
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
INFO - Subtask: result = task_copy.execute(context=context)
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/contrib/operators/ssh_execute_operator.py", line 128, in execute
INFO - Subtask: self.task_id) as remote_file_path:
INFO - Subtask: File "/opt/miniconda3/lib/python2.7/site-packages/airflow/contrib/operators/ssh_execute_operator.py", line 64, in __enter__
INFO - Subtask: raise AirflowException("Failed to create remote temp file")
INFO - Subtask: airflow.exceptions.AirflowException: Failed to create remote temp file
INFO - Task exited with return code 1
Any help is highly appreciated!
EDIT: I ran this in a Python shell as my airflow user, and this is the output:
from airflow.contrib.hooks.ssh_hook import SSHHook
sshHook = SSHHook(conn_id='101')
sshHook.no_host_key_check = True
sshHook.Popen(["-q", "mktemp", "--tmpdir", "tmp_XXXXXX"])
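The Popen call above only returns a process handle, so the reason the remote mktemp fails stays hidden. A minimal sketch of one way to surface it is to capture the exit code and the merged stdout/stderr. The helper name run_and_report is hypothetical, and passing sshHook.Popen to it assumes that SSHHook.Popen forwards keyword arguments to subprocess.Popen, as it appears to in the contrib hook of this Airflow version. The runnable part uses a local Python child process as a stand-in, so no SSH connection is needed to try it.

```python
import subprocess
import sys


def run_and_report(popen, argv):
    """Run argv through a Popen-style callable and return (returncode, output).

    `popen` is any callable with the subprocess.Popen signature
    (hypothetical helper, not part of Airflow). stderr is merged into
    stdout so that the error text of the command is not lost.
    """
    proc = popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output, _ = proc.communicate()
    return proc.returncode, output.decode("utf-8", "replace").strip()


if __name__ == "__main__":
    # Local stand-in: a child process that writes to stderr and exits non-zero.
    code, text = run_and_report(
        subprocess.Popen,
        [sys.executable, "-c",
         "import sys; sys.stderr.write('boom'); sys.exit(3)"],
    )
    print(code, text)

    # With the hook from the question (assumption: SSHHook.Popen passes
    # stdout/stderr keyword arguments through to subprocess.Popen):
    # code, text = run_and_report(
    #     sshHook.Popen, ["-q", "mktemp", "--tmpdir", "tmp_XXXXXX"])
```

A non-zero return code together with the captured text should show whether the failure comes from the SSH connection itself (authentication, host key, unreachable host) or from mktemp on the remote machine.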
Make sure you follow these 3 steps: