我遇到了 DeduplicateRecord 处理器的问题。 我正在尝试设置一个流程,其中流程文件记录中可能有重复的记录。
出于测试目的,我使用GenerateRecord处理器生成一个包含10个JSON对象的流文件记录。
这是 JSON 示例:
[{"city":"Pollyside"},{"city":"East Deweytown"},{"city":"Beulahland"},{"city":"Windlerstad"},{"city":"South Rocky"},{"city":"Nicolasachester"},{"city":"Doyleland"},{"city":"Framifort"},{"city":"Bartolettimouth"},{"city":"Lake Carroll"}]
在 DeduplicateRecord 处理器中,我设置了一个记录路径属性来对“city”字段值上的记录进行重复数据删除。
当 DeduplicateRecord 处理器被传入的流文件触发时,它会抛出异常:
2024-07-04 15:18:51,518 错误 [定时器驱动进程线程 4] o.a.n.p.standard.DeduplicateRecord DeduplicateRecord[id=7e4ddfab-0190-1000-3ed6-a985fe488c5b] 未能检测索引 0 处的重复记录 org.apache.nifi.record.path.exception.RecordPathException:第 1 行第 0 列出现意外标记 ''。查询: 在 org.apache.nifi.record.path.RecordPathParser.displayRecognitionError(RecordPathParser.java:120) 在org.antlr.runtime.BaseRecognizer.reportError(BaseRecognizer.java:186) 在 org.apache.nifi.record.path.RecordPathParser.filter(RecordPathParser.java:1356) 在org.apache.nifi.record.path.RecordPathParser.pathOrFunction(RecordPathParser.java:4415) 在org.apache.nifi.record.path.RecordPathParser.pathExpression(RecordPathParser.java:4468) 在 org.apache.nifi.record.path.RecordPath.compile(RecordPath.java:84) 在 com.github.benmanes.caffeine.cache.LocalLoadingCache.lambda$newMappingFunction$2(LocalLoadingCache.java:145) 在 com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406) 在 java.base/java.util.concurrent.ConcurrentHashMap.compute(来源未知) 在com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404) 在 com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387) 在 com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108) 在 com.github.benmanes.caffeine.cache.LocalLoadingCache.get(LocalLoadingCache.java:56) 在 org.apache.nifi.record.path.util.RecordPathCache.getCompiled(RecordPathCache.java:34) 在 org.apache.nifi.processors.standard.DeduplicateRecord.evaluateKeyFromDynamicProperties(DeduplicateRecord.java:555) 在org.apache.nifi.processors.standard.DeduplicateRecord.onTrigger(DeduplicateRecord.java:470) 在 org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) 在org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1361) 在org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:247) 在 org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102) 在 org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(来源未知) 在 java.base/java.util.concurrent.FutureTask.runAndReset(来源未知) 在 java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(来源未知) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(来源未知) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(来源未知) 在 java.base/java.lang.Thread.run(来源未知)
我使用的是1.26.0版本的NiFi。
我在这里做错了什么?任何帮助表示赞赏。