将 pandas 数据框上传到 QuestDB 的有效方法

问题描述 投票:0回答:1

将 pandas 数据框上传到数据库的有效(快速)方法是什么?我正在遵循 https://questdb.io/docs/clients/ingest-python#basic-insert

中提供的示例
conf = f'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.dataframe(df, table_name='trades', at=TimestampNanos.now())

上传约 5M 行和 50 列、每列约 350 字节的数据帧大约需要 45 秒。我使用示例中引用的默认参数。我期待更快的加载性能。

database time-series questdb
1个回答
0
投票

与 QuestDB 的核心工程师交谈后,我有了一些见解,并设法在短短 8 秒内提取数据。

事实证明,即使 pandas 流是零拷贝的,由于 ILP 协议的限制,它也必须扩展为 UTF-8 才能传输,因此会传输比数据帧的绝对大小更多的数据。

将来,除了 ILP 之外,QuestDB 将拥有自己的二进制协议,但目前 ILP 将是我们最快的选择,因此我们需要一种解决方法来更快地获取数据。事实证明,当您有多个连接并行发送数据时,QuestDB 摄取数据的速度更快,因此这里的一个直接选择是将数据帧划分为块并从多个连接流式传输这些块。

我们可以做这样的事情:

  1. 将帧分成批次
  2. 定义发送函数
  3. 使用线程池并行发送批次
from collections import deque
from concurrent.futures import ThreadPoolExecutor


batches = deque()
for slice in np.array_split(df, df.size / batch_size):
    batches.append(slice)

def send_batch(conf_str, table_name, batches, timestamp_name):
    with Sender.from_conf(conf_str, auto_flush=False, init_buf_size=100000000) as qdb_sender:
        while True:
            try:
                slice = batches.pop()
            except IndexError:
                break
            qdb_sender.dataframe(slice, table_name=table_name, at=timestamp_name)
            qdb_sender.flush()


with ThreadPoolExecutor(max_workers=parallel) as executor:
    futures = []
    for _ in range(parallel):
       futures.append(executor.submit(send_batch, conf_str, table_name, batches, timestamp_name))
    for future in futures:
       future.result()

我们可以使用自动刷新而不是在每个批次后显式刷新,并且我们可以使用多处理而不是线程池,但这基本上就是这个想法。

注意:有人告诉我,如果数据采用 parquet 格式,则另一种途径将直接使用 QuestDB 的 parquet 功能,但在我的情况下,这不是一个选项,因为数据来自外部源,转换为 parquet 会产生额外的开销。

© www.soinside.com 2019 - 2024. All rights reserved.