Some backend endpoint returns a Parquet file as an octet-stream.
In Pandas I can do something like this:
result = requests.get("https://..../file.parquet")
df = pd.read_parquet(io.BytesIO(result.content))
Can I do the same in Dask?
This code:
dd.read_parquet("https://..../file.parquet")
raises an exception (obviously, because the response is a bytes-like object):
File "to_parquet_dask.py", line 153, in <module>
main(*parser.parse_args())
File "to_parquet_dask.py", line 137, in main
download_parquet(
File "to_parquet_dask.py", line 121, in download_parquet
dd.read_parquet(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
read_metadata_result = engine.read_metadata(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
parts, pf, gather_statistics, base_path = _determine_pf_parts(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 148, in _determine_pf_parts
elif fs.isdir(paths[0]):
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 88, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 69, in sync
raise result[0]
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
result[0] = await coro
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 418, in _isdir
return bool(await self._ls(path))
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 195, in _ls
out = await self._ls_real(url, detail=detail, **kwargs)
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 150, in _ls_real
text = await r.text()
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1082, in text
return self._body.decode(encoding, errors=errors) # type: ignore
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x90 in position 7: invalid start byte
UPD
With the fsspec change from @mdurant's answer applied, I get this error:
ValueError: Cannot seek streaming HTTP file
So I added "simplecache::" to my URL, and next I ran into:
Traceback (most recent call last):
File "to_parquet_dask.py", line 161, in <module>
main(*parser.parse_args())
File "to_parquet_dask.py", line 145, in main
download_parquet(
File "to_parquet_dask.py", line 128, in download_parquet
dd.read_parquet(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
read_metadata_result = engine.read_metadata(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
parts, pf, gather_statistics, base_path = _determine_pf_parts(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 185, in _determine_pf_parts
pf = ParquetFile(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fastparquet/api.py", line 127, in __init__
raise ValueError("Opening directories without a _metadata requires"
ValueError: Opening directories without a _metadata requiresa filesystem compatible with fsspec
Temporary workaround
This may be dirty and not optimal, but it does work after a fashion:
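import io
import dask
import dask.dataframe as dd
import pandas as pd
import requests

@dask.delayed
def parquet_from_http(url, token):
    result = requests.get(
        url,
        headers={'Authorization': token}
    )
    return pd.read_parquet(io.BytesIO(result.content))

# url, token and meta are defined elsewhere in the script.
delayed_download = parquet_from_http(url, token)
df = dd.from_delayed(delayed_download, meta=meta)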
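P.S. The meta argument is necessary in this approach because otherwise Dask would call the function twice, once to infer meta and once to compute the result, so two requests would be made.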
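As a rough illustration of the meta argument mentioned above, here is one way to build an empty DataFrame that only describes the expected columns and dtypes. The helper name make_meta and the column names are hypothetical, chosen for the example; the real schema has to match what the endpoint returns.

```python
import pandas as pd


def make_meta(schema):
    """Build an empty DataFrame whose columns and dtypes describe the expected result.

    `schema` maps column name -> dtype string. No data is downloaded here.
    """
    return pd.DataFrame(
        {name: pd.Series(dtype=dtype) for name, dtype in schema.items()}
    )


# Hypothetical columns: replace with the actual schema of the Parquet file.
meta = make_meta(
    {
        "timestamp": "datetime64[ns]",
        "speed": "float64",
        "driver_id": "int64",
    }
)
print(meta.dtypes)
```

An object like this can be passed as meta=meta to dd.from_delayed, so that Dask does not need to run the download once just to discover the schema.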
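This is not an answer, but I believe the following change in fsspec would fix your problem. If you are willing to try it and confirm, we can turn it into a patch.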
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -472,7 +472,10 @@ class HTTPFileSystem(AsyncFileSystem):
     async def _isdir(self, path):
         # override, since all URLs are (also) files
-        return bool(await self._ls(path))
+        try:
+            return bool(await self._ls(path))
+        except (FileNotFoundError, ValueError):
+            return False
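(we can put this in a branch, if that makes it easier for you to install)
-edit-
The second problem (which is the same in both Parquet engines) stems from the server either not reporting the size of the file or not allowing range requests. The Parquet format requires random access to the data in order to be read. The only way to solve this (without improving the server) is to copy the whole file locally, e.g., by prepending "simplecache::" to the URL.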
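To make the random-access requirement concrete: a Parquet file keeps its metadata in a footer at the end, followed by a 4-byte little-endian footer length and the magic bytes PAR1, so a reader has to look at the end of the file before it can read anything else. The sketch below only parses that trailer from an in-memory buffer; the function name footer_metadata_length and the synthetic bytes are assumptions made for illustration, not part of fsspec, Dask, or either Parquet engine.

```python
import struct

MAGIC = b"PAR1"


def footer_metadata_length(data: bytes) -> int:
    """Return the footer metadata length stored in the last 8 bytes of the buffer."""
    # Smallest plausible layout: leading magic + length field + trailing magic.
    if len(data) < 12 or data[:4] != MAGIC or data[-4:] != MAGIC:
        raise ValueError("not a Parquet file (missing PAR1 magic)")
    # The 4 bytes before the trailing magic hold the footer length, little-endian.
    (length,) = struct.unpack("<I", data[-8:-4])
    return length


# Synthetic stand-in for a real file: magic, 10 placeholder bytes,
# the length field (10), and the trailing magic.
fake = MAGIC + b"\x00" * 10 + struct.pack("<I", 10) + MAGIC
print(footer_metadata_length(fake))  # prints 10
```

Over HTTP, reading those last bytes without downloading everything needs the file size and range requests; when the server offers neither, fetching the whole file first (as "simplecache::" does) is what remains.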