I can read data from a BigQuery table using an external Python engine, but I cannot authenticate when I run Apache Beam + Python locally.
The documentation only offers the CLI option: "When you run locally, your Apache Beam pipeline runs as the Google Cloud account that you configured with the Google Cloud CLI executable. Hence, locally run Apache Beam SDK operations and your Google Cloud account have access to the same files and resources."
PipelineOptions has no authentication parameter. I would like to authenticate with an environment variable, the same way I do with bigquery.Client.
I added method="STREAMING_INSERTS" to avoid needing a Cloud Storage bucket (the default FILE_LOADS method stages files through one).
The code I currently have:
import os
import json
import pandas
import pandas_gbq
from google.cloud import bigquery
from google.oauth2 import service_account
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

assert 'google_key' in os.environ, "Please define a secret called google_key; its value must be the contents of the access key in JSON format"
credentials = service_account.Credentials.from_service_account_info(json.loads("{" + os.environ['google_key'] + "}"))
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# reading data - client.query() - works fine
query_job = client.query("SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 1")
df = query_job.to_dataframe()

# writing data using load_table_from_dataframe also works fine
# https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe
quotes_list = [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
               {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
               ]
df_quotes = pandas.DataFrame(quotes_list)
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField(col, bigquery.enums.SqlTypeNames.STRING) for col in df_quotes.columns],
    write_disposition="WRITE_TRUNCATE",
)
table_id = "dummy_dataset.quotes_table"
job = client.load_table_from_dataframe(df_quotes, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns")

# writing data using pandas_gbq works fine
# https://pandas-gbq.readthedocs.io/en/latest/writing.html#writing-to-an-existing-table
pandas_gbq.to_gbq(df, "dummy_dataset.dummy_table2", credentials.project_id, credentials=credentials, if_exists='replace')

# writing using WriteToBigQuery does not work
beam_options = PipelineOptions()
with beam.Pipeline(options=beam_options) as pipeline:
    quotes = pipeline | beam.Create(quotes_list)
    quotes | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
        table=f'{credentials.project_id}:dummy_dataset.dummy_table',
        ignore_unknown_columns=True,
        schema='source:STRING, quote:STRING',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method="STREAMING_INSERTS")
The errors and warnings I get when using beam_options = PipelineOptions() are the following:
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
ERROR:apache_beam.runners.common:Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
self.gcp_bq_client = client or gcp_bigquery.Client(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
super(Client, self).__init__(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
_ClientProjectMixin.__init__(self, project=project, credentials=credentials)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
self.gcp_bq_client = client or gcp_bigquery.Client(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
super(Client, self).__init__(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
_ClientProjectMixin.__init__(self, project=project, credentials=credentials)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/runner/BigQueryWrite/main.py", line 27, in <module>
with beam.Pipeline(options=beam_options) as pipeline:
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 612, in __exit__
self.result = self.run()
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 586, in run
return self.runner.run_pipeline(self, self._options)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 466, in run_stages
bundle_results = self._execute_bundle(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 794, in _execute_bundle
self._run_bundle(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1031, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1367, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 384, in push
response = self.worker.do_instruction(request)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
return getattr(self, request_type)(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1071, in process_bundle
op.start()
File "apache_beam/runners/worker/operations.py", line 929, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 934, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/common.py", line 1510, in apache_beam.runners.common.DoFnRunner.start
File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
self.gcp_bq_client = client or gcp_bigquery.Client(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
super(Client, self).__init__(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
_ClientProjectMixin.__init__(self, project=project, credentials=credentials)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
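The traceback points at the cause: on the DirectRunner, Beam's BigQueryWrapper builds its own gcp_bigquery.Client() without passing credentials or a project, so both must come from Application Default Credentials, which this environment cannot provide (hence the metadata-server warnings). A hedged partial fix, reusing the credentials object from the script above: keyword arguments to PipelineOptions map to pipeline option flags, so passing the project explicitly removes the "Project was not passed" error, though the credentials themselves are still resolved through ADC.

# Partial fix (sketch): pass the project explicitly; equivalent to the
# --project flag. Reuses `credentials` from the script above.
beam_options = PipelineOptions(project=credentials.project_id)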
The workaround is to create a temporary file, write the contents of the service-account API key into it, and set a temporary environment variable GOOGLE_APPLICATION_CREDENTIALS containing the path of that temporary file.
import os
import tempfile

# NamedTemporaryFile removes the file when the object is closed or
# garbage-collected, so `tf` must stay referenced for the lifetime of
# the pipeline.
tf = tempfile.NamedTemporaryFile()
with open(tf.name, 'w') as f:
    f.write("{" + os.environ['google_sa_key'] + "}")
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = tf.name
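Putting the pieces together, here is a minimal end-to-end sketch of the workaround, assuming the same google_sa_key secret as above (the key JSON without its outer braces). With GOOGLE_APPLICATION_CREDENTIALS set before the pipeline runs, the Client() created inside BigQueryWrapper picks the service account up through Application Default Credentials, and the explicit project removes the remaining lookup:

# Minimal end-to-end sketch of the workaround (assumption: the secret
# `google_sa_key` holds the service-account key JSON minus the braces).
import json
import os
import tempfile

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

key_json = "{" + os.environ['google_sa_key'] + "}"
project_id = json.loads(key_json)['project_id']  # service-account keys include the project

# delete=False keeps the key file on disk even after `tf` is closed;
# google-auth reads the path lazily, once the pipeline starts running.
tf = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
tf.write(key_json)
tf.flush()
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = tf.name

with beam.Pipeline(options=PipelineOptions(project=project_id)) as pipeline:
    (pipeline
     | beam.Create([{'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."}])
     | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
         table=f'{project_id}:dummy_dataset.dummy_table',
         schema='source:STRING, quote:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         method="STREAMING_INSERTS"))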