我需要为我的项目使用什么正确的Kafka结构以及为什么的建议。
我的项目我创建了一个投资机器人管理平台。非常高级-您可以编写几种投资策略,然后将其上传到平台,它们将实时执行,提供有关性能的分析和实时信息。这些策略从4个数据流中获取信息。当他们从4个不同的Kafka主题中读取数据时,这些数据将传递给策略。这个kafka主题直接从交流websocket接收信息。在任何给定时间,平台中都有动态数量的机器人。
我所做的是:使用图像Kafka-wurmeister和zookeper来初始化kafka预先初始化我将需要的所有Kakfka主题。通过使用以下方法生成主题的所有信息,将所需的数据推送到Kafka:
payloads = [ { topic: topic, messages: JSON.stringify(message), partition: 0 } ] await producer.send(payloads, async function (err, data) { })
然后,我通过一个简单的使用者从主题中读取策略,如下所示:消费者=新消费者(客户,[{主题:主题,分区:0}]);Consumer.on('message',function(message){
// Parse the value consumed from kafka parsedPrice = JSON.parse(message.value) })
目的是讨论我如何使用kafka来确保自己能够做到,首先要从几个不同的使用者那里访问主题,其次要有足够的冗余以确保我的正常运行时间非常长。
我需要为我的项目使用什么正确的Kafka结构以及为什么的建议。我的项目Im正在创建投资机器人管理平台。非常高级-您可以编码...
如果要从多个使用者中访问一个主题,则可以建立一个消费者组,在该组中一个或多个消费者共同工作以消费一个主题。