The most common way to create a Parquet file in Python is to first build a Pandas DataFrame and then write the table to Parquet with pyarrow. My concern is that this may be very heavy on memory usage, because it requires holding at least one complete copy of the dataset in memory in order to create the pandas DataFrame.
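For reference, a minimal sketch of that whole-dataset approach (the column names and values here are made up for illustration):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# The entire dataset has to exist in memory as a DataFrame first...
df = pd.DataFrame({"a": range(10_000), "b": range(10_000)})

# ...then it is converted to an Arrow table and written out in one go
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, "example.parquet")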
I would like to know whether the entire dataset has to be loaded into memory because of the columnar compression requirements, or whether there is a more efficient, stream-based approach. In my case I will be receiving the records in a streaming fashion. For a similar CSV output process, we write rows to disk in batches of 1000, so the number of rows that has to be held in memory never reaches the size of the full dataset.
Should I:

Use some streaming-friendly way to write 1000 or so rows at a time as we receive them, minimizing the total point-in-time RAM consumption over the course of the process. (I don't see any documentation on how to do this, and I'm not sure it is even an option for Parquet.)

Thoughts? Suggestions?

You can do this.
At least now there is some documentation on how to do this at https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html - specifically the write_batch function.
Here is an example, although it will need some tweaking depending on the data source - for example if the data is already "chunked", or if the schema has to be inferred from the data rather than hard-coded as it is here.

The example also goes mostly through Pandas, since it is a convenient way to convert from rows to columns in order to create each RecordBatch, but there are other ways of creating each RecordBatch that do not need pandas (a pandas-free sketch follows the example below).
import itertools

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Any iterable that yields rows as tuples
def get_rows():
    for i in range(0, 10000):
        yield (1, 2)

# Chunk the rows into arrow batches
def get_batches(rows_iterable, chunk_size, schema):
    rows_it = iter(rows_iterable)
    while True:
        batch = pa.RecordBatch.from_pandas(
            pd.DataFrame(itertools.islice(rows_it, chunk_size), columns=schema.names),
            schema=schema, preserve_index=False,
        )
        if not batch:
            break
        yield batch

# Could be inferred from data, but note that the schema has to be
# known when creating the ParquetWriter object
schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
])

rows = get_rows()
batches = get_batches(rows, chunk_size=1000, schema=schema)

# Write the batches
with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for batch in batches:
        writer.write_batch(batch)
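As mentioned above, pandas is not required to build each RecordBatch. One alternative (a sketch, reusing the same hypothetical row iterable and schema as the example above) is to transpose each chunk of row tuples into columns and build the batch with pyarrow's own constructors:

def get_batches_without_pandas(rows_iterable, chunk_size, schema):
    rows_it = iter(rows_iterable)
    while True:
        chunk = list(itertools.islice(rows_it, chunk_size))
        if not chunk:
            break
        # Transpose the list of row tuples into one list per column,
        # then build Arrow arrays with the types declared in the schema
        columns = list(zip(*chunk))
        arrays = [
            pa.array(column, type=field.type)
            for column, field in zip(columns, schema)
        ]
        yield pa.RecordBatch.from_arrays(arrays, schema=schema)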
Parquet files are written in row groups (see here for what they mean), but the short version is that columnar data is limited to chunks of many rows, and each chunk can be appended to the file independently. You can implement this for an incoming data stream using PyArrow.
First, we need to define an adapter class that adapts bytes between Python's IO-class style and the app-iter style:
"""Circular bytes buffer to adapt iterator vs. BytesIO approach when streaming from a web server."""
from io import BytesIO
class CircularBytesBuffer:
"""BytesIO mock up with a circular buffer.
- This is used as an adapter between Python's WSGI web server `response.app_iter`
and Python file writer (BytesIO) model.
- Buffer can be flushed partially, so we do not buffer everything in the memory at once,
but we stream it incrementally over the web server response.
Implement enough methods that we can satisfy ParquetWriter.
- https://docs.python.org/3/library/io.html#io.IOBase
- https://stackoverflow.com/questions/38256689/how-do-i-validate-an-implementation-of-pythons-iobase
"""
def __init__(self):
self.buffer = BytesIO()
self.written = 0 # Track total written bytes as we keep recreating the buffer
self.closed = False
def flush_buffer(self) -> bytes:
b = self.buffer.getvalue()
self.buffer.seek(0)
self.buffer.truncate(0)
return b
def write(self, b: bytes):
self.buffer.write(b)
self.written += len(b)
def seek(self, offset, whence=0):
raise NotImplementedError()
def read(self, size=-1):
raise NotImplementedError()
def tell(self):
return self.written
def close(self):
self.closed = True
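A quick standalone check of the adapter's semantics (a hypothetical snippet, not part of the original code): writes accumulate in the internal BytesIO, flush_buffer() hands out the pending bytes and empties the buffer, while written keeps counting the running total.

buf = CircularBytesBuffer()
buf.write(b"hello ")
buf.write(b"world")

assert buf.flush_buffer() == b"hello world"  # Pending bytes are handed out...
assert buf.flush_buffer() == b""             # ...and the buffer is now empty
assert buf.written == 11                     # Total byte count keeps growing
assert buf.tell() == 11                      # tell() reports the logical position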
Then we define the generator function that is passed to response.app_iter to stream the response, with a max_bytes counter to track how many bytes we have already streamed into the response:
def _generate_liquidity_candles_parquet(
    query_generator: Iterable[tuple],
    max_bytes: int,
    dbession: Session,
):
    """Stream Parquet Uniswap v3 CLMM candles."""

    # Uniswap v3 liquidity data format for CLMM
    schema = pa.schema([
        ("pair_id", pa.int32()),
        ("bucket", pa.timestamp("s")),
        ("open_tick", pa.uint32()),
        ("close_tick", pa.uint32()),
        ("high_tick", pa.uint32()),
        ("low_tick", pa.uint32()),
        ("current_liquidity", pa.decimal256(76)),
        ("net_amount0", pa.decimal256(76)),
        ("net_amount1", pa.decimal256(76)),
        ("in_amount0", pa.decimal256(76)),
        ("in_amount1", pa.decimal256(76)),
    ])

    try:
        out = CircularBytesBuffer()
        with pq.ParquetWriter(out, schema=schema) as writer:
            for exchange_type, sort_key, query in query_generator:
                batch = pa.RecordBatch.from_pandas(
                    pd.DataFrame(
                        query.yield_per(1000),
                        columns=schema.names
                    ),
                    schema=schema,
                    preserve_index=False,
                )

                # Write bytes to our circular buffer
                # and then flush them using yield
                writer.write_batch(batch)
                buffered_bytes = out.flush_buffer()
                yield buffered_bytes

                # This will trash the Parquet file format,
                # but we do not have any other way to signal that the Parquet
                # writing was forcefully aborted, because we have already sent
                # the HTTP headers. So we just append some trash at the end of the file.
                if out.written > max_bytes:
                    yield f"Parquet writing forcefully aborted. max_bytes reached: {out.written}".encode("utf-8")
                    break

        # Flush whatever bytes ParquetWriter writes on close
        buffered_bytes = out.flush_buffer()
        yield buffered_bytes
    finally:
        dbession.close()
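The endpoint below also wires up a _generate_liquidity_candles_jsonl generator that is not shown here. A minimal sketch of what it might look like (hypothetical; it assumes each SQLAlchemy row can be turned into a dict and serialized with the standard json module):

import json

def _generate_liquidity_candles_jsonl(
    query_generator: Iterable[tuple],
    max_bytes: int,
    dbession: Session,
):
    """Stream liquidity candles as JSON Lines, one JSON document per row."""
    written = 0
    try:
        for exchange_type, sort_key, query in query_generator:
            for row in query.yield_per(1000):
                # Hypothetical row-to-dict conversion; adjust to the real row type
                line = json.dumps(dict(row._mapping), default=str).encode("utf-8") + b"\n"
                written += len(line)
                yield line
                if written > max_bytes:
                    # Unlike Parquet, truncating JSONL mid-stream only loses trailing rows
                    yield b'{"error": "max_bytes reached, output truncated"}\n'
                    return
    finally:
        dbession.close()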
The endpoint:
@noindex_view_config(route_name="web_liquidity_candles")
def web_liquidity_candles(request: Request):
    """Stream liquidity data from the server.

    DoS vectors

    - Limit number of pairs user can query
    - Limit number of bytes the endpoint can stream
    """
    dbession = request.oracle_dbsession
    params = request.params

    try:
        # We protect the server against DoS using the max bytes value,
        # which tells how many bytes this endpoint can stream.
        # Max bytes for the iterator can come from the client,
        # but we also need to have a server-side maximum so that clients cannot
        # simply bypass it.
        max_bytes = request.params.get("max_bytes", "750_000_000")
        try:
            max_bytes = int(max_bytes)
        except ValueError:
            raise CandleLookupError(f"Bad max_bytes: {max_bytes}")

        format = request.params.get("format", "jsonl").lower()
        if format not in ("jsonl", "parquet"):
            raise CandleLookupError(f"Unknown response format: {format}")

        # Never allow this configuration value to be exceeded,
        # no matter how much the client requests
        abs_max_bytes_limit = 1_000_000_000
        if max_bytes > abs_max_bytes_limit:
            raise CandleLookupError(f"max_bytes too high: {max_bytes}, limit {abs_max_bytes_limit}")

        # Create SQLAlchemy query
        query_generator = fetch_liquidity_candle_stream(
            dbession,
            pair_ids=params.get("pair_ids"),
            time_bucket=params.get("time_bucket"),
            start=params.get("start"),
            end=params.get("end"),
        )
    except CandleLookupError as e:
        json_body = {"error_id": "CandleLookupError", "message": str(e)}
        return exception_response(422, json_body=json_body)

    if format == "jsonl":
        # JSONL streaming header
        # https://stackoverflow.com/a/59998781/315168
        # https://github.com/Pylons/pyramid_tm/issues/56#issuecomment-285510721
        r = Response(content_type="application/jsonl+json; charset=UTF-8")
    else:
        # Parquet mimetype
        # https://stackoverflow.com/questions/78129486/what-mime-media-type-content-type-should-be-used-for-apache-parquet-files
        r = Response(content_type="application/vnd.apache.parquet")

    # Signal Caddy reverse proxy that
    # this is a streaming response and it should not
    # attempt to buffer the response
    r.content_length = -1

    # Iterable responses do not close database handlers automatically?
    # TODO: Ensure we are not leaking connections
    # https://github.com/Pylons/pyramid_tm/issues/56#issuecomment-285510721
    _generate_liquidity_candles_jsonl.close = lambda: dbession.close()
    _generate_liquidity_candles_parquet.close = lambda: dbession.close()

    if format == "jsonl":
        r.app_iter = _generate_liquidity_candles_jsonl(query_generator, max_bytes, dbession)
    else:
        r.app_iter = _generate_liquidity_candles_parquet(query_generator, max_bytes, dbession)

    return r
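On the consuming side, a client can stream the response straight to disk and only then open it with pyarrow. A sketch using the requests library (the URL and query parameters here are placeholders for illustration):

import requests
import pyarrow.parquet as pq

# Placeholder URL and parameters
url = "https://example.com/web_liquidity_candles"
params = {"format": "parquet", "pair_ids": "1,2", "time_bucket": "1h"}

with requests.get(url, params=params, stream=True) as resp:
    resp.raise_for_status()
    with open("candles.parquet", "wb") as f:
        # Stream the body to disk in small chunks so the client
        # never holds the whole file in memory either
        for chunk in resp.iter_content(chunk_size=64 * 1024):
            f.write(chunk)

table = pq.read_table("candles.parquet")
print(table.num_rows)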