在apache beam管道中,我从云存储中获取输入并尝试在biqguery表中编写它。但是在执行管道期间遇到此错误。 “AttributeError:'module'对象没有属性'storage'”
def run(argv=None):
with open('gl_ledgers.json') as json_file:
schema = json.load(json_file)
schema = json.dumps(schema)
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://bucket_name/poc/table_name/2019-04-12/2019-04-12 13:47:03.219000_file_name.csv',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
required=False,
default="path to bigquery table",
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
(p
| 'read' >> ReadFromText(known_args.input)
# | 'Format to json' >> (beam.ParDo(self.format_output_json))
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(known_args.output, schema=schema)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
run()
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 773, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 269, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 410, in load_session
module = unpickler.load()
File "/usr/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
value = func(*args)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 828, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
AttributeError: 'module' object has no attribute 'storage'```
这可能与pipeline_options.view_as(SetupOptions).save_main_session = True
有关。你需要那条线吗?
尝试删除它,看看它是否解决了问题。您的某个进口很可能无法进行腌制。没有导入我无法帮助您进一步调试。您也可以尝试将导入移动到run函数中。