I need to write data from Pub/Sub into BigQuery with apache-beam. The code looks like this:
import argparse
import base64
import logging
import json
from datetime import datetime
from ast import literal_eval

from google.cloud import bigquery

import apache_beam as beam


class DataIngestion(beam.DoFn):

    @classmethod
    def parse_method(cls, string_input):
        """
        :param string_input:
        :return:
        """
        try:
            pubsub_message = literal_eval(string_input.data.decode('utf8'))
            process_data = pubsub_message['data']
            print('- ' * 20)
            regex = '\n'
            for i in process_data.split(regex)[:-1]:
                d = eval(i)
                d['dt'] = "{}".format(datetime.utcnow().strftime('%Y-%m-%d'))
                return json.loads(d)
        except Exception as e:
            logger.exception(e)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_subscription', required=False,
                        help='Input PubSub subscription of the form "projects/<project>/subscriptions/<subscription_name>".',
                        default='projects/dataprocess/subscriptions/client_sub')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='dataprocess:fm.client_event')
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:
        lines = p | beam.io.ReadFromPubSub(subscription=known_args.input_subscription, with_attributes=True)
        table_info = 'dataprocess:fm.client_event'
        table_schema = 'app_version:STRING, build:INTEGER, channel:STRING, client_ip:STRING, client_ip:STRING...'
        transformed = {
            lines
            | 'String to BigQuery Row' >> beam.Map(lambda s: DataIngestion.parse_method(s))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table_info,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        }
        transformed.run().wait_until_finish()


if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    run()
The job is supposed to split the payload, parse it into individual rows, and write those rows to BigQuery with beam.io.WriteToBigQuery. It crashes with the following stack trace:
Traceback (most recent call last):
File "./bigquery_io_write.py", line 68, in <module>
run()
File "./bigquery_io_write.py", line 57, in run
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
TypeError: unhashable type: 'dict'
The strange thing is that I am not passing any dict to WriteToBigQuery. I have tried changing the schema of the Pub/Sub data, and even removing create_disposition and write_disposition, so some argument seems to be going wrong somewhere along the way. I have also confirmed that the problem is unrelated to the data format produced by the parse_method function, because the job fails before that step is ever executed.
Your parse_method returns the result of calling json.loads(d), and according to Python's documentation, json.loads returns a dict.
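For reference, a dict is exactly the unhashable type your traceback complains about. A standalone snippet (not part of your pipeline) illustrating both facts:

import json

row = json.loads('{"build": 42}')  # json.loads parses a JSON string into a dict
print(type(row))                   # <class 'dict'>
hash(row)                          # TypeError: unhashable type: 'dict'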
Also, the return statement sits inside the for loop, so you only ever parse the first line of each message. Did you mean to use a generator?
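A minimal sketch of the generator version, using the same imports as your file and assuming each line of the payload is itself a Python-literal dict, as your eval(i) call suggests (the names mirror your code; nothing else is assumed). Note that it yields the parsed dict directly instead of passing it through json.loads, which expects a string, not a dict:

class DataIngestion(beam.DoFn):

    @classmethod
    def parse_method(cls, string_input):
        """Yield one BigQuery row dict per line of the Pub/Sub payload."""
        pubsub_message = literal_eval(string_input.data.decode('utf8'))
        for line in pubsub_message['data'].split('\n')[:-1]:
            row = literal_eval(line)  # parses the literal without executing arbitrary code, unlike eval
            row['dt'] = datetime.utcnow().strftime('%Y-%m-%d')
            yield row  # yield every parsed line instead of returning after the first

On the pipeline side you would then swap beam.Map for beam.FlatMap, which flattens the yielded rows into the output PCollection:

lines | 'String to BigQuery Row' >> beam.FlatMap(DataIngestion.parse_method)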