Apache NiFi是一个易于使用,功能强大且可靠的分布式系统,用于转换和分发数据。
如何在 Apache NiFi 中解析和转换二进制数据结构?
我正在使用 Apache NiFi,我需要解析和转换传入的二进制数据,这些数据遵循格式字符串 ' 定义的特定结构 我正在使用 Apache NiFi,我需要解析和转换传入的二进制数据,这些数据遵循每条消息的格式字符串 '<BBHBBHHHHHHh'(20 字节)定义的特定结构(类似于 Python 的 struct 模块格式字符串)。数据表示一系列不同类型的字段(无符号字符、无符号短整型、有符号短整型),我需要提取这些值以便在 NiFi 流中进行进一步处理。使用 python 从 mqtt 中使用并使用 struct 将其转换为 json 非常容易,但是如何在 nifi 中做到这一点?我设法使用 mqtt 但无法解码。将来我每秒会收到许多这样的 20 字节数据包,我希望它稳定且高性能。 我知道 NiFi 主要处理基于文本或更通用的数据结构,例如 JSON 或 XML,并且我正在寻找在 NiFi 中处理这种二进制数据解析的最佳方法,如果可能的话,无需依赖外部脚本或工具。 这是我迄今为止考虑或尝试过的: 基于记录的处理器,例如ConvertRecord,但我不确定如何为二进制数据配置记录读取器。 脚本处理器,例如 ExecuteScript 或 InvokeScriptedProcessor,但我担心 Jython 或 Groovy 中处理二进制数据结构的性能和复杂性。 外部工具或脚本,我希望避免使用这些工具或脚本,以便将处理保留在 NiFi 的托管环境中。 将此类二进制数据解析集成到 NiFi 数据流中的最佳实践或模式。 您的经验中的任何建议或见解将不胜感激! 如果 nifi 不是合适的工具,我对其他工具持开放态度,我确实研究过 flink,但这看起来很复杂,我喜欢 nifi 的流程思想。我也研究过 redis gears,但从 api 开发来看,它看起来不稳定。 这很奇怪,但最有效的变体是使用 ExecuteGroovyScript (或 ScriptedProcessor,但在性能上相同 ScriptedProc 有一些附加选项)或外部脚本和 ExecuteStreamCommand。但我认为 ExecuteGroovyScript 是不太复杂的解决方案。 更通用的解决方案编写 ScriptedReader 并根据测试使用 ConverRecord 它不是那么性能,但更通用的解决方案编写 ScriptedReader 您可以比在您选择的任何过程中使用它。
我想在NiFi中将MySQL DB的获取操作分成4部分。我按顺序放置了 4 个 ExecuteSQLRecord,我希望在最后一部分中拥有所有数据(我知道 MergeContent,但是我...
所以,我有一个如下所示的输入: { “实体类型”:“工作”, “身份证”:1019167, “特性” : { "TemplateName" : [ "开发Bug&
有什么方法可以将 Apache Nifi 服务器时区传递到处理器中吗? 例如,在 UpdateRecord 中我尝试了以下表达式: ${Duser.时区} 还: #{-Duser.timezone} 但它不会重蹈覆辙...
假设我有一个处理器生成的 100 个流文件,每个文件都包含不同的行。我想要一个包含 100 行的新流文件。我怎样才能做到这一点? 我尝试过 MergeCont...
使用 Nifi JoltTransformJson 转换数组中的 JSON 对象
我在Nifi中使用JOLTTransformJson处理器。 我的Json输入格式有以下4种: [ { “一”:“1” } ] 或者 [ { “一”:“1”, &
使用UpdateRecord,我尝试将json字符串转换为有效的json: 输入: { "用户" : "{\"id\":\"1\",\"name\":\"TEST\"}" } 当前...
这是我尝试过的语法 0 30 0 * * ?或 0 30 1 * * ?,应分别在午夜 12:30 或 1:30 运行。 另外,我厌倦了每 20 分钟在 NiFi 中使用 cron 运行一次流程,...
如何配置 OAuth 处理器/控制器服务以通过客户端凭据对安全 REST API 进行身份验证从 Apache Nifi 获取访问令牌
如何在 Apache Nifi 中配置 OAuth 2.0 处理器/控制器服务,以使用 StandardOauth2AccessTokenProvider 和客户端凭据对安全 REST API 进行身份验证 尝试了我所做的一切...
传入的 csv 文件具有以下架构: 纬度;经度 90.232;24.244 我需要通过使用新字段“json”更新传入架构来生成流文件。 预计结果...
Apache NiFi 2.0.0 中缺少 PutHDFS 处理器
我使用的是 Apache NiFi 2.0.0,不幸的是它不包括 PutHDFS 处理器。我的项目需要这个版本的 NiFi,因为它具有与 Python 脚本的集成功能,所以升级...
我正在使用 Apache NIFI 1.28 版本,我正在尝试创建一个简约的数据流,我在其中生成数据并希望在 HDP(Hortonworks 数据平台)2.5.0 中摄取 HDFS,我正在...
我有一个 JSON 输入,我使用 JOLT 表示法将其转换为另一个 JSON。 我有需要保存为键值对的措施,站名和站号合并为一个字段和日期字段
如何将`nifi.properties`文件位置从`/opt/nifi/nifi-current/conf/`更改为其他位置
我正在尝试做的事情: 我正在尝试将 nifi 作为 kubernetes 中的单个 pod statefulset 运行,并将持久卷附加到 /mnt/ 并在此 /mnt/ 中安装一些配置映射 我尝试过的: 我…
我有一个 JOLT,可以将温度值转换为摄氏度。我在 NIFI 流中使用它,并获得具有不同值的不同 JSON 输入。当有温度场时,我的 JOLT 工作正常......
我有以下 JSON 结构: { “1”:{ "前面的 ID": "0", “值”:“A” }, “2”:{ “precedingId”:“...
我尝试启动 nifi-toolkit 的容器,收到错误消息“未指定程序选项”。可用选项包括: encrypt-config s2s flow-analyzer node-manager tls-toolkit file-mana...
Apache Nifi 表达式语言:查找与正则表达式匹配的部分内容
任务是找到表达式中与所选正则表达式匹配的部分,并将该部分放入新字段的值中。 例如,“描述”字段中的表达式: “拉蒂特...
在 nifi 应用程序中运行的 Python 脚本,用于在最后时钟大于 nowminus1 分钟时根据流文件内容路由流文件
{ “最后时钟”:纪元格式, “最后值”:98.5778 } 我想根据最后一个时钟值来路由流文件,如果它大于现在 - 1 分钟,则路由到成功,剩余到失败