我已经在VS代码中安装了ADL扩展,现在我正在编写一个Python脚本,我需要读取Azure Data Lake Storage(ADLS Gen1)中的csv文件。对于本地文件,以下代码正在运行:
df = pd.read_csv(Path('C:\\Users\\Documents\\breslow.csv'))
print (df)
我如何从ADLS读取数据?即使在成功安装和连接(使用我的Azure帐户)ADL扩展后,我仍然需要创建范围和秘密吗?
我尝试编写一个示例代码,将Azure Data Lake中的csv文件中的数据读取到pandas中的数据框中。
这是我的示例代码,如下所示。
from azure.datalake.store import core, lib, multithread
import pandas as pd
tenant_id = '<your Azure AD tenant id>'
username = '<your username in AAD>'
password = '<your password>'
store_name = '<your ADL name>'
token = lib.auth(tenant_id, username, password)
# Or you can register an app to get client_id and client_secret to get token
# If you want to apply this code in your application, I recommended to do the authentication by client
# client_id = '<client id of your app registered in Azure AD, like xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx'
# client_secret = '<your client secret>'
# token = lib.auth(tenant_id, client_id=client_id, client_secret=client_secret)
adl = core.AzureDLFileSystem(token, store_name=store_name)
f = adl.open('<your csv file path, such as data/test.csv in my ADL>')
df = pd.read_csv(f)
注意:如果您使用client_id
和client_secret
进行身份验证,则必须为至少在Azure AD中具有Reader
角色的应用程序添加必要的访问权限,如下图所示。有关访问安全性的更多信息,请参阅官方文档Security in Azure Data Lake Storage Gen1
。同时,关于如何在Azure AD中注册应用程序,您可以参考我对其他SO线程How to get an AzureRateCard with Java?的回答。
如有任何疑虑,请随时告诉我。
以下是从ADLS读取csv文件的示例代码。
# -*- coding: utf-8 -*-
"""
Created on Wed Mar 20 11:37:19 2019
@author: Mohit Verma
"""
from azure.datalake.store import core, lib, multithread
token = lib.auth(tenant_id, username, password)
adl = core.AzureDLFileSystem(token, store_name=store_name)
# typical operations
adl.ls('')
adl.ls('tmp/', detail=True)
adl.ls('tmp/', detail=True, invalidate_cache=True)
adl.cat('littlefile')
adl.head('gdelt20150827.csv')
# file-like object
with adl.open('gdelt20150827.csv', blocksize=2**20) as f:
print(f.readline())
print(f.readline())
print(f.readline())
# could have passed f to any function requiring a file object:
# pandas.read_csv(f)
with adl.open('anewfile', 'wb') as f:
# data is written on flush/close, or when buffer is bigger than
# blocksize
f.write(b'important data')
adl.du('anewfile')
# recursively download the whole directory tree with 5 threads and
# 16MB chunks
multithread.ADLDownloader(adl, "", 'my_temp_dir', 5, 2**24)
请尝试此代码并查看是否有帮助。有关Azure Data Lake的其他示例,请参阅以下github repo。
https://github.com/Azure/azure-data-lake-store-python/tree/master/azure
此外,如果您想了解ADLS中的不同类型的身份验证,请检查以下代码库。
https://github.com/Azure-Samples/data-lake-analytics-python-auth-options/blob/master/sample.py