Error: unhashable type 'dict' when writing to BigQuery with Cloud Dataflow

Problem description

I need to write data from Pub/Sub to BigQuery via apache-beam. The code looks like this:

import argparse
import base64
import logging
import json
from datetime import datetime
from ast import literal_eval
from google.cloud import bigquery

import apache_beam as beam


class DataIngestion(beam.DoFn):

    @classmethod
    def parse_method(cls, string_input):
        """
        :param string_input:
        :return:
        """
        try:
            pubsub_message = literal_eval(string_input.data.decode('utf8'))
            process_data = pubsub_message['data']
            print('- ' * 20)
            regex = '\n'
            for i in process_data.split(regex)[:-1]:
                d = eval(i)
                d['dt'] = "{}".format(datetime.utcnow().strftime('%Y-%m-%d'))
                return json.loads(d)
        except Exception as e:
            logger.exception(e)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_subscription', required=False,
                        help='Input PubSub subscription of the form "projects/<project>/subscriptions/<subscription_name>".',
                        default='projects/dataprocess/subscriptions/client_sub')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='dataprocess:fm.client_event')

    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:
        lines = p | beam.io.ReadFromPubSub(subscription=known_args.input_subscription, with_attributes=True)
        table_info = 'dataprocess:fm.client_event'
        table_schema = 'app_version:STRING, build:INTEGER, channel:STRING, client_ip:STRING, client_ip:STRING...'
        transformed = {
            lines
            | 'String to BigQuery Row' >> beam.Map(lambda s: DataIngestion.parse_method(s))
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                                     table_info,
                                     schema=table_schema,
                                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        }

    transformed.run().wait_until_finish()


if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    run()

This job attempts to split the data, parse it into individual rows, and write them to BigQuery using beam.io.WriteToBigQuery, which results in the following stack trace:

Traceback (most recent call last):
  File "./bigquery_io_write.py", line 68, in <module>
    run()
  File "./bigquery_io_write.py", line 57, in run
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
TypeError: unhashable type: 'dict'

Strangely, I am not passing any dict to WriteToBigQuery, and I have tried modifying the schema of the Pub/Sub data and even removing create_disposition and write_disposition. It seems some parameter is being passed incorrectly somewhere. I have confirmed it has nothing to do with the data format produced by the parse_method function, since the job fails before that step even runs.

python google-bigquery google-cloud-dataflow apache-beam
1 Answer

parse_method returns the result of calling json.loads(d). According to Python's documentation, json.loads returns a dictionary.
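
As a quick illustration (plain Python, independent of the pipeline): json.loads turns a JSON string into a dict, but calling it on something that is already a dict raises a TypeError, which is what would happen with the d produced by eval(i) in your code:

```python
import json

# Parsing a JSON string yields a dict.
row = json.loads('{"app_version": "1.0", "build": 42}')
print(type(row))  # <class 'dict'>

# Passing a dict that has already been parsed fails.
d = {"app_version": "1.0", "build": 42}
json.loads(d)  # TypeError: the JSON object must be str, bytes or bytearray, not dict
```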

Also, the return statement is inside the for loop, so you only parse the first line of each message. Did you mean to use a generator?
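
If the intent is to emit one row per line of the message, a generator combined with beam.FlatMap is the usual pattern. A minimal sketch, assuming each line of the payload is valid JSON (the function name and field handling here are illustrative, not a drop-in fix for your job):

```python
import json
from ast import literal_eval
from datetime import datetime


def parse_lines(message):
    """Yield one dict per line of the Pub/Sub message payload (hypothetical sketch)."""
    payload = literal_eval(message.data.decode('utf8'))['data']
    for line in payload.split('\n')[:-1]:
        row = json.loads(line)  # assumes each line is a JSON object string
        row['dt'] = datetime.utcnow().strftime('%Y-%m-%d')
        yield row  # already a dict; no further json.loads needed

# In the pipeline, FlatMap flattens the yielded dicts into individual elements:
# rows = lines | 'Parse' >> beam.FlatMap(parse_lines)
```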
