目前我的nifi流程如下:
ListS3 -> RouteOnAttribute -> RouteOnAttribute -> FetchS3Object -> ConvertRecord -> PutDatabaseRecord
我在 s3 存储桶中列出 CSV 文件,然后在两个不同的属性处理器上路由以选择要放入 Postgres 数据库中的文件。然后我从 s3 获取 CSV 文件。然后,我使用 ConvertRecord 处理器将 CSV 文件转换为 json 对象行,如下所示:
{"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}
{“ideez”:“2”,“姓名”:“简·史密斯”,“年龄”:“25”,“城市”:“洛杉矶”}
...
我想要这样做的原因是因为有许多 CSV 文件具有相似的标题(我路由以使用正确的标题)并且具有大部分相似的列。但有许多文件具有不同的列名称。每天都在变化。因此,将它们作为 json 对象插入到 Postgres 中的 jsonb 列中最适合我。
以下是我的 JsonRecordSetWriter 以及 PutDatabaseRecord 的设置方式:
以下是我收到的一些错误:
如果我尝试用“json_content”封装每一行:
{"json_content": {"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}},
{“json_content”:{“ideez”:“2”,“姓名”:“简·史密斯”,“年龄”:“25”,“城市”:“洛杉矶”}},
...
我收到以下错误:
“错误:json 类型的输入语法无效详细信息:令牌“MapRecord”无效。其中:JSONdata,第 1 行:MapRecord…未命名门户参数 $1=‘…’
有没有人有幸做过类似的过程?也许,我应该追求更好的过程。谢谢。
我认为你真的很接近!但问题是你的内容:
{"json_content": {"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}},
{"json_content": {"ideez": "2", "name": "Jane Smith", "age": "25", "city": "Los Angeles"}}
将 json 内容作为实际的 json 对象,NiFi 将其解释为记录。因此,关于 MapRecord 的错误。您需要“字符串化”该字段。我认为我可以研究一些解决方案,以便使 JSON 字符串化更加方便。但现在,您可以使用原始字符串操作来做到这一点。我会在 PutDatabaseRecord 之前使用 ReplaceText 处理器,并以这种方式配置它:
Replacement Strategy = Regex Replace
Search Value = (^.*$)
Replacement Value = {"json_content": "${'$1':escapeJson():replace('\\', '\\\\')}"}
Character Set = UTF-8
Maximum Buffer Size = 1 MB
Evaluation Mode = Line-by-Line
Line-by-Line Evaluation Mode = All
这应该处理将其转换为字符串并正确转义它。