Python requests: streaming iter_content chunks into pandas' read_csv function


I'm trying to read a huge csv.gz file from a URL in chunks and write it to a database on the fly. Everything has to happen in memory; no data may ever exist on disk.

I have the generator function below, which yields the response chunks as DataFrame objects.

It uses the request's response.raw as the input to pd.read_csv, but it seems unreliable and sometimes raises a connection-reset error: urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))

response = session.get(target, stream=True)
df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6, 
                    header=None, dtype=str, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
    if df.empty:
        continue
    if (i % 10) == 0:
        time.sleep(10)
    yield df
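
For context, the DataFrames this yields get appended straight to a database, roughly like this (the engine URL, table name, and stream_dataframes are placeholders, not my actual code):

import sqlalchemy

engine = sqlalchemy.create_engine('postgresql://user:password@host/db')  # placeholder DSN
for df in self.stream_dataframes():  # the generator above
    # Append each chunk to the table as it arrives; nothing is spooled to disk.
    df.to_sql('some_table', engine, if_exists='append', index=False)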

I decided to switch to iter_content instead, which I thought should be more reliable. I implemented the version below, but now I get this error: EOFError: Compressed file ended before the end-of-stream marker was reached

I think this is because I'm passing it a compressed bytes object that isn't a complete gzip file (each chunk is just a slice of the stream), but I'm not sure how to turn the chunks into something pandas.read_csv will accept.

response = session.get(target, stream=True)
for chunk in response.iter_content(chunk_size=10**6):
    file_obj = io.BytesIO()
    file_obj.write(chunk)
    file_obj.seek(0)
    df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
                        header=None, names=columns, parse_dates=['datetime'])
    for i, df in enumerate(self.process_df(df_it)):
        if df.empty:
            continue
        if (i % 10) == 0:
            time.sleep(10)
        yield df
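
Presumably the chunks need to go through a streaming decompressor instead of being treated as complete gzip files. A rough, untested sketch of what I mean with zlib (the wbits value of zlib.MAX_WBITS | 16 selects the gzip format):

import zlib

decomp = zlib.decompressobj(zlib.MAX_WBITS | 16)  # expect a gzip header/trailer
for chunk in response.iter_content(chunk_size=10**6):
    plain = decomp.decompress(chunk)  # may be b'' until enough compressed input arrives
    # ... buffer `plain`, split on newlines, hand complete rows to the CSV parser ...
tail = decomp.flush()  # any bytes still held back when the stream ends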

Any ideas are much appreciated!

Thanks

python stream python-requests compression bytestream
1 Answer

You might want to give this a try:

import io

def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = None
        def readable(self):
            return True
        def readinto(self, b):
            try:
                l = len(b)  # We're supposed to return at most this much
                chunk = self.leftover or next(iterable)
                output, self.leftover = chunk[:l], chunk[l:]
                b[:len(output)] = output
                return len(output)
            except StopIteration:
                return 0    # indicate EOF
    return io.BufferedReader(IterStream(), buffer_size=buffer_size)

Then:

response = session.get(target, stream=True)
response.raw.decode_content = True  # `decode` in the original snippet; True undoes any Content-Encoding
df = pd.read_csv(iterable_to_stream(response.iter_content()), sep=';')
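
For the gzipped file in the question, the same wrapper should in principle let pandas do the decompression and chunking itself. A sketch, assuming the server sends the .gz bytes as-is (with no Content-Encoding header, since iter_content would already have decoded that):

response = session.get(target, stream=True)
stream = iterable_to_stream(response.iter_content(chunk_size=10**6))
df_it = pd.read_csv(stream, compression='gzip', chunksize=10**6,
                    header=None, dtype=str, names=columns,
                    parse_dates=['datetime'])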

I use it to stream CSV files in odsclient, and it seems to work correctly, although I haven't tried it with gz compression.

Source: odsclient
