这是我的问题:我在HDFS中有一个文件,该文件可能很大(=不足以容纳所有内存)
我想做的是避免必须将此文件缓存在内存中,而仅像逐行处理常规文件一样逐行处理它:
for line in open("myfile", "r"):
# do some processing
我正在寻找是否有一种简单的方法可以在不使用外部库的情况下正确完成此操作。我可能可以使它与libpyhdfs或python-hdfs一起使用,但我希望尽可能避免在系统中引入新的依赖项和未经测试的库,尤其是因为这两个文件似乎都没有得到很好的维护,并声明它们不应不能用于生产。
我曾考虑使用Python subprocess
模块使用标准的“ hadoop”命令行工具执行此操作,但由于没有命令行工具可以执行我的操作,因此我似乎无法执行所需的操作处理,我想以流方式为每行执行一个Python函数。
是否可以使用子过程模块将Python函数用作管道的正确操作数?甚至更好,像打开文件一样将其作为生成器打开,这样我就可以轻松地处理每一行?
cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
如果有另一种无需使用外部库即可实现上述内容的方法,我也很开放。
感谢您的帮助!
您想要xreadlines,它将从文件中读取行,而不会将整个文件加载到内存中。
编辑:
现在我看到了您的问题,您只需要从Popen
对象中获取标准输出管道:
cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in cat.stdout:
print line
过去两年中,关于Hadoop-Streaming的运动很多。根据Cloudera的说法,这非常快:http://blog.cloudera.com/blog/2013/01/a-guide-to-python-frameworks-for-hadoop/我在此方面取得了很好的成功。
您可以使用WebHDFS Python库(在urllib3之上构建):
from hdfs import InsecureClient
client_hdfs = InsecureClient('http://host:port', user='root')
with client_hdfs.write(access_path) as writer:
dump(records, writer) # tested for pickle and json (doesnt work for joblib)
或者您可以在python中将请求包用作:
import requests
from json import dumps
params = (('op', 'CREATE')
('buffersize', 256))
data = dumps(file) # some file or object - also tested for pickle library
response = requests.put('http://host:port/path', params=params, data=data)
希望这会有所帮助!