querydatabasetable --> 从源获取大约 1GB 数据。
convertavrotojson ---> 将avro从at数据转换为json格式。
splitjson --> 将 json 流文件拆分为单个 json 记录/行。
evaluatejsonpath --> 映射属性。
replacetext --> 准备插入语句并使用正则表达式对数据进行一些转换。
putsql--> 将所有数据摄取到 postgresql 中。我没有在 putsql 的成功关系中保存任何数据,使用流文件过期时间为 2 秒。我正在清空队列。因为记录数量巨大
因为队列阈值为 10,000。为了 putsql 的成功,我已经给了流程文件过期 2 秒。
因此,在摄取所有数据并且流文件在到期后变空之后,我需要触发下一个executesql处理器来运行存储过程。
那么如何在putsql处理器之后触发下一个处理器。在这里,我将 json 文件拆分为单个 json 记录,以对数据执行一些转换。因此,在碎片整理策略中使用合并内容并以 zip 形式合并格式是行不通的。我不能在 putsql 之后保留数据太长时间,因为当达到阈值大小时队列将被搁置。
那么,如何在所有记录被摄取后仅触发一次executesql处理器。你能帮我创建这个流程吗?
我使用rest api检查队列大小是否小于0或大于0。如果小于0则仅调用存储过程。但如何仅在摄取完成后触发invokehttp(rest api)或executesql来调用过程。
选项1
querydbtable
填充属性 querydbtable.row.count
。
为什么不将此参数传递给存储过程调用 - 这样,它可以检查源表中的行数是否对应于临时表中的行数。
选项2
querydbtable
填充可能对 fragment.*
有用的 mergecontent
属性。
您可以将 json 合并到 json 数组中。合并完成后 - 您可以调用 sproc 或 api...