Creating Parquet files from a stream in Python in a memory-efficient way


The most common way to create a Parquet file in Python is to first create a Pandas dataframe and then use pyarrow to write the table to parquet. I worry that this could be overly taxing on memory usage, since it requires at least one full copy of the dataset to be held in memory in order to create the pandas dataframe.
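For reference, the conventional approach looks roughly like this (the column names and data are just placeholders):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # All records must already be in memory to build the dataframe
    records = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
    df = pd.DataFrame(records)

    # A second, columnar copy is created when converting to an Arrow table
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_table(table, "example.parquet")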

I'm wondering whether the whole dataset has to be loaded into memory because of the columnar compression requirements, or whether there is a more efficient, stream-based approach. In my case I will be receiving records in a streaming fashion. For a similar CSV output process we write rows to disk in batches of 1000, so the number of rows that has to be held in memory never reaches the size of the full dataset.

Should I:

  1. Just create a pandas dataframe and then write it to Parquet. (This means the entire dataset would need to be stored in memory, but we treat this as a necessary requirement.)
  2. Use some streaming-friendly way to write 1000 or so rows at a time as we receive them, minimizing the total point-in-time RAM consumption over the course of the process. (I didn't see any documentation on how to do this, and I'm not sure it's an option for Parquet.)
  3. Write everything to CSV, then use a function that smartly reads/analyzes the CSV contents and creates the compressed Parquet file after the fact. (Probably a slower runtime, but a lower memory profile and a lower chance of failure on very large files; a sketch of this follows the list.)
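For option 3, a sketch of what the post-hoc conversion could look like with pyarrow's streaming CSV reader (file names are placeholders): the CSV is read batch by batch and each batch is written to the Parquet file, so neither file has to be fully materialized in memory.

    import pyarrow.csv as pacsv
    import pyarrow.parquet as pq

    # Open the CSV as a stream of record batches instead of one big table
    reader = pacsv.open_csv("rows.csv")

    # The reader exposes the inferred schema before any data is fully materialized
    with pq.ParquetWriter("rows.parquet", schema=reader.schema) as writer:
        for batch in reader:
            writer.write_batch(batch)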

Thoughts? Suggestions?

python parquet pyarrow fastparquet
3 Answers
11 votes

Use some streaming-friendly way to write 1000 or so rows at a time as we receive them, minimizing the total point-in-time RAM consumption over the course of the process.

You can do this.

(I didn't see any documentation on how to do this, and I'm not sure it's an option for Parquet.)

At least now https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html has some documentation on how to do this, specifically the write_batch function.


Here is an example, although it will need tweaking depending on the data source, for example if it's already "chunked" or if the schema has to be inferred from the data rather than hard-coded as it is here.

The example also goes mostly via Pandas, because it's a convenient way to convert from rows to columns to create each RecordBatch, but there are other ways of creating each RecordBatch that don't need pandas (a sketch follows the example below).

    import itertools

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq


    # Any iterable that yields rows as tuples
    def get_rows():
        for i in range(0, 10000):
            yield (1, 2)


    # Chunk the rows into arrow batches
    def get_batches(rows_iterable, chunk_size, schema):
        rows_it = iter(rows_iterable)
        while True:
            batch = pa.RecordBatch.from_pandas(
                pd.DataFrame(itertools.islice(rows_it, chunk_size), columns=schema.names),
                schema=schema,
                preserve_index=False,
            )
            if not batch:
                break
            yield batch


    # Could be inferred from data, but note that the schema has to be
    # known when creating the ParquetWriter object
    schema = pa.schema([
        ('a', pa.int32()),
        ('b', pa.int32()),
    ])

    rows = get_rows()
    batches = get_batches(rows, chunk_size=1000, schema=schema)

    # Write the batches
    with pq.ParquetWriter('example.parquet', schema=schema) as writer:
        for batch in batches:
            writer.write_batch(batch)
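As noted above, each RecordBatch can also be built without pandas. Here is a minimal sketch (not part of the original example) that builds each batch from pyarrow arrays directly, assuming the same hard-coded schema and row tuples:

    import itertools

    import pyarrow as pa


    def get_batches_without_pandas(rows_iterable, chunk_size, schema):
        rows_it = iter(rows_iterable)
        while True:
            chunk = list(itertools.islice(rows_it, chunk_size))
            if not chunk:
                break
            # Transpose the row tuples into one list of values per column
            columns = zip(*chunk)
            arrays = [
                pa.array(values, type=field.type)
                for values, field in zip(columns, schema)
            ]
            yield pa.RecordBatch.from_arrays(arrays, schema=schema)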
    


0 votes
I had the same problem, but I needed to stream the Parquet output from a Python web server. If you need to produce a streaming Parquet response from a Python web server, a few extra steps are needed. This builds on Michal's good answer above.

  • Python web servers use the WSGI standard
  • For streaming responses, WSGI uses the app_iter protocol, which is not a file-like object (a bare-bones sketch follows this list)
  • In particular, we want to avoid the web server buffering the whole Parquet file in memory before writing it to the HTTP response; for large responses that leads to out-of-memory errors and timeouts
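For context, a bare-bones sketch of the WSGI streaming model the list above refers to: the response body is an iterable of byte chunks that the server sends as they are produced, not a file-like object you write into.

    def application(environ, start_response):
        """Minimal WSGI app whose body is streamed chunk by chunk."""
        start_response("200 OK", [("Content-Type", "application/octet-stream")])

        def body():
            for i in range(3):
                # Each yielded bytes object is sent to the client as it is produced
                yield f"chunk {i}\n".encode("utf-8")

        return body()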
Below is the answer, taken from our production code, which generates a streaming Parquet HTTP response from SQLAlchemy input.

First we need to define an adapter class that adapts bytes between Python's IO class style and the app_iter style:

    """Circular bytes buffer to adapt iterator vs. BytesIO approach when streaming from a web server."""

    from io import BytesIO


    class CircularBytesBuffer:
        """BytesIO mock up with a circular buffer.

        - This is used as an adapter between Python's WSGI web server `response.app_iter`
          and Python file writer (BytesIO) model.

        - Buffer can be flushed partially, so we do not buffer everything in the memory
          at once, but we stream it incrementally over the web server response.

        Implement enough methods that we can satisfy ParquetWriter.

        - https://docs.python.org/3/library/io.html#io.IOBase
        - https://stackoverflow.com/questions/38256689/how-do-i-validate-an-implementation-of-pythons-iobase
        """

        def __init__(self):
            self.buffer = BytesIO()
            self.written = 0  # Track total written bytes as we keep recreating the buffer
            self.closed = False

        def flush_buffer(self) -> bytes:
            b = self.buffer.getvalue()
            self.buffer.seek(0)
            self.buffer.truncate(0)
            return b

        def write(self, b: bytes):
            self.buffer.write(b)
            self.written += len(b)

        def seek(self, offset, whence=0):
            raise NotImplementedError()

        def read(self, size=-1):
            raise NotImplementedError()

        def tell(self):
            return self.written

        def close(self):
            self.closed = True
Then we define:

  • Our Parquet endpoint; here we use the Pyramid web server, but it could be any Python web server
  • We stream the reply via a generator function passed to response.app_iter
  • We must signal to any reverse proxies or other middleware that this is a streaming response and that they should not try to buffer it
  • We need to define a maximum limit for the stream so that we have some control over denial-of-service attacks; we use a max_bytes counter to track how many bytes we have already streamed into the response
  • We also have logic to switch the format to JSONL instead of Parquet, so that a JavaScript-based web frontend can read the same data
The generator:

    def _generate_liquidity_candles_parquet(
        query_generator: Iterable[tuple],
        max_bytes: int,
        dbsession: Session,
    ):
        """Stream Parquet Uniswap v3 CLMM candles."""

        # Uniswap v3 liquidity data format for CLMM
        schema = pa.schema([
            ("pair_id", pa.int32()),
            ("bucket", pa.timestamp("s")),
            ("open_tick", pa.uint32()),
            ("close_tick", pa.uint32()),
            ("high_tick", pa.uint32()),
            ("low_tick", pa.uint32()),
            ("current_liquidity", pa.decimal256(76)),
            ("net_amount0", pa.decimal256(76)),
            ("net_amount1", pa.decimal256(76)),
            ("in_amount0", pa.decimal256(76)),
            ("in_amount1", pa.decimal256(76)),
        ])

        try:
            out = CircularBytesBuffer()
            with pq.ParquetWriter(out, schema=schema) as writer:
                for exchange_type, sort_key, query in query_generator:
                    batch = pa.RecordBatch.from_pandas(
                        pd.DataFrame(
                            query.yield_per(1000),
                            columns=schema.names,
                        ),
                        schema=schema,
                        preserve_index=False,
                    )

                    # Write bytes to our circular buffer
                    # and then flush them using yield
                    writer.write_batch(batch)
                    buffered_bytes = out.flush_buffer()
                    yield buffered_bytes

                    # This will trash the Parquet file format,
                    # but we do not have any other way to signal that the Parquet
                    # writing was forcefully aborted, because we have already sent
                    # the HTTP headers. So we just append some trash at the end of the file.
                    if out.written > max_bytes:
                        yield f"Parquet writing forcefully aborted. max_bytes reached: {out.written}".encode("utf-8")
                        break

            # Flush whatever bytes ParquetWriter writes on close
            buffered_bytes = out.flush_buffer()
            yield buffered_bytes
        finally:
            dbsession.close()
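Before wiring the generator into the endpoint, a quick standalone sanity check of how CircularBytesBuffer and ParquetWriter interact (the one-column schema here is made up for illustration); note the extra flush after the writer closes, which picks up the Parquet footer:

    import pyarrow as pa
    import pyarrow.parquet as pq

    schema = pa.schema([("a", pa.int32())])
    out = CircularBytesBuffer()

    with pq.ParquetWriter(out, schema=schema) as writer:
        batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3], pa.int32())], schema=schema)
        writer.write_batch(batch)
        # Bytes produced so far; in the web server these would be yielded to the response
        first_chunk = out.flush_buffer()

    # The Parquet footer is written when the writer closes, so flush once more afterwards
    footer_chunk = out.flush_buffer()
    print(len(first_chunk), len(footer_chunk), out.written)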
The endpoint:

    @noindex_view_config(route_name="web_liquidity_candles")
    def web_liquidity_candles(request: Request):
        """Stream liquidity data from the server.

        DoS vectors:

        - Limit number of pairs user can query
        - Limit number of bytes the endpoint can stream
        """
        dbsession = request.oracle_dbsession

        params = request.params

        try:
            # We protect the server against DoS using the max bytes value,
            # which tells how many bytes this endpoint can stream.
            # Max bytes for the iterator can come from the client,
            # but we also need to have a server-side maximum so that clients cannot
            # simply bypass it.
            max_bytes = request.params.get("max_bytes", "750_000_000")
            try:
                max_bytes = int(max_bytes)
            except ValueError:
                raise CandleLookupError(f"Bad max_bytes: {max_bytes}")

            format = request.params.get("format", "jsonl").lower()
            if format not in ("jsonl", "parquet"):
                raise CandleLookupError(f"Unknown response format: {format}")

            # Never allow exceeding this configuration value,
            # regardless of what the client requests
            abs_max_bytes_limit = 1_000_000_000
            if max_bytes > abs_max_bytes_limit:
                raise CandleLookupError(f"max_bytes too high: {max_bytes}, limit {abs_max_bytes_limit}")

            # Create SQLAlchemy query
            query_generator = fetch_liquidity_candle_stream(
                dbsession,
                pair_ids=params.get("pair_ids"),
                time_bucket=params.get("time_bucket"),
                start=params.get("start"),
                end=params.get("end"),
            )

        except CandleLookupError as e:
            json_body = {"error_id": "CandleLookupError", "message": str(e)}
            return exception_response(422, json_body=json_body)

        if format == "jsonl":
            # JSONL streaming header
            # https://stackoverflow.com/a/59998781/315168
            # https://github.com/Pylons/pyramid_tm/issues/56#issuecomment-285510721
            r = Response(content_type="application/jsonl+json; charset=UTF-8")
        else:
            # Parquet mimetype
            # https://stackoverflow.com/questions/78129486/what-mime-media-type-content-type-should-be-used-for-apache-parquet-files
            r = Response(content_type="application/vnd.apache.parquet")

        # Signal Caddy reverse proxy that
        # this is a streaming response and it should not
        # attempt to buffer the response
        r.content_length = -1

        # Iterable responses do not close database handlers automatically?
        # TODO: Ensure we are not leaking connections
        # https://github.com/Pylons/pyramid_tm/issues/56#issuecomment-285510721
        _generate_liquidity_candles_jsonl.close = lambda: dbsession.close()
        _generate_liquidity_candles_parquet.close = lambda: dbsession.close()

        if format == "jsonl":
            r.app_iter = _generate_liquidity_candles_jsonl(query_generator, max_bytes, dbsession)
        else:
            r.app_iter = _generate_liquidity_candles_parquet(query_generator, max_bytes, dbsession)

        return r
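For completeness, a hypothetical client for this endpoint (the URL is an assumption; the query parameters mirror the ones the endpoint reads). Because the Parquet footer arrives last, the client still has to buffer the whole response before parsing it, but the server's memory use stays flat:

    from io import BytesIO

    import pyarrow.parquet as pq
    import requests

    # Hypothetical URL; parameters follow the endpoint above
    resp = requests.get(
        "https://example.com/web_liquidity_candles",
        params={"pair_ids": "1,2", "time_bucket": "1h", "format": "parquet"},
        stream=True,
    )
    resp.raise_for_status()

    # Accumulate the streamed chunks, then parse the complete Parquet file
    buf = BytesIO()
    for chunk in resp.iter_content(chunk_size=65536):
        buf.write(chunk)
    buf.seek(0)

    table = pq.read_table(buf)
    print(table.num_rows)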
    