如何使用pyspark检查指定的文件路径是否存在

问题描述 投票:0回答:1

我有以下两本词典

data_path = { 
                "person" : "/data/raw/person/*",
                "location" : "/data/raw/location/*",
                "person_int" : "/data/test/person_int/",
                "location_int" : "/data/test/location_int/*"
            }
            
interim_tables = {
                    "person_int" : ['ID','NAME', 'DATE'],
                    "location_int" : ['ID', 'LOCATION','DATE']
                    "person" : ['ID','NAME','DATE']
                 }

我需要检查路径中是否存在临时表。 如果存在,我需要加载增量(增量)数据。 如果不存在,我需要从“person”和“location”表加载历史数据。

这是我的代码:

check_results={}
for table, cols in interim_tables.items():
    if table in data_path:
        path=data_path[table]
        
        if os.path.exists(path):
            check_results[table] = "path exists"
            << logic to load incremental(delta) data >>
        else:
            check_results[table] = "path not exists"
            << logic to load historical data"
            
for table, status in check_results.items():
    print(f"{table}---{status})

目前,我只有人员和位置路径可用。 person_int 和 location_int 不存在。 但我得到的输出为:

person_int---path not exists
location_int---path not exists
person---path not exists

正确的做法是什么

python apache-spark pyspark
1个回答
0
投票

您面临的第一个问题是

os.path.exists()
无法解释文件路径中的
*
等通配符。相反,您应该使用
glob.glob()
但正如您在评论中提到的那样,它对您不起作用,这让我们遇到了您的第二个问题。 第二个问题是由 data_path 中文件路径中的前导
/
引起的。这个前导斜杠通常表示类 Unix 系统中的根目录,这导致路径与实际结构不匹配。从路径开头删除
/
解决了问题。

您修改后的解决方案可能如下所示:

import glob

data_path = { 
    "person": "data/raw/person/*",
    "location": "data/raw/location/*",
    "person_int": "data/test/person_int/",
    "location_int": "data/test/location_int/*"
}

interim_tables = {
    "person": ['ID', 'NAME', 'DATE'],
    "person_int": ['ID', 'NAME', 'DATE'],
    "location_int": ['ID', 'LOCATION', 'DATE'],
}

check_results = {}

for table, cols in interim_tables.items():
    if table in data_path:
        path = data_path[table]
        
        # Use glob to handle wildcard paths
        matching_files = glob.glob(path)
        
        if matching_files:  # Check if any file exists for the given pattern
            check_results[table] = "path exists"
            # Logic to load incremental (delta) data
        else:
            check_results[table] = "path not exists"
            # Logic to load historical data
            
for table, status in check_results.items():
    print(f"{table}---{status}")

我通过为

person
创建虚拟路径来对此进行测试,并得到以下正确结果:

person---path exists 
person_int---path not exists 
location_int---path not exists
© www.soinside.com 2019 - 2024. All rights reserved.