I am running into a problem installing packages on a Dataproc cluster with `DataprocCreateClusterOperator` while trying to upgrade to Airflow 2.0.
The error message is:
ValueError: metadata was invalid: [('bigquery-connector-version', '1.1.1'), ('spark-bigquery-connector-version', '0.17.2'), ('PIP_PACKAGES', 'oyaml'), ('x-goog-api-client', 'gl-python/3.8.12 grpc/1.39.0 gax/1.31.1 gccl/airflow_v2.1.2+composer')]
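As far as I can tell, this ValueError is raised by gRPC while validating the metadata list itself: custom gRPC metadata keys have to be lowercase (letters, digits, `_`, `-`, `.`), so the uppercase `PIP_PACKAGES` key would already be rejected before Dataproc ever sees it. That is my assumption about the mechanism, not something stated in the traceback; a quick self-contained check of the key rule:

import re

# gRPC custom metadata keys may only contain 0-9, a-z, '_', '-' and '.'.
# Assumption: the uppercase "PIP_PACKAGES" key is what makes this list invalid.
for key, _ in [
    ("bigquery-connector-version", "1.1.1"),
    ("spark-bigquery-connector-version", "0.17.2"),
    ("PIP_PACKAGES", "oyaml"),
]:
    valid = re.fullmatch(r"[0-9a-z_.\-]+", key) is not None
    print(f"{key}: {'ok' if valid else 'invalid gRPC metadata key'}")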
Digging further, I found a [GitHub pull request](https://github.com/apache/airflow/pull/19446) where this `ValueError: metadata was invalid` problem is discussed. Following it, I switched to building the cluster configuration with `ClusterGenerator` and passing it to `DataprocCreateClusterOperator` as `cluster_config` (the `CLUSTER_CONFIG` approach), but now I am hitting a different error:
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 67, in error_remapped_callable
return callable_(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)"
debug_error_string = "{"created":"@1640080533.396337969","description":"Error received from peer ipv4:142.250.97.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)","grpc_status":3}"
I could not find much information on this one. Here is the code:
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator
CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    tags="dataproc",
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={
        "gcs-connector-version": '2.1.1',
        "bigquery-connector-version": '1.1.1',
        "spark-bigquery-connector-version": '0.17.2',
        "PIP_PACKAGES": 'datalab shap oyaml click apache-airflow apache-airflow-providers-google',
    },
    init_actions_uris=[
        'gs://goog-dataproc-initialization-actions-{region}/connectors/connectors.sh',
        'gs://goog-dataproc-initialization-actions-{region}/python/pip-install.sh',
    ],
).make()

with dag:
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster_name",
        project_id="project_id",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )

    create_dataproc_cluster
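To see where the tags and metadata from `ClusterGenerator` actually end up, it can help to print the dict returned by `make()`. Below is a trimmed-down sketch with only the arguments relevant here; the `gce_cluster_config` key is my assumption based on the provider source, so treat it as such:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

# Trimmed-down sketch (placeholder names): generate the config and inspect the
# Compute Engine section, which is where tags and instance metadata should land.
config = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    tags="dataproc",  # note: a plain string here, not a list
    metadata={"PIP_PACKAGES": "oyaml"},
    num_workers=2,
).make()

print(config.get("gce_cluster_config"))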
The following DAG works as expected; the change is `cluster_name` -> `cluster-name` (note that `tags` is also passed as a list here, and the `{region}` placeholder in the init action URIs is filled in):

import os
from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator
CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster-name",
    tags=["dataproc"],
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={
        "gcs-connector-version": '2.1.1',
        "bigquery-connector-version": '1.1.1',
        "spark-bigquery-connector-version": '0.17.2',
        "PIP_PACKAGES": 'datalab shap oyaml click apache-airflow apache-airflow-providers-google',
    },
    init_actions_uris=[
        'gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh',
        'gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh',
    ],
).make()
with models.DAG(
    "example_gcp_dataproc",
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster-name",
        project_id="<your-project-name>",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )

    create_dataproc_cluster
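For reference, the pattern quoted in the INVALID_ARGUMENT error is the usual Compute Engine tag/name rule (lowercase letters, digits and hyphens, starting with a letter), and an underscore violates it. A quick check of the two cluster names against that exact pattern, nothing Dataproc-specific assumed:

import re

# Pattern copied verbatim from the INVALID_ARGUMENT error above.
TAG_PATTERN = r"(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)"

for name in ["cluster_name", "cluster-name"]:
    ok = re.fullmatch(TAG_PATTERN, name) is not None
    print(f"{name}: {'ok' if ok else 'rejected'}")

# cluster_name: rejected  (the underscore is not allowed)
# cluster-name: ok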
What is the content of pip_install.sh?