如何删除引号并以原始格式发送数据原始的JSON格式为:
{
"@timestamp": "2020-06-02T09:38:03.183186Z"
}
此数据在另一个主题中
"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"
这是在服务器之间发送数据的代码
def parse(d):
if str(type(d)) == "<class 'dict'>":
return (r)
return -1
producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
auto_offset_reset=param["AUTO_OFFSET_RESET"],
consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
enable_auto_commit=False,
auto_commit_interval_ms=60000,
group_id=param["GROUP_ID"],
client_id=param["CLIENT_ID"]
)
consumer.subscribe([param["TOPIC_IN"]])
while True:
num_rows = 0
for msg in consumer:
num_rows = num_rows + 1
m = json.loads(msg.value)
j = parse(m)
if j != -1:
data = json.dumps(j)
producer.send(param["TOPIC_OUT"], value=data)
您当前正在将值序列化为字符串。如果要使用JSON而不是字符串,则需要正确序列化值。
以下应完成的技巧:
import json
producer = KafkaProducer(
bootstrap_servers='mykafka-broker',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)