有没有办法通过Airflow DataprocCreateBatchOperator方法传递dataproc版本?

问题描述 投票:0回答:1

我刚刚遇到一个问题,dataproc 的默认版本升级并破坏了我正在通过气流使用 DataprocCreateBatchOperator 方法提交的作业。

     task2 = DataprocCreateBatchOperator(
         task_id="trip_level_data",
         project_id="generic_project_id",
         region="us-east4",
         batch_id="trip-"+"".join(random.choice(string.ascii_lowercase + string.digits) for i in range(35)),
         batch={
"pyspark_batch" : {
    "args" : [
        "--env=prod"
    ],
    "jar_file_uris" : [
        "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.0.jar"
    ],
    "main_python_file_uri" : "gs://generic_bucket/get_trip_info.py"
},
"labels" : {
    "dag_id" : "{{ run_id_format(dag.dag_id) }}",
    "dag_run_id" : "{{ run_id_format(run_id) }}",
    "task_id" : "{{ run_id_format(task.task_id) }}"
},
"environment_config" : {
    "execution_config" : {
        "service_account" : "svc@generic_project.iam.gserviceaccount.com",
        "subnetwork_uri" : "https://www.googleapis.com/compute/alpha/projects/shared-vpc-admin/regions/us-east4/subnetworks/prod-us-east4-01"
    }
}

})

我在文档中没有看到如何在创建运算符时向 DAG 方法提供版本号。 如果通过 gcloud 命令行运行,我可以使用 --version=2.0.85 手动运行该作业,它工作正常。

airflow dataproc google-cloud-dataproc-serverless astronomer
1个回答
0
投票

是的,您需要使用runtime_config并指定“版本”, 参考这个示例代码

       'environment_config': { 'execution_config': { 'subnetwork_uri': SUBNET, 'kms_key': KMS_KEY, }, 'peripherals_config': { 'metastore_service': METASTORE_SERVICE, 'spark_history_server_config': { 'dataproc_cluster': PHS_SERVER, }, }, },       "runtime_config": { "version": "2.2",}

© www.soinside.com 2019 - 2024. All rights reserved.