When trying to write data from Pub/Sub to BigQuery, I'm hitting a problem with a Python script for GCP Dataflow running in a GitHub Actions workflow. It fails with an error related to the BigQuery write.
Python script:
import argparse
import json
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# Input/output configuration (built from environment variables)
INPUT_SUBSCRIPTION = f"projects/{os.getenv('PROJECT_ID')}/subscriptions/{os.getenv('SUBSCRIPTION_NAME')}"
#BIGQUERY_TABLE = f"{os.getenv('PROJECT_ID')}:{os.getenv('DATASET_NAME')}.{os.getenv('TABLE_NAME')}"
BIGQUERY_TABLE = "gcp-live-data-xxx:tx.tx"
# Load schema from file
# with open('terraform/schemas/tx_tx_schema.json', 'r') as file:
# SCHEMA = json.load(file)
# BIGQUERY_SCHEMA = ",".join([f"{item['name']}:{item['type']}" for item in SCHEMA])
BIGQUERY_SCHEMA = "TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC"
# Debugging
logging.info(f"Using BigQuery Table: {BIGQUERY_TABLE}")
logging.info(f"Using BigQuery Schema: {BIGQUERY_SCHEMA}")
print(f"Using BigQuery Table: {BIGQUERY_TABLE}")
print(f"Using BigQuery Schema: {BIGQUERY_SCHEMA}")
class CustomParsing(beam.DoFn):
    """Custom ParallelDo class to apply a custom transformation"""

    def to_runner_api_parameter(self, unused_context):
        # Not very relevant, returns a URN (uniform resource name) and the payload
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """
        parsed = json.loads(element.decode("utf-8"))
        parsed["timestamp"] = timestamp.to_rfc3339()
        yield parsed
def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()
    # Debugging
    logging.info(f"Output Table: {known_args.output_table}")
    logging.info(f"Output Schema: {known_args.output_schema}")
    print(f"Output Table: {known_args.output_table}")
    print(f"Output Schema: {known_args.output_schema}")
    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True
    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None
            )
            | "CustomParse" >> beam.ParDo(CustomParsing())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table=known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    run()
GitHub Actions job:
dataflow-job:
  name: 'Dataflow-job'
  runs-on: [self-hosted, linux, x64, gcp, terraform, iac]
  # Use the Bash shell regardless of whether the GitHub Actions runner is ubuntu-latest, macos-latest, or windows-latest
  defaults:
    run:
      shell: bash
      working-directory: .
  env:
    PROJECT_ID: "${{ vars.PROJECT_ID }}"
    SUBSCRIPTION_NAME: "${{ vars.SUBSCRIPTION_NAME }}"
    DATASET_NAME: "${{ vars.DATASET_NAME }}"
    TABLE_NAME: "${{ vars.TABLE_NAME }}"
    REGION: "${{ vars.REGION }}"
  steps:
    # Checkout the repository to the GitHub Actions runner
    - name: Checkout
      uses: actions/checkout@v4
    # Set up Python environment
    - name: "Set up Python"
      uses: actions/setup-python@v4
      with:
        python-version: "3.11.4"
    # Install dependencies
    - name: "Install dependencies"
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    # Authenticate with Google Cloud
    - name: "Authenticate with Google Cloud"
      env:
        GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
      run: |
        echo "${GOOGLE_APPLICATION_CREDENTIALS}" > ${HOME}/gcloud.json
        gcloud auth activate-service-account --key-file=${HOME}/gcloud.json
        gcloud config set project $PROJECT_ID
    # Run Python script to trigger Dataflow job
    - name: "Run Dataflow Job"
      env:
        GOOGLE_APPLICATION_CREDENTIALS: ${HOME}/gcloud.json
      run: |
        python src/python/dataflow-tx-pipeline.py --runner DataflowRunner --project gcp-live-data-xxx --region us-central1 --temp_location gs://ff-tx-dataflow/temp --staging_location gs://ff-tx-dataflow/staging
Error:
INFO:root:Using BigQuery Table: gcp-live-data-xxx:tx.tx
INFO:root:Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
INFO:root:Output Table: gcp-live-data-xxx:tx.tx
INFO:root:Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
Using BigQuery Table: gcp-live-data-xxx:tx.tx
Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
Output Table: gcp-live-data-xxx:tx.tx
Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
Traceback (most recent call last):
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 92, in <module>
    run()
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 83, in run
    | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
      ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery.py", line 2102, in __init__
    self.table_reference = bigquery_tools.parse_table_reference(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 263, in parse_table_reference
    if isinstance(table, TableReference):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
Error: Process completed with exit code 1.
So far I have checked known_args.output_table. What am I missing?
You are most likely using an outdated version of the Apache Beam library. Try upgrading to a recent release: pin a newer version in your requirements.txt, then let the GitHub Actions workflow re-run pip install -r requirements.txt. Beam 2.46.0 or later should most likely resolve this.
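For illustration, the relevant requirements.txt entry could look like the sketch below (the exact pin is an assumption on my part; any recent release should do). Note the [gcp] extra: it pulls in the Google Cloud client dependencies that Beam's BigQuery I/O needs, and this particular isinstance() TypeError usually means those imports failed and TableReference ended up as None:

# requirements.txt: pin a recent Apache Beam release with the GCP extras
apache-beam[gcp]>=2.46.0

Once the "Install dependencies" step has re-run, you can confirm which version the runner actually resolved with:

python -c "import apache_beam; print(apache_beam.__version__)"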