我有一个用例,其中包括将巨大的表从 Oracle 加载到 Snowflake。 Oracle 服务器距离 Snowflake 端点很远,因此当通过假脱机脚本或 cx_oracle 加载大于 12 GB 的表(实际上是视图)时,我们确实会遇到连接问题。
我正在考虑使用最多 4 个线程的 ThreadPoolExecutor 来测试,并使用 SessionPool。这样,我就可以为每个线程建立一个连接,这就是重点。因此,这意味着我必须为每个线程批量分配数据获取。 我的问题是:我怎样才能实现这一目标?这样做是否正确: “从 x 和 y 之间 rownum 的表中选择 *”(不是这个语法,我知道......但你明白我的意思),我应该依赖 OFFSET,......?
我的想法是每个线程都获得一个 select 的“切片”,批量获取数据并批量写入 csv,因为我宁愿有小文件而不是大文件,发送到雪花。
def query(start_off, pool):
start_conn = datetime.now()
con = pool.acquire()
end_conn = datetime.now()
print(f"Conn/Acquire time: {end_conn-start_conn}")
with con.cursor() as cur:
start_exec_ts = datetime.now()
cur.execute(QUERY, start_pos=start_off, end_pos=start_off+(OFFSET-1))
end_exec_ts = datetime.now()
rows = cur.fetchall()
end_fetch_ts = datetime.now()
total_exec_ts = end_exec_ts-start_exec_ts
total_fetch_ts = end_fetch_ts-end_exec_ts
print(f"Exec time : {total_exec_ts}")
print(f"Fetch time : {total_fetch_ts}")
print(f"Task executed {threading.current_thread().getName()}, {threading.get_ident()}")
return rows
def main():
pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
c.oracle_conn['oracle']['password'],
c.oracle_conn['oracle']['dsn'],
min=2, max=4, increment=1,
threaded=True,
getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(query, d, pool) for d in range(1,13,OFFSET)]
for future in as_completed(futures):
# process your records from each thread
print(repr(future.result()))
# process_records(future.result())
if __name__ == '__main__':
main()
另外,在查询函数中使用 fetchMany ,我怎样才能发回结果以便每次都可以处理它们?
如果你想通过python脚本传输数据
您可以创建一个生产者 -> 队列 -> 消费者工作流程来执行此操作
消费者依赖数据的ID
制作人
生产者获取数据的ID
将“一片 ID”作为作业放入队列
消费者
消费者从队列中获取作业
使用 ID 获取数据(例如“select * from table where id in ...”)
将数据保存到某处
示例
此类概念的快速示例
import time
import threading
import queue
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
@dataclass
class Job:
ids: list
jobs = queue.Queue()
executor = ThreadPoolExecutor(max_workers=4)
fake_data = [i for i in range(0, 200)]
def consumer():
try:
job = jobs.get_nowait()
# select * from table where id in job.ids
# save the data
time.sleep(5)
print(f"done {job}")
except Exception as exc:
print(exc)
def fake_select(limit, offset):
if offset >= len(fake_data):
return None
return fake_data[offset:(offset+limit)]
def producer():
limit = 10
offset = 0
stop_fetch = False
while not stop_fetch:
# select id from table limit l offset o
ids = fake_select(limit, offset)
if ids is None:
stop_fetch = True
else:
job = Job(ids=ids)
jobs.put(job)
print(f"put {job}")
offset += limit
executor.submit(consumer)
time.sleep(0.2)
def main():
th = threading.Thread(target=producer)
th.start()
th.join()
while not jobs.empty():
time.sleep(1)
executor.shutdown(wait=True)
print("all jobs done")
if __name__ == "__main__":
main()
此外,
如果你想在消费者获取数据后进行更多操作
您可以在消费者流程中执行此操作
或者添加另一个队列和消费者来执行额外的操作
工作流程就会变成这样
生产者 -> 队列 -> 获取并保存数据 消费者 -> 队列 -> 消费者做一些额外的操作
您能够解决上述用例吗?我已经找到了使用 cxOracle 会话池和 ThreadPoolExecutor 从 Oracle 中获取块数据并将其推送到 Snowflake 的方法。如果您已经计算出端到端的数据,请告诉我,因为我看到一些数据重复主要是由于不正确的会话处理或游标缓存造成的。