我正在尝试从 PubSub 订阅中提取数据,最后,一旦提取了数据,我想做一些转换。目前,它是字节格式。我尝试了多种方法来使用自定义模式以 JSON 格式提取数据,但它因错误而失败
RuntimeError: json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1) [while running 'Map(<lambda at onlyRead.py:67>)']
readPubSub.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing
class MySchema(typing.NamedTuple):
user_id:str
event_ts:str
create_ts:str
event_id:str
ifa:str
ifv:str
country:str
chip_balance:str
game:str
user_group:str
user_condition:str
device_type:str
device_model:str
user_name:str
fb_connect:bool
is_active_event:bool
event_payload:str
TOPIC_PATH = "projects/nectar-259905/topics/events"
def parse_json_message(message):
import ast
"""Parse the input json message and add 'score' & 'processing_time' keys."""
#row = json.loads(message)
row = ast.literal_eval(json.dumps(message,indent=4))
return {
'user_id':row['user_id'],
'event_ts':row['event_ts'],
'create_ts':row['create_ts'],
'event_id':row['event_id'],
'ifa':row['ifa'],
'ifv':row['ifv'],
'country':row['country'],
'chip_balance':row['chip_balance'],
'game':row['game'],
'user_group':row['user_group'],
'user_condition':row['user_condition'],
'device_type':row['device_type'],
'device_model':row['device_model'],
'user_name':row['user_name'],
'fb_connect':row['fb_connect'],
'is_active_event':row['is_active_event'],
'event_payload':row['event_payload']
}
def run(pubsub_topic):
options = PipelineOptions(
streaming=True
)
runner = 'DirectRunner'
print("I reached before pipeline")
with beam.Pipeline(runner, options=options) as pipeline:
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
#| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
#| 'Parse JSON messages' >> beam.Map(parse_json_message)
#| "Convert to list" >> beam.Map(lambda x: x.split(","))
|beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema)
| "Writing to console" >> beam.Map(print))
print("I reached after pipeline")
result = message.run()
result.wait_until_finish()
run(TOPIC_PATH)
下面我直接用的话
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| "Writing to console" >> beam.Map(print))
我得到输出为
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:42:52.283 UTC',
'event_id': 'Game_Request_Declined',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9140',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:54:38.297 UTC',
'event_id': 'Decline_Game_Request',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9905',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
如果我在将数据解析为 JSON 时做错了什么,请告诉我。我也在寻找示例来进行数据屏蔽并在 APache Beam 中运行一些 sqls
我假设您的示例输出对应于两个单独的 PubSub 消息。
您的问题是由
ast.literal_eval
引起的,它无法评估字段event_payload
(不是正确的json格式)的值。
您可以完全跳过您的
parse_json_message
,因为在解码之后,PubSub 消息已经是一个 json。相反,你可以像这样用一个简单的beam.Map
填充你的NamedTuple
with beam.Pipeline(runner, options=options) as pipeline:
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
MySchema(**msg)
仅在 NamedTuple 与 PubSub 输入相同时有效。如果它们不同,您需要手动填写架构,例如
MySchema(
field_1 = msg['key_1'],
field_2 = msg['key_2']
)
如果您的消息包含多个事件(例如,示例输出中的两个字典),您需要先将它们拆分。