How can I write to a BigQuery table with Apache Beam running locally, without the Google Cloud CLI executable?


I can read from a BigQuery table using an external Python engine, but when I run Apache Beam + Python locally, I cannot authenticate.

The documentation only offers the CLI option: "When you run locally, your Apache Beam pipeline runs as the Google Cloud account that you configured with the Google Cloud CLI executable. Hence, locally run Apache Beam SDK operations and your Google Cloud account can access the same files and resources."

PipelineOptions has no authentication parameters. I would like to authenticate via an environment variable, the same way I do with bigquery.Client.
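
For comparison, bigquery.Client accepts a credentials object explicitly, while its no-argument constructor falls back to Application Default Credentials; here is a minimal sketch of the two paths (the key-file path is a placeholder):

from google.cloud import bigquery
from google.oauth2 import service_account

# Explicit: build credentials from a key file (path is illustrative).
creds = service_account.Credentials.from_service_account_file("sa-key.json")
client = bigquery.Client(credentials=creds, project=creds.project_id)

# Implicit: with GOOGLE_APPLICATION_CREDENTIALS pointing at the same file,
# Application Default Credentials finds it with no constructor arguments.
client = bigquery.Client()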

I added method="STREAMING_INSERTS" to avoid needing a GCS bucket.

The code I have so far:

import json
import os
import pandas
import pandas_gbq

from google.cloud import bigquery
from google.oauth2 import service_account

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery

assert 'google_key' in os.environ, "Please define a secret called google_key; its value must be the contents of access key in JSON format"

# json.loads is safer than eval for parsing the key material from the environment
credentials = service_account.Credentials.from_service_account_info(json.loads("{" + os.environ['google_key'] + "}"))
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# reading data - client.query() - works fine
query_job = client.query("SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 1")
df = query_job.to_dataframe()
# writing data using load_table_from_dataframe also works fine
# https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe
quotes_list = [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
               {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
              ]
df_quotes = pandas.DataFrame(quotes_list)
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField(col, bigquery.enums.SqlTypeNames.STRING) for col in df_quotes.columns],
    write_disposition="WRITE_TRUNCATE",
)
table_id = "dummy_dataset.quotes_table"
job = client.load_table_from_dataframe(df_quotes, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns")

# writing data using pandas_gbq works fine
# https://pandas-gbq.readthedocs.io/en/latest/writing.html#writing-to-an-existing-table
pandas_gbq.to_gbq(df, "dummy_dataset.dummy_table2", credentials.project_id, credentials=credentials, if_exists='replace')

# writing using WriteToBigQuery does not work
beam_options = PipelineOptions()

with beam.Pipeline(options=beam_options) as pipeline:
  quotes = pipeline | beam.Create(quotes_list)
  quotes | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=f'{credentials.project_id}:dummy_dataset.dummy_table',
    ignore_unknown_columns=True,
    schema='source:STRING, quote:STRING',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    method="STREAMING_INSERTS")

The errors and warnings I get when I use beam_options = PipelineOptions() are below. Note from the traceback that WriteToBigQuery builds its own gcp_bigquery.Client() inside the worker, so the credentials object created above is never passed to it:

WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
ERROR:apache_beam.runners.common:Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
    self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
    self.gcp_bq_client = client or gcp_bigquery.Client(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
    super(Client, self).__init__(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
    _ClientProjectMixin.__init__(self, project=project, credentials=credentials)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
    raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
    self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
    self.gcp_bq_client = client or gcp_bigquery.Client(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
    super(Client, self).__init__(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
    _ClientProjectMixin.__init__(self, project=project, credentials=credentials)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
    raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/BigQueryWrite/main.py", line 27, in <module>
    with beam.Pipeline(options=beam_options) as pipeline:
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 612, in __exit__
    self.result = self.run()
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 586, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 466, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 794, in _execute_bundle
    self._run_bundle(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1031, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1367, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 384, in push
    response = self.worker.do_instruction(request)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
    return getattr(self, request_type)(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1071, in process_bundle
    op.start()
  File "apache_beam/runners/worker/operations.py", line 929, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 934, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/common.py", line 1510, in apache_beam.runners.common.DoFnRunner.start
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
    self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
    self.gcp_bq_client = client or gcp_bigquery.Client(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
    super(Client, self).__init__(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
    _ClientProjectMixin.__init__(self, project=project, credentials=credentials)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
    raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
python authentication google-bigquery apache-beam
1 Answer

The workaround is to create a temporary file, write the contents of the service-account API key into it, and set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of that temporary file.

import os
import tempfile

# Keep a reference to the NamedTemporaryFile so it is not deleted while
# the pipeline runs; write the service-account key JSON into it.
tf = tempfile.NamedTemporaryFile()
with open(tf.name, 'w') as f:
  f.write("{" + os.environ['google_sa_key'] + "}")
# Application Default Credentials will pick up the key file from this variable.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = tf.name
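
If Beam still cannot determine the project after this, the project ID can also be set explicitly on the pipeline options; a minimal sketch, assuming a credentials object like the one built in the question:

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

beam_options = PipelineOptions()
# GoogleCloudOptions carries GCP-specific settings such as the project ID.
beam_options.view_as(GoogleCloudOptions).project = credentials.project_id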