I built a Beam job that reads data from Pub/Sub, where each message looks like:

{"user_id":"u1", "event_name":"logout", "region":"US"}

Here is the code:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms import window
from google.cloud import bigquery
import os
import logging,json
from typing import Tuple,Iterable,Dict
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

def run(argv=None,save_main_session=True):
    parser=argparse.ArgumentParser()
    parser.add_argument('--outputTable',
                        dest='outputTable',
                        required=True)
    parser.add_argument('--stagingLocation',
                        dest='stagingLocation',
                        required=True)
    parser.add_argument('--tempLocation',
                        dest='tempLocation',
                        required=True)
    parser.add_argument('--runner',
                        dest='runner',
                        required=True)
    group=parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--inputTopic',
                       dest='inputTopic')
    group.add_argument('--inputSub',
                       dest='inputSub')
    known_args,pipeline_args=parser.parse_known_args(argv)
    pipeline_options=PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session=save_main_session
    pipeline_options.view_as(StandardOptions).streaming=True
    p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
    if known_args.inputSub:
        message=(
            p|beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
    else:
        message=(
            p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))

    def extract_element_Fn(element)->Tuple[str,Dict]:
        try:
            print("extractElement Start")
            data = element.data.decode('utf-8')
            if json.loads(data).get('event_name') == 'logout':
                user_id = json.loads(data).get('user_id')
                # raise Exception("extract fail")
                """https://stackoverflow.com/questions/53912918/difference-between-beam-pardo-and-beam-map-in-the-output-type
                Best practice: output list in ParDo(),single object in Map
                """
                return (user_id, data)
        except Exception as err:
            step_name = 'extractElement'
            failure=(step_name,user_id)
            return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

    mainData,extract_failure=(
        message
        |'filter logout event'>>beam.Map(extract_element_Fn).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )
    windowData=(
        mainData
        |'window' >> beam.WindowInto(window.FixedWindows(5,0))
        |'group by key' >> beam.GroupByKey()
    )

    def enrich_country_Fn(element)->Tuple[str,str]:
        try:
            print("Enrich Country Start")
            user_id=element[0]
            # raise Exception("enrich country fail")
            query = 'select country from `agolis-allen-first.dataflow_bole.country_dim` where user_id="{}"' \
                .format(user_id)
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()
            status=None
            country=None
            len_result = 0
            for row in result:
                country=row.country
                len_result+=1
            if len_result == 0:
                status=OUTPUT_TAG_NO_REC
                return (user_id,None)
            else:
                status = OUTPUT_TAG_COMPLETE
                return (user_id,country)
        except Exception as err:
            step_name = 'enrich_country'
            failure = (step_name,user_id)
            return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

    enrichCountry,country_failure = (
        windowData
        |'enrich country via ParDo' >> beam.Map(enrich_country_Fn).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

    def enrich_history_Fn(element)->Tuple[str,Dict]:
        try:
            print("Enrich History Start")
            user_id=element[0]
            # raise Exception("enrich history fail")
            query = 'select event_date,event_name,device from `agolis-allen-first.dataflow_bole.event_history` where user_id="{}"' \
                .format(user_id)
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()
            status=None
            event_params=[]
            len_result = 0
            for row in result:
                single_event_params={}
                single_event_params['event_date']=row.event_date
                single_event_params['event_name'] = row.event_name
                single_event_params['device'] = row.device
                event_params.append(single_event_params)
                len_result+=1
            if len_result == 0:
                status=OUTPUT_TAG_NO_REC
                return (user_id,None)
            else:
                status = OUTPUT_TAG_COMPLETE
                return (user_id,event_params)
        except Exception as err:
            step_name = 'enrich_history'
            failure = (step_name,user_id)
            return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

    enrichHistory,history_failure = (
        windowData
        |'enrich history' >> beam.Map(enrich_history_Fn).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

    def merge_data(element):
        print("Merge Data Start")
        result_json={}
        result_json["user_id"]=element[0]
        result_json["country"]=element[1][0][0]
        result_json["events"]=element[1][1][0]
        return result_json

    processedData = (
        (enrichCountry,enrichHistory)
        |beam.CoGroupByKey()
        |'combine data' >> beam.Map(merge_data)
        |'write complete data to bq' >> beam.io.WriteToBigQuery(
            table='agolis-allen-first:dataflow_bole.result',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
    )

    def parse_failure(element):
        print("parse_failure")
        result_json={}
        result_json["step_name"]=element[0]
        result_json["user_id"]=element[1]
        return result_json

    failed_data=(
        (extract_failure,country_failure,history_failure)
        |"flattern" >> beam.Flatten()
        |"format failure data" >> beam.Map(parse_failure)
        |'write failure data to bq' >> beam.io.WriteToBigQuery(
            table='agolis-allen-first:dataflow_bole.result_err',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
    )

    p.run().wait_until_finish()

if __name__ == '__main__':
    path_to_credential = '/Users/wangez/Downloads/GCP_Credentials/agolis-allen-first-13f3be86c3d1.json'
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_credential
    logging.getLogger().setLevel(logging.INFO)
    OUTPUT_TAG_NO_REC = 'Norecord'
    OUTPUT_TAG_COMPLETE = 'complete'
    OUTPUT_TAG_FAILURE = 'failure'
    run()
This code runs successfully on my local machine; however, when I try to run it on Dataflow, I get the following error:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1051, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 208, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 265, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1495, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1506, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1055, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 209, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 717, in apache_beam.coders.coder_impl.MapCoderImpl.estimate_size
AttributeError: 'str' object has no attribute 'items' [while running 'filter logout event/Map(extract_element_Fn)-ptransform-76']
This is the command used to create the Dataflow pipeline:
python -m dataflow_bole_logout_event_complex_Map --outputTable agolis-allen-first:experiment.dataflow_insert --region us-central1 --stagingLocation gs://agolis-allen-first-dataflow/staging --tempLocation gs://agolis-allen-first-dataflow/temp --temp_location gs://agolis-allen-first-dataflow/temp --staging_location gs://agolis-allen-first-dataflow/staging --inputTopic projects/agolis-allen-first/topics/demo --runner DataflowRunner --project agolis-allen-first --network=first-vpc
Could you suggest how to handle this? Thanks in advance.
DirectRunner is quite lenient; Dataflow, by contrast, is really strict. In fact, I believe DirectRunner does not actually decode/encode elements between steps, whereas Dataflow does. In any case, inside extract_element_Fn you return the following when an exception occurs:

failure=(step_name,user_id)
return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

However, user_id is of type string, and you told Dataflow that your output type is Tuple[str,Dict]. So during decoding/encoding it tries to loop over the items of a dict; but since user_id is a string, it has no attribute items.
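To make that concrete, here is a minimal sketch of the two directions a fix can take (using a hypothetical extract_element_fn and the question's OUTPUT_TAG_FAILURE tag, not the asker's exact code): drop the return annotation so Beam does not derive a dict coder, or make the returned values actually match the declared type.

import json
from typing import Tuple, Dict

import apache_beam as beam

OUTPUT_TAG_FAILURE = 'failure'  # same tag as in the question

# Option 1 (sketch): no return annotation, so Beam falls back to a generic
# coder instead of trying to encode the second element as a dict.
def extract_element_fn(element):
    try:
        data = element.data.decode('utf-8')
        parsed = json.loads(data)
        if parsed.get('event_name') == 'logout':
            return (parsed.get('user_id'), data)
    except Exception:
        # the failure output stays a (step_name, user_id) tuple of strings
        return beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, ('extractElement', 'unknown'))

# Option 2 (sketch): keep an annotation, but make it describe what is really
# returned, e.g. return the parsed dict so Tuple[str, Dict] is accurate
# (or declare Tuple[str, str] if you want to keep passing the raw JSON string).
def extract_element_fn_typed(element) -> Tuple[str, Dict]:
    parsed = json.loads(element.data.decode('utf-8'))
    if parsed.get('event_name') == 'logout':
        return (parsed.get('user_id'), parsed)

Either way the point is the same: every value an element actually carries at runtime, including the tagged failure output, must be encodable by the coder Beam derives from the declared (or inferred) output type.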
Solution: either skip the type hints (because in the Beam context they are more than just type hints, see