Google Cloud: getting "Runnable workflow has no steps specified." when running a custom template in Dataflow


I'm just exploring Dataflow and decided to create a pipeline that reads data from my S3 bucket, formats it (the files are gzipped), and then stores it in Bigtable.

Everything seems to work fine: I can create a template file, but when I use that template file to create a Dataflow job, the job fails with

Runnable workflow has no steps specified.

The code runs in a Jupyter notebook and I can see the data land in Bigtable, so everything works there, but not when it is submitted as a job in Dataflow (custom template). What am I doing wrong? Here is the pipeline code:

import gzip
from io import BytesIO
import json
import ast
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.io.aws.s3io import S3IO


PROJECT = "pro-name"
INSTANCE = "inst-name"
TABLE = "tblname"


# Custom options carrying the Bigtable project / instance / table names.
class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--bigtable_project', default='proname')
        parser.add_argument('--bigtable_instance', default='instname')
        parser.add_argument('--bigtable_table', default='tblname')


class ConvertByteArrayToTuple(beam.DoFn):
    """Decompress the gzipped S3 payload and yield one Bigtable DirectRow per record."""

    def __init__(self, options):
        self.instance_id = options.bigtable_instance
        self.table_id = options.bigtable_table


    def setup(self):
        pass

    def process(self, ddb_row):
        try:
            # print(type(ddb_row))
            # Decompress the gzipped bytes and parse the newline-separated
            # records into a list of dicts.
            gzipfile = BytesIO(ddb_row)
            gzipfile = gzip.GzipFile(fileobj=gzipfile)
            my_dict = ast.literal_eval(gzipfile.read().decode('utf-8').replace("\n{", ",{"))
            # print(my_dict[0]['Item']['region']['S'])
            
            from google.cloud.bigtable import row
            import datetime
            
            # Emit one DirectRow per record, keyed by the region value.
            for rows in range(len(my_dict)):
                direct_row = row.DirectRow(row_key=my_dict[rows]['Item']['region']['S'])
                direct_row.set_cell(
                    "col_family",
                    my_dict[rows]['Item']['value1']['S'],
                    my_dict[rows]['Item']['value2']['S'],
                    datetime.datetime.now())


                yield direct_row
            

        except Exception as e:
            raise e



# Build and run the pipeline: S3 (gz) -> decode -> Bigtable.
def run():
    options = CustomPipelineOptions(
            save_main_session=True,
            bigtable_project=PROJECT,
            bigtable_table=TABLE,
            bigtable_instance=INSTANCE,
    )



    with beam.Pipeline(options=options) as p:
        S3IO(options=pipeline_options \
            .S3Options([
                "--s3_region_name=REGION-xxx",
                "--s3_access_key_id=ACCESS_KEY-xxx",
                "--s3_secret_access_key=ACCESS_KEY-xxx"
            ])).open("s3://filepath/", 'r') \
        | beam.ParDo(ConvertByteArrayToTuple(options)) \
        | WriteToBigTable(project_id=options.bigtable_project,
                           instance_id=options.bigtable_instance,
                           table_id=options.bigtable_table)
        
if __name__ == "__main__":
    run()
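
For reference, the template is staged and the job launched roughly like this; the file, bucket, region and job names below are placeholders, not my exact values:

# Stage the classic template to GCS by running the pipeline module with
# --template_location (placeholder values):
python s3_to_bigtable.py \
    --runner=DataflowRunner \
    --project=pro-name \
    --region=REGION-xxx \
    --staging_location=gs://BUCKET/staging \
    --temp_location=gs://BUCKET/temp \
    --template_location=gs://BUCKET/templates/s3_to_bigtable

# Launch a job from the staged template:
gcloud dataflow jobs run s3-to-bigtable-job \
    --gcs-location=gs://BUCKET/templates/s3_to_bigtable \
    --region=REGION-xxx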


Edit: the code is still rough and repetitive in places, since I'm still learning and experimenting with this.

google-cloud-platform google-cloud-dataflow google-cloud-bigtable