使用python连接kafka和elasticsearch

问题描述 投票:0回答:1

我对使用 Kafka 和 elasticsearch 很陌生。我一直在尝试使用弹性搜索,但遇到了一些问题。我已经开发了一个 docker compose 文件,其中包含构建环境所需的所有图像,然后使用 kafka 将数据产品化为特定主题的数据,然后我需要从 Kafka 的消费者数据中获取到 pub/sub 系统发送数据以摄取到elasticsearch中。 我使用 python 实现了这一切。我已经看到,在端口和 localhost 中,elasticsearch 的 ip 出现,而页面中的 kibana 出现以下句子: kibana 服务器尚未准备好

消费者Python与它类似,我从中获取主题数据:

from kafka import KafkaConsumer
# Import sys module
import sys

# Import json module to serialize data
import json

# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('JSONtopic',bootstrap_servers = ['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
 print("Consumer records:\n")
 print(message)
 print("\nReading from JSON data\n")
 print("Name:",message[6]['name'])
 print("Email:",message[6]['email'])
 # Terminate the script
 sys.exit()

目标是使用elasticsearch进行分析,所以我需要使用它作为后端,将数据可视化到kibana中。如果有一个教程可以帮助我理解我应该做什么来链接这些信息,那我真的很感激。 (P.s. 数据可以毫无问题地从一个主题转移到另一个主题,但问题是获取这些信息并将其插入到弹性中,并有可能可视化这些信息)

python elasticsearch apache-kafka
1个回答
0
投票

如果您要将数据从 Kafka 推送到 Elasticsearch,那么使用 Consumer API 通常不是一个好主意,因为现有的工具可以做得更好并处理更多功能。

例如:

  • 卡夫卡连接
  • Logstash
© www.soinside.com 2019 - 2024. All rights reserved.