RuntimeValueProvider issue when uploading a Dataflow template

Problem description

Whenever I try to stage my pipeline as a template using add_value_provider_argument, following the instructions here: https://cloud.google.com/dataflow/docs/templates/creating-templates, the pipeline tries to execute immediately and fails with an error instead of being uploaded to the GCS bucket.

Here is the code I am deploying:

#deploy

python -m main \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$DATAFLOW_BUCKET/staging \
--temp_location gs://$DATAFLOW_BUCKET/temp \
--output gs://$DATAFLOW_BUCKET/output \
--template_location gs://$DATAFLOW_BUCKET/templates/$TEMPLATE_NAME

#pipeline.py

import datetime

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.bigquery import BigQueryDisposition, BigQueryWriteFn
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions,
                                                  SetupOptions,
                                                  StandardOptions)

# JsonCoder, the ParDo helpers (parse_data, fragment, ...) and the
# PROJECT_ID / DATASET_ID / TABLE_ID / SCHEMA / bucket constants are
# defined elsewhere in the project.

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):      
    parser.add_value_provider_argument(  # was parser.add_argument
        '--date',
        required=False,
        default='2018-09-28',
        help='Date to process, e.g. 2018-09-28')


RUNNER_TYPE = 'DataflowRunner'

version = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(MyOptions)

options = PipelineOptions()

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = 'test-v{}'.format(version)
google_cloud_options.staging_location = 'gs://{}/staging'.format(STAGING_BUCKET)
google_cloud_options.temp_location = 'gs://{}/temp'.format(STAGING_BUCKET)
pipeline_options.view_as(StandardOptions).runner = RUNNER_TYPE
pipeline_options.view_as(StandardOptions).streaming = False

#installing packages used in process
setup_options = pipeline_options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'
setup_options.save_main_session = False       

def run(argv=None):

    with beam.Pipeline(options=pipeline_options) as p: 

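        # custom_options.date is still a RuntimeValueProvider at construction
        # time, so the str.format() below only embeds its repr into the file
        # pattern -- this is where the error in the question comes from.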
        read_file = 'gs://{}/{}-*'.format(DATA_BUCKET, custom_options.date)


        data = (p | 'Read' >> ReadFromText(read_file,coder=JsonCoder())
                  | 'ParseData' >> beam.ParDo(parse_data)
                  | 'FragmentData' >> beam.ParDo(fragment)
                  | 'CleanHeader' >> beam.ParDo(clean_header)
                  | 'RemoveMalformedRows' >> beam.ParDo(remove_malformed_rows)
                  | 'ZipData' >> beam.ParDo(zip_data)
                  | 'FilterFields' >> beam.ParDo(filter_fields)   
        )

        bigquery_write_fn = BigQueryWriteFn(table_id=TABLE_ID,dataset_id=DATASET_ID,
                                        project_id=PROJECT_ID,batch_size=100,schema=SCHEMA,
                                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                        write_disposition=BigQueryDisposition.WRITE_APPEND, #WRITE_TRUNCATE, WRITE_APPEND
                                        client=None)        

        data | 'WriteToBigQuery' >> beam.ParDo(bigquery_write_fn)           

if __name__ == '__main__':
    run()

Error:

IOError: No files found based on the file pattern gs://<BUCKET>/RuntimeValueProvider(option: date, type: str, default_value: '2018-09-28')-*

The strange thing is that when I use parser.add_argument instead of parser.add_value_provider_argument, the template does upload to GCS, but then I cannot override the default parameter.

Why does the pipeline execute instead of being uploaded when the parser argument is changed from add_argument to add_value_provider_argument?

google-cloud-platform google-cloud-dataflow
1 Answer

I recently ran into the same problem. The issue is that ValueProvider objects are not available during pipeline construction. This means that, in Python, you cannot use them to specify a file name or to build a dynamic file name from a RuntimeValueProvider at construction time.
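
A minimal sketch of what goes wrong at construction time (the option name and bucket below are only examples, not the asker's exact project):

from apache_beam.options.pipeline_options import PipelineOptions

class SketchOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--date', default='2018-09-28')

opts = PipelineOptions().view_as(SketchOptions)

# At construction time opts.date is a RuntimeValueProvider, not a string,
# so str.format() just embeds its repr -- which is exactly the "file pattern"
# shown in the error message.
path = 'gs://my-bucket/{}-*'.format(opts.date)

# Calling opts.date.get() here would raise, because the value only exists
# once the job is running. The ValueProvider has to be handed as-is to a
# transform that accepts one (such as ReadFromText), which resolves it at
# run time.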

What you need to do is add another parameter that replaces the read_file variable and is passed directly to the ReadFromText method, as in the code below.

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):      
    parser.add_value_provider_argument(  # was parser.add_argument
        '--input',
        required=False,
        default='',
        help='Full path to input file')


RUNNER_TYPE = 'DataflowRunner'

version = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(MyOptions)

options = PipelineOptions()

google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = 'test-v{}'.format(version)
google_cloud_options.staging_location = 'gs://{}/staging'.format(STAGING_BUCKET)
google_cloud_options.temp_location = 'gs://{}/temp'.format(STAGING_BUCKET)
pipeline_options.view_as(StandardOptions).runner = RUNNER_TYPE
pipeline_options.view_as(StandardOptions).streaming = False

#installing packages used in process
setup_options = pipeline_options.view_as(SetupOptions)
setup_options.setup_file = './setup.py'
setup_options.save_main_session = False       

def run():

    with beam.Pipeline(options=pipeline_options) as p: 

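        # ReadFromText accepts a ValueProvider for the file pattern, so
        # custom_options.input is resolved when the templated job runs,
        # not when the template is created.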
        data = (p | 'Read' >> ReadFromText(custom_options.input,coder=JsonCoder())
                  | 'ParseData' >> beam.ParDo(parse_data)
                  | 'FragmentData' >> beam.ParDo(fragment)
                  | 'CleanHeader' >> beam.ParDo(clean_header)
                  | 'RemoveMalformedRows' >> beam.ParDo(remove_malformed_rows)
                  | 'ZipData' >> beam.ParDo(zip_data)
                  | 'FilterFields' >> beam.ParDo(filter_fields)   
        )

        bigquery_write_fn = BigQueryWriteFn(table_id=TABLE_ID,dataset_id=DATASET_ID,
                                        project_id=PROJECT_ID,batch_size=100,schema=SCHEMA,
                                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                        write_disposition=BigQueryDisposition.WRITE_APPEND, #WRITE_TRUNCATE, WRITE_APPEND
                                        client=None)        

        data | 'WriteToBigQuery' >> beam.ParDo(bigquery_write_fn)           

if __name__ == '__main__':
    run()
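
Once the template has been staged, the runtime value is supplied when the job is launched. For example, with gcloud (a sketch: the job name and data bucket are placeholders, and the variables are the same ones used in the deploy script):

gcloud dataflow jobs run test-template-run \
    --gcs-location gs://$DATAFLOW_BUCKET/templates/$TEMPLATE_NAME \
    --parameters input="gs://<your-data-bucket>/2018-09-28-*"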