我使用 fsspec,它使用 paramiko 的内置功能,但无法真正找到一种方法来对响应进行分页。
有没有办法在这里实现该功能?
用例就像每个目录都有 100000 个文件,我想在内存中单独列出所有这些是个坏主意。
有一个 sftp.listdir_iter 但我们在 fsspec 中有这个功能吗?
listdir_iter
将提供一种更直接的方法来实现分页,因为它返回一个迭代器,允许您一个接一个地检索项目。
import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp
# Use Paramiko's listdir_iter method
items_iter = sftp.listdir_iter(path)
# Skip the items before the starting page
for _ in range(start_page * page_size):
try:
next(items_iter)
except StopIteration:
break
items = []
for i, item in enumerate(items_iter):
if i >= page_size:
break
items.append(item)
return items
listdir_attr
,它一次加载所有项目,然后对列表进行切片以获得所需的页面:这样会更快。
这意味着您可以尝试通过切片返回的 SFTPAttributes
对象列表来实现分页。例如:
import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp
# Use Paramiko's listdir_attr method
items = sftp.listdir_attr(path)
# Slice the items list to implement pagination
start_index = start_page * page_size
end_index = start_index + page_size
paginated_items = items[start_index:end_index]
# Extract filenames from the SFTPAttributes objects
filenames = [item.filename for item in paginated_items]
return filenames
您可以将其用作:
fs = PaginatedSFTPFileSystem(host="your_host", username="your_username", password="your_password")
paginated_response = fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)
listdir_iter
的方法稍微高效一些,因为它避免了一个一个地遍历项目。SFTPAttributes
对象。除非您有大量文件和有限的内存资源,否则这种内存开销可能不是问题。