Problems passing metadata fields with DataprocCreateClusterOperator (Airflow 2.0)

Question (0 votes, 2 answers)

I'm running into issues using DataprocCreateClusterOperator to install packages on a Dataproc cluster while trying to upgrade to Airflow 2.0.

Error message:

ValueError: metadata was invalid: [('bigquery-connector-version', '1.1.1'), ('spark-bigquery-connector-version', '0.17.2'), ('PIP_PACKAGES', 'oyaml'), ('x-goog-api-client', 'gl-python/3.8.12 grpc/1.39.0 gax/1.31.1 gccl/airflow_v2.1.2+composer')] 

Digging further, I found a [GitHub pull request](https://github.com/apache/airflow/pull/19446) where the ValueError: metadata was invalid issue has been discussed by the community.
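
For context, in the 2.x google provider the operator-level metadata argument is forwarded as gRPC request metadata rather than cluster metadata (note the x-goog-api-client entry mixed into the error above), so connector versions and PIP_PACKAGES passed that way get rejected. The failing call was essentially the 1.x-style pattern sketched below (a sketch of the pattern, not the exact original DAG):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# 1.x-style pattern: cluster metadata passed straight to the operator.
# In the 2.x provider this argument is gRPC request metadata, so these
# key/value pairs are validated as request headers and the task fails
# with "ValueError: metadata was invalid" at execution time.
# (Other cluster fields omitted for brevity.)
create_cluster = DataprocCreateClusterOperator(
    task_id="create_test_dataproc_cluster",
    cluster_name="cluster-name",
    project_id="project_id",
    region="us-central1",
    metadata=[
        ("bigquery-connector-version", "1.1.1"),
        ("spark-bigquery-connector-version", "0.17.2"),
        ("PIP_PACKAGES", "oyaml"),
    ],
)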

I followed that link and used the CLUSTER_CONFIG approach to generate the cluster_config for DataprocCreateClusterOperator, but now I'm running into a new error, shown below:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 67, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)"
    debug_error_string = "{"created":"@1640080533.396337969","description":"Error received from peer ipv4:142.250.97.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)","grpc_status":3}"

I couldn't find much information on this error.

Here is the code:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    tags="dataproc",
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={"gcs-connector-version" : '2.1.1' , 
                  "bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES" : 'datalab shap oyaml click apache-airflow apache-airflow-providers-google'
                 },
    init_actions_uris =['gs://goog-dataproc-initialization-actions-{region}/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-{region}/python/pip-install.sh']
).make()

with dag:
   create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster_name",
        project_id="project_id",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )
    
   create_dataproc_cluster

Tags: python, airflow, google-cloud-dataproc, airflow-2.x
2 Answers
1 vote

The following DAG works as expected. The changes were:

  • The cluster name (cluster_name -> cluster-name; underscores are not allowed in Dataproc cluster names).
  • The tags, passed as a list (tags=["dataproc"] instead of tags="dataproc").
  • The paths of the init-action scripts (the literal {region} placeholder replaced with us-central1).
  • The DAG definition.

import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster-name",
    tags=["dataproc"],
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={"gcs-connector-version" : '2.1.1' , 
                  "bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES" : 'datalab shap oyaml click apache-airflow apache-airflow-providers-google'
                 },
    init_actions_uris =['gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh']
).make()


with models.DAG(
    "example_gcp_dataproc",
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster-name",
        project_id="<your-project-name>",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )

    create_dataproc_cluster
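
As a side note, ClusterGenerator.make() just returns a plain dict, so the generated config can be sanity-checked locally before the DAG ever runs; a minimal sketch:

from pprint import pprint

# ClusterGenerator.make() returns a plain dict describing the cluster config,
# so it can be printed and inspected without touching GCP.
pprint(CLUSTER_CONFIG)

# The GCE network tags should come out as a list, roughly:
#   'gce_cluster_config': {'tags': ['dataproc'], ...}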

0 votes

What are the contents of pip_install.sh?
