从HDFS目录中读取文件并使用Python在Spark中创建RDD

问题描述 投票:0回答:1

我有一些文本文件,我想使用这些文件创建一个RDD。文本文件存储在“Folder_1”和“Folder_2”中,这些文件夹存储在“text_data”文件夹中

当文件存储在本地存储中时,以下代码有效:

#Reading the corpus as an RDD

data_folder = '/home/user/text_data'

def read_data(data_folder):
    data = sc.parallelize([])
    for folder in os.listdir(data_folder):
        for txt_file in os.listdir( data_folder + '/' + folder  ):
            temp = open( data_folder + '/' + folder + '/' + txt_file)
            temp_da = temp.read()
            temp_da = unicode(temp_da, errors = 'ignore')
            temp.close()
            a = [ ( folder, temp_da) ]
            data = data.union(sc.parallelize( a ) )
    return data

函数read_data返回由文本文件组成的RDD。

如果将'text_data'文件夹移动到HDFS目录,如何执行上述功能?

该代码将部署在运行SPARK的Hadoop-Yarn集群中。

python hadoop apache-spark hdfs yarn
1个回答
0
投票

替换下面的hadoop环境的namenode

hdfs_folder = 'hdfs://<namenode>/home/user/text_data/*'

def read_data(hdfs_folder):
    data = sc.parallelize([])
    data = sc.textFile(hdfs_folder)
    return data

这是在Spark 1.6.2版本中测试的

>>> hdfs_folder = 'hdfs://coord-1/tmp/sparktest/0.txt'
>>> def read_data(hdfs_folder):
...     data = sc.parallelize([])
...     data = sc.textFile(hdfs_folder)
...     return data
...
>>> read_data(hdfs_folder).count()
17/03/15 00:30:57 INFO SparkContext: Created broadcast 14 from textFile at NativeMethodAccessorImpl.java:-2
17/03/15 00:30:57 INFO SparkContext: Starting job: count at <stdin>:1
17/03/15 00:30:57 INFO SparkContext: Created broadcast 15 from broadcast at DAGScheduler.scala:1012
189
>>>
© www.soinside.com 2019 - 2024. All rights reserved.