GCP Dataflow Python script fails in a GitHub Actions workflow

Problem description

I have a problem with a Python script for GCP Dataflow that runs in a GitHub Actions workflow: when I try to write data from Pub/Sub to BigQuery, it fails with an error related to the BigQuery write.

Python script:

import argparse
import json
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

# Pub/Sub input subscription and BigQuery table built from environment variables
INPUT_SUBSCRIPTION = f"projects/{os.getenv('PROJECT_ID')}/subscriptions/{os.getenv('SUBSCRIPTION_NAME')}"
#BIGQUERY_TABLE = f"{os.getenv('PROJECT_ID')}:{os.getenv('DATASET_NAME')}.{os.getenv('TABLE_NAME')}"
BIGQUERY_TABLE = "gcp-live-data-xxx:tx.tx"

# Load schema from file
# with open('terraform/schemas/tx_tx_schema.json', 'r') as file:
#     SCHEMA = json.load(file)

# BIGQUERY_SCHEMA = ",".join([f"{item['name']}:{item['type']}" for item in SCHEMA])
BIGQUERY_SCHEMA = "TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC"

# Debugging
logging.info(f"Using BigQuery Table: {BIGQUERY_TABLE}")
logging.info(f"Using BigQuery Schema: {BIGQUERY_SCHEMA}")
print(f"Using BigQuery Table: {BIGQUERY_TABLE}")
print(f"Using BigQuery Schema: {BIGQUERY_SCHEMA}")

class CustomParsing(beam.DoFn):
    """ Custom ParallelDo class to apply a custom transformation """

    def to_runner_api_parameter(self, unused_context):
        # Not very relevant, returns a URN (uniform resource name) and the payload
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """
        parsed = json.loads(element.decode("utf-8"))
        parsed["timestamp"] = timestamp.to_rfc3339()
        yield parsed


def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Debugging
    logging.info(f"Output Table: {known_args.output_table}")
    logging.info(f"Output Schema: {known_args.output_schema}")
    print(f"Output Table: {known_args.output_table}")
    print(f"Output Schema: {known_args.output_schema}")

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None
            )
            | "CustomParse" >> beam.ParDo(CustomParsing())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table=known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    run()

GitHub Actions job:

dataflow-job:
    name: 'Dataflow-job'
    runs-on: [self-hosted, linux, x64, gcp, terraform, iac]

    # Use the Bash shell regardless of whether the GitHub Actions runner is ubuntu-latest, macos-latest, or windows-latest
    defaults:
      run:
        shell: bash
        working-directory: .

    env:
      PROJECT_ID: "${{ vars.PROJECT_ID }}"
      SUBSCRIPTION_NAME: "${{ vars.SUBSCRIPTION_NAME }}"
      DATASET_NAME: "${{ vars.DATASET_NAME }}"
      TABLE_NAME: "${{ vars.TABLE_NAME }}"
      REGION: "${{ vars.REGION }}"

    steps:
    # Checkout the repository to the GitHub Actions runner
    - name: Checkout
      uses: actions/checkout@v4

    # Set up Python environment
    - name: "Set up Python"
      uses: actions/setup-python@v4
      with:
        python-version: "3.11.4"

    # Install dependencies
    - name: "Install dependencies"
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    # Authenticate with Google Cloud
    - name: "Authenticate with Google Cloud"
      env:
        GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
      run: |
        echo "${GOOGLE_APPLICATION_CREDENTIALS}" > ${HOME}/gcloud.json
        gcloud auth activate-service-account --key-file=${HOME}/gcloud.json
        gcloud config set project $PROJECT_ID

    # Run Python script to trigger Dataflow job
    - name: "Run Dataflow Job"
      env:
        GOOGLE_APPLICATION_CREDENTIALS: ${HOME}/gcloud.json
      run: |
        python src/python/dataflow-tx-pipeline.py --runner DataflowRunner --project gcp-live-data-xxx --region us-central1 --temp_location gs://ff-tx-dataflow/temp --staging_location gs://ff-tx-dataflow/staging

Error:

INFO:root:Using BigQuery Table: gcp-live-data-xxx:tx.tx
INFO:root:Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
INFO:root:Output Table: gcp-live-data-xxx:tx.tx
INFO:root:Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
Using BigQuery Table: gcp-live-data-xxx:tx.tx
Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
Output Table: gcp-live-data-xxx:tx.tx
Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
Traceback (most recent call last):
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 92, in <module>
    run()
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 83, in run
    | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery.py", line 2102, in __init__
    self.table_reference = bigquery_tools.parse_table_reference(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 263, in parse_table_reference
    if isinstance(table, TableReference):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
Error: Process completed with exit code 1.

Tested so far (a minimal reproduction of the failing call is sketched after this list):

  1. Hardcoding the schema,
  2. Checking that the variables are supplied correctly,
  3. Replacing known_args.output_table with a dict()
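
A minimal sketch of that reproduction, not taken from the original post: it calls the same parse_table_reference function that raises the TypeError in the traceback above, with the hard-coded table string from the script, so it can be run in the runner's Python environment without touching Pub/Sub or Dataflow.

# Hypothetical sanity check: run with the same interpreter and requirements.txt as the workflow.
# In a healthy environment this prints the parsed project, dataset and table IDs;
# in the failing environment it raises the same TypeError as WriteToBigQuery does.
from apache_beam.io.gcp import bigquery_tools

ref = bigquery_tools.parse_table_reference("gcp-live-data-xxx:tx.tx")
print(ref.projectId, ref.datasetId, ref.tableId)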

What am I missing?

python google-bigquery github-actions google-cloud-dataflow apache-beam
1 Answer

You are most likely using an outdated version of the Apache Beam library. Try upgrading to a recent release: pin a newer version in your requirements.txt and re-run pip install -r requirements.txt in the GitHub Actions workflow, as sketched below. At least Beam 2.46.0 or newer should resolve this issue.
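
A minimal sketch of the dependency pin, assuming the pipeline's dependencies come from the requirements.txt that the workflow's "Install dependencies" step already installs (the file's current contents are not shown in the question). The [gcp] extra is what pulls in Beam's Google Cloud client dependencies, including the BigQuery support used by WriteToBigQuery:

# requirements.txt (hypothetical pin; adjust to your other dependencies)
apache-beam[gcp]>=2.46.0

On the next workflow run, pip install -r requirements.txt will pick up the new pin; the version actually installed on the runner can be confirmed with python -c "import apache_beam; print(apache_beam.__version__)".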
