我正在尝试从url中读取巨大的csv.gz文件成块,然后将其动态写入数据库。我必须在内存中进行所有操作,磁盘上不能存在任何数据。
我具有下面的生成器函数,该函数将响应块生成为Dataframe对象。
它使用请求的response.raw作为pd.read_csv函数的输入,但它看起来不可靠,有时会引发超时错误:urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))
response = session.get(target, stream=True)
df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6,
header=None, dtype=str, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
if df.empty:
continue
if (i % 10) == 0:
time.sleep(10)
yield df
我决定改用iter_content,因为我认为它应该更可靠。我已经实现了以下功能,但出现此错误:EOFError: Compressed file ended before the end-of-stream marker was reached
。
[我认为这与我传递压缩的Bytes对象(?)有关,但我不确定如何传递pandas.read_csv它会接受的对象。
response = session.get(target, stream=True)
for chunk in response.iter_content(chunk_size=10**6):
file_obj = io.BytesIO()
file_obj.write(chunk)
file_obj.seek(0)
df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
header=None, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
if df.empty:
continue
if (i % 10) == 0:
time.sleep(10)
yield df
任何想法都非常感谢!
谢谢
您不妨尝试一下:
def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
"""
Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
input stream.
The stream implements Python 3's newer I/O API (available in Python 2's io module).
For efficiency, the stream is buffered.
"""
class IterStream(io.RawIOBase):
def __init__(self):
self.leftover = None
def readable(self):
return True
def readinto(self, b):
try:
l = len(b) # We're supposed to return at most this much
chunk = self.leftover or next(iterable)
output, self.leftover = chunk[:l], chunk[l:]
b[:len(output)] = output
return len(output)
except StopIteration:
return 0 # indicate EOF
return io.BufferedReader(IterStream(), buffer_size=buffer_size)
然后
response = session.get(target, stream=True)
response.raw.decode_content = decode
df = pd.read_csv(iterable_to_stream(response.iter_content()), sep=';')
我用它来流odsclient
中的csv文件。尽管我没有尝试gz压缩,但它似乎可以正常工作。
来源:odsclient