我有两个不同的应用程序,名为 A1 和 A2。每个应用程序都有自己的 Kafka 服务器。来自这两个 KAFKA 服务器(代理)的消息将发送至 NiFi。
每个 Kafka 都有不同的主题名称,基于此我可以区分来自 Kafka 的消息。但是除了 Kafka 的主题名称之外,NiFi 中还有其他方法来区分来自两个不同 Kafka 的消息吗?是否有任何 NiFi 处理器可以检查主题名称,然后决定下一步要采取的路线?
如果您使用NiFi的Kafka处理器(ConsumeKafka/ConsumeKafkaRecord)从Kafka接收消息,它们将以
FlowFiles
的形式输出消息。它们带有一个名为 kafka.topic
的属性,该属性将包含消息来自的主题的名称。
要根据主题名称路由消息,您可以使用
RouteOnAttribute
处理器。举例来说,您有两个主题 topicA
和 topicB
。然后你必须像这样配置 RouteOnAttribute 处理器:
然后根据您的要求将关系
topic-a
和 topic-b
连接到单独的流。如果您要添加更多 Kafka 源,您所要做的就是用更多的动态关系更新 RouteOnAttribute
。例如:topic-c : ${kafka.topic:equals('topicC')}
CryptographicHashContent
处理器来实现此目的。您获得内容的唯一哈希码,并基于此您可以确定您的消息。
HashContent
处理器从 Apache NiFi 1.8.0 起被标记为已弃用,并且可能会在即将发布的版本中删除。