给定如下数据集:
{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
我尝试筛选出 `type` 为 `ba` 的 JSON 记录,并使用 Python SDK 将它们插入 BigQuery:
# BigQuery table schema for the filtered "ba" rows, written as the
# comma-separated "name:TYPE" string that is passed to the sink as `schema`.
ba_schema = ','.join([
    'slot:STRING',
    'result:INTEGER',
    'play_type:STRING',
    'level:INTEGER',
])
class ParseJsonDoFn(beam.DoFn):
    """Parse JSON text lines and emit the records whose ``type`` is ``'ba'``.

    Matching records are reduced to the columns declared in ``ba_schema`` and
    emitted on the tagged output named by ``B_TYPE``. Every other record is
    dropped.
    """

    # Tag of the output that carries the filtered "ba" rows.
    B_TYPE = 'tag_B'

    def process(self, element):
        """Decode one line of JSON text and yield it if it is a "ba" record.

        Args:
            element: A single line of text containing one JSON object.

        Yields:
            A ``pvalue.TaggedOutput`` tagged ``B_TYPE`` that wraps a dict with
            the keys ``slot``, ``result``, ``play_type`` and ``level``.

        Raises:
            ValueError: If the line is not valid JSON (from ``json.loads``).
            KeyError: If the record lacks ``type`` or one of the copied fields.
        """
        # `str` has no `trip` method; `strip` removes surrounding whitespace
        # before the line is decoded.
        text_line = element.strip()
        data = json.loads(text_line)
        if data['type'] == 'ba':
            ba = {
                'slot': data['slot'],
                'result': data['result'],
                # The input field is `p_type`, but ba_schema names the column
                # `play_type`; the row key has to match the schema.
                'play_type': data['p_type'],
                'level': data['level'],
            }
            yield pvalue.TaggedOutput(self.B_TYPE, ba)
def run(argv=None):
    """Build and run the pipeline that loads "ba" records into BigQuery.

    Reads text lines from ``--input``, parses them with ``ParseJsonDoFn`` and
    writes the rows from its ``tag_B`` output to the ``temp.ba`` table.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read them from ``sys.argv[1:]``. Arguments that the
            parser does not recognise are forwarded to ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='data/path/data',
        help='Input file to process.')
    # `argv` used to be referenced here without being defined anywhere.
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--runner=DirectRunner',
        '--project=project-id',
        '--job_name=data-job',
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)
        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                ParseJsonDoFn.B_TYPE)))

        ba_line = multiple_lines.tag_B
        (ba_line
         | "output_ba" >> beam.io.Write(
             beam.io.BigQuerySink(
                 table='ba',
                 dataset='temp',
                 project='project-id',
                 schema=ba_schema,
                 # write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
             )
         ))

        # NOTE(review): leaving the `with beam.Pipeline(...)` block already
        # runs the pipeline and waits for it to finish, so this explicit call
        # executes the same job a second time. It is kept because it is the
        # behaviour this example demonstrates; delete this line to run the
        # pipeline only once.
        p.run().wait_until_finish()
输出是
/usr/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:342: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will not be supported
pipeline.replace_all(_get_transform_overrides(pipeline.options))
INFO:root:Running pipeline with DirectRunner.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Writing 2 rows to project-id:temp.ba table.
INFO:root:Running pipeline with DirectRunner.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Writing 2 rows to project-id:temp.ba table.
可以看到日志中出现了两行 `INFO:root:Writing 2 rows to project-id:temp.ba table.`,随后在 BigQuery 中查询该表:
select * from `temp.ba`;
该表中共有 4 条记录,即每条数据都被重复写入了一次。
我想知道为什么管道会把同一个作业运行两次?
`Pipeline` 的 `with` 语句会在代码块结束时自动运行管道。具体来说:
with beam.Pipeline(...) as p:
[...code...]
相当于:
p = beam.Pipeline(...)
[...code...]
p.run().wait_until_finish()