Extracting column values from Pub/Sub output with Apache Beam in Python


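I am trying to pull data from a Pub/Sub subscription and, once it is extracted, apply some transformations to it. At the moment the data arrives as bytes. I have tried several ways of converting it to JSON using a custom schema, but it fails with this error: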

RuntimeError: json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1) [while running 'Map(<lambda at onlyRead.py:67>)']

readPubSub.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing

class MySchema(typing.NamedTuple):
    user_id:str
    event_ts:str    
    create_ts:str   
    event_id:str    
    ifa:str
    ifv:str
    country:str
    chip_balance:str    
    game:str    
    user_group:str
    user_condition:str  
    device_type:str 
    device_model:str    
    user_name:str   
    fb_connect:bool 
    is_active_event:bool    
    event_payload:str

TOPIC_PATH = "projects/nectar-259905/topics/events"

def parse_json_message(message):
    import ast
    """Parse the input json message and add 'score' & 'processing_time' keys."""
    #row = json.loads(message)
    row = ast.literal_eval(json.dumps(message,indent=4))
    return {
'user_id':row['user_id'],
'event_ts':row['event_ts'],
'create_ts':row['create_ts'],
'event_id':row['event_id'],
'ifa':row['ifa'],
'ifv':row['ifv'],
'country':row['country'],
'chip_balance':row['chip_balance'],
'game':row['game'],
'user_group':row['user_group'],
'user_condition':row['user_condition'],
'device_type':row['device_type'],
'device_model':row['device_model'],
'user_name':row['user_name'],
'fb_connect':row['fb_connect'],
'is_active_event':row['is_active_event'],
'event_payload':row['event_payload']
    }

def run(pubsub_topic):
    options = PipelineOptions(
        streaming=True
    )
    runner = 'DirectRunner'

    print("I reached before pipeline")

    with beam.Pipeline(runner, options=options) as pipeline:
        message=(
            pipeline
            | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
            #| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
            #| 'Parse JSON messages' >> beam.Map(parse_json_message)
            #| "Convert to list" >> beam.Map(lambda x: x.split(","))
            |beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema)
            | "Writing to console" >> beam.Map(print))
        
    print("I reached after pipeline")
    result = message.run()
    result.wait_until_finish()


run(TOPIC_PATH)

If I use the following directly instead:

message=(
    pipeline
    | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
    | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
    | "Writing to console" >> beam.Map(print))

I get this output:

    {
        'user_id': '102105290400258488',
        'event_ts': '2021-05-29 20:42:52.283 UTC',
        'event_id': 'Game_Request_Declined',
        'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
        'ifv': '3fc6eb8b4d0cf096c47e2252f41',
        'country': 'US',
        'chip_balance': '9140',
        'game': 'gru',
        'user_group': '[1, 36, 529702]',
        'user_condition': '[1, 36]',
        'device_type': 'phone',
        'device_model': 'TCL 5007Z',
        'user_name': 'Minnie',
        'fb_connect': True,
        'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
        'is_active_event': True
    }

{
    'user_id': '102105290400258488',
    'event_ts': '2021-05-29 20:54:38.297 UTC',
    'event_id': 'Decline_Game_Request',
    'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
    'ifv': '3fc6eb8b4d0cf096c47e2252f41',
    'country': 'US',
    'chip_balance': '9905',
    'game': 'gru',
    'user_group': '[1, 36, 529702]',
    'user_condition': '[1, 36]',
    'device_type': 'phone',
    'device_model': 'TCL 5007Z',
    'user_name': 'Minnie',
    'fb_connect': True,
    'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
    'is_active_event': True
}

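Please let me know if I am doing something wrong when parsing the data as JSON. I am also looking for examples of data masking and of running SQL queries in Apache Beam.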

python google-cloud-dataflow apache-beam google-cloud-pubsub
1 Answer

I assume that your example output corresponds to two separate Pub/Sub messages.

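Your problem comes from the parsing step. The traceback points at the json.loads lambda: judging by your printed output, the decoded message uses single quotes and True, so it is a Python dict literal rather than strict JSON, and json.loads rejects it at the first key. The ast.literal_eval(json.dumps(message, indent=4)) call in parse_json_message does not help either: json.dumps wraps the whole message in one quoted string, so literal_eval at best gives back a str, not a dict. The event_payload value, by contrast, is itself a JSON string and can be parsed separately once the outer dict is available.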
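The difference between the two parsers can be checked outside the pipeline. The sketch below uses an abbreviated copy of one decoded message from the question's output; it assumes the real messages have the same shape, which only the publisher can confirm.

```python
import ast
import json

# Abbreviated copy of one decoded message from the question's output.
decoded = (
    "{'user_id': '102105290400258488', 'fb_connect': True, "
    "'event_payload': '{\"competition_type\":\"normal\"}'}"
)

# json.loads rejects the text: JSON requires double-quoted keys.
try:
    json.loads(decoded)
    json_error = None
except json.JSONDecodeError as exc:
    json_error = exc.msg

# ast.literal_eval reads the same text as a Python literal and returns a dict.
row = ast.literal_eval(decoded)

# The nested event_payload value is valid JSON and parses on its own.
payload = json.loads(row["event_payload"])

print(json_error)
print(row["fb_connect"], payload["competition_type"])
```

If the publisher can be changed to send strict JSON (for example by serialising with json.dumps instead of str), json.loads becomes the simpler choice on the consumer side.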

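You can skip your parse_json_message entirely. After decoding, the Pub/Sub message is a string that still has to be parsed into a dict: json.loads if the publisher sends strict JSON, ast.literal_eval for the Python-literal format shown in your output. Once you have a dict, you can populate your NamedTuple with a simple beam.Map like this:

import ast

with beam.Pipeline(runner, options=options) as pipeline:
    message = (
        pipeline
        | "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/nectar-259905/subscriptions/bq_subscribe')
        | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
        # Parse the text into a dict; switch to json.loads if the publisher sends strict JSON.
        | 'Parse to dict' >> beam.Map(lambda text: ast.literal_eval(text))
        | 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
    )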

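MySchema(**msg) only works when the NamedTuple fields match the keys of the Pub/Sub input exactly. Your sample output, for instance, has no create_ts key, so MySchema(**msg) would raise a TypeError for the missing field. If the two differ, you need to fill in the schema manually, for example: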

MySchema(
 field_1 = msg['key_1'],
 field_2 = msg['key_2']
)
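A minimal sketch of such a manual mapping follows. EventRow and to_event_row are hypothetical names, and EventRow is a shortened stand-in for MySchema; the choice to map absent keys to None is an assumption, not something the question specifies.

```python
import typing


class EventRow(typing.NamedTuple):
    # Hypothetical, shortened stand-in for MySchema.
    user_id: str
    event_ts: str
    create_ts: typing.Optional[str]
    fb_connect: bool


def to_event_row(msg: dict) -> EventRow:
    """Build the row field by field; keys absent from msg become None or False."""
    return EventRow(
        user_id=msg["user_id"],
        event_ts=msg["event_ts"],
        create_ts=msg.get("create_ts"),  # not present in the sample output
        fb_connect=bool(msg.get("fb_connect", False)),
    )


sample = {
    "user_id": "102105290400258488",
    "event_ts": "2021-05-29 20:42:52.283 UTC",
    "fb_connect": True,
}
row = to_event_row(sample)
print(row)
```

In the pipeline this would replace the lambda, along the lines of beam.Map(to_event_row).with_output_types(EventRow).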

If a single message contains several events (for example, both dicts from your example output), you need to split them first.
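How to split depends on how the events are packed, which the question does not show. The sketch below assumes, purely for illustration, that a multi-event message is a Python list literal of dicts; split_events is a hypothetical helper. Dicts written back to back without list brackets would need a different splitting rule.

```python
import ast
from typing import Iterator


def split_events(text: str) -> Iterator[dict]:
    """Yield one dict per event from a decoded message.

    Assumption: the message is either a single dict literal or a
    list/tuple literal of dicts.
    """
    parsed = ast.literal_eval(text)
    if isinstance(parsed, (list, tuple)):
        yield from parsed
    else:
        yield parsed


batch = "[{'event_id': 'Game_Request_Declined'}, {'event_id': 'Decline_Game_Request'}]"
events = list(split_events(batch))
print([event["event_id"] for event in events])
```

Because the function yields zero or more elements per input, it fits beam.FlatMap(split_events) in place of the single-dict parse step.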
