如何读取NiFi中ExecuteStreamCommand处理器中的文件

问题描述 投票:0回答:1

我的最终目标是屏蔽一个特定文件中的数据。我想将文件从一个地方移动到另一个地方。在此传输过程中,我必须使用Python脚本屏蔽数据。所以,我设计了以下流程:

GetFile > ExecuteStreamCommmand > PutFile

我使用Python设计了一个pandas脚本。我在NiFi上创建的虚拟机上运行这个Google Cloud Platform,我安装了Python-2.7NiFi-1.9.1。以下是我的熊猫代码:

import pandas as pd
readFile = pd.read_csv("/path",sep=" ",header=None)
readFile.columns = ['IP']
readFile['IP'] = readFile['IP'].replace(regex='((?<=[0-9])[0-9]|(?<=\.)[0-9])',value='X')
readFile.to_csv("/path", sep=' ')

我有些疑惑: 1)使用getFile处理器我将队列中的文件传递给下一个处理器,即ExecuteStreamCommand处理器。 2)另外,在我的Python代码中,我试图从GetFile处理器中传递的相同输入目录中读取数据,但现在该文件已被移动到getfile> executestreamcommand之间的队列中。那怎么读呢? 3)执行python脚本后,如何使用putFile处理器将其保存回其他地方?

我是NiFi的新手,所以试图理解基本的东西。另外,我附上了流程和错误截图。 enter image description here

python hadoop google-cloud-platform apache-nifi
1个回答
1
投票

ExecuteStreamCommand

流文件的内容作为stdin流传递给命令(在您的情况下为python)

所以,你必须使用以下代码:

readFile = pd.read_json(sys.stdin)

另一方面,如果你需要将regexp替换为流文件,你可以尝试使用ReplaceText处理器而不是ExecuteStreamCommand

© www.soinside.com 2019 - 2024. All rights reserved.