我对使用 Kafka 和 elasticsearch 很陌生。我一直在尝试使用弹性搜索,但遇到了一些问题。我已经开发了一个 docker compose 文件,其中包含构建环境所需的所有图像,然后使用 kafka 将数据产品化为特定主题的数据,然后我需要从 Kafka 的消费者数据中获取到 pub/sub 系统发送数据以摄取到elasticsearch中。 我使用 python 实现了这一切。我已经看到,在端口和 localhost 中,elasticsearch 的 ip 出现,而页面中的 kibana 出现以下句子: kibana 服务器尚未准备好
消费者Python与它类似,我从中获取主题数据:
from kafka import KafkaConsumer
# Import sys module
import sys
# Import json module to serialize data
import json
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('JSONtopic',bootstrap_servers = ['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
print("Consumer records:\n")
print(message)
print("\nReading from JSON data\n")
print("Name:",message[6]['name'])
print("Email:",message[6]['email'])
# Terminate the script
sys.exit()
目标是使用elasticsearch进行分析,所以我需要使用它作为后端,将数据可视化到kibana中。如果有一个教程可以帮助我理解我应该做什么来链接这些信息,那我真的很感激。 (P.s. 数据可以毫无问题地从一个主题转移到另一个主题,但问题是获取这些信息并将其插入到弹性中,并有可能可视化这些信息)
如果您要将数据从 Kafka 推送到 Elasticsearch,那么使用 Consumer API 通常不是一个好主意,因为现有的工具可以做得更好并处理更多功能。
例如: