我有以下函数：
async def pre_processing_list_sic_codes(
    self,
    start_date: datetime,
    final_date: datetime,
    sic_codes: List[str],
    *,
    batch_length: int = 80,
    db_concurrency: int = 3,
) -> Any:
    """Run ``self.processing`` over ``sic_codes`` in batches with bounded concurrency.

    The codes are split into consecutive batches of ``batch_length``. All
    batches are scheduled up front, but a semaphore lets at most
    ``db_concurrency`` of them call ``self.processing`` at the same time.
    This keeps the number of simultaneous database operations under the
    connection limit. When one batch finishes, the next waiting batch starts
    immediately.

    Assumes ``self.processing`` is a coroutine function accepting the keyword
    arguments ``sic_codes``, ``start_date`` and ``final_date``.

    Args:
        start_date: Start of the date window, forwarded to every batch.
        final_date: End of the date window, forwarded to every batch.
        sic_codes: All SIC codes to process.
        batch_length: Maximum number of codes per batch.
        db_concurrency: Maximum number of batches running at once.

    Returns:
        The value returned by ``self.processing`` for each batch, in batch order.

    Raises:
        ValueError: If ``batch_length`` or ``db_concurrency`` is less than 1.
        Exception: The first exception raised by any batch. It is re-raised
            only after every batch has finished.
    """
    # Local import so this snippet does not depend on how the enclosing
    # module imports asyncio (not visible here).
    import asyncio

    if batch_length < 1:
        raise ValueError(f"batch_length must be >= 1, got {batch_length}")
    if db_concurrency < 1:
        # A semaphore with zero permits would block every batch forever.
        raise ValueError(f"db_concurrency must be >= 1, got {db_concurrency}")

    batches = [
        sic_codes[i:i + batch_length]
        for i in range(0, len(sic_codes), batch_length)
    ]
    slots = asyncio.Semaphore(db_concurrency)

    async def _run_limited(batch: List[str]) -> Any:
        # Hold one permit for the whole database operation of this batch.
        async with slots:
            return await self.processing(
                sic_codes=batch,
                start_date=start_date,
                final_date=final_date,
            )

    # return_exceptions=True lets every batch run to completion, so no batch
    # is left running unattended after a failure. Any failure is re-raised
    # below instead of being discarded.
    results = await asyncio.gather(
        *(_run_limited(batch) for batch in batches),
        return_exceptions=True,
    )
    for outcome in results:
        if isinstance(outcome, BaseException):
            raise outcome
    return results
此时，task 列表中包含了所有需要执行的任务。我通常的做法很简单：
# Schedules every batch coroutine concurrently (presumably asyncio.gather), so
# all batches hit the database at the same time. This is what exceeds the
# connection limit described below.
await gather(*task)
但是，这些任务都涉及数据库操作，由于同时运行的任务太多，我达到了数据库的连接数上限。我的想法是将这些任务分成若干组（例如分成 3 组或 4 组），组内的任务串行执行，组与组之间并发执行。我该如何实现？
我不完全确定是否正确理解了你的问题，但我假设你是把每一批 SIC 代码发送给数据库处理，并且希望多个批次同时运行。
解决这个问题的方法有很多，具体取决于我的假设中哪些地方与实际情况不符。我一直用下面这种方式分组批量执行异步任务。下面是一个简单示例：在同时运行的批次数不超过 db_concurrency 阈值的前提下编排各批次的处理。
import asyncio
import datetime
from typing import Any, Awaitable, Callable, List, Optional
# SIC codes fed to the batching demo below, in ascending numeric order.
# Two entries ("28131" and "56302") had been split across physical lines,
# which made the literal a SyntaxError. They are restored here.
SIC_CODES = [
    "1110", "1120", "1130", "1140", "1150", "1160", "1190", "1210", "1220", "1230",
    "1240", "1250", "1260", "1270", "1280", "1290", "1300", "1410", "1420", "1430",
    "1440", "1450", "1460", "1470", "1490", "1500", "1610", "1621", "1629", "1630",
    "1640", "1700", "2100", "2200", "2300", "2400", "3110", "3120", "3210", "3220",
    "5101", "5102", "5200", "6100", "6200", "7100", "7210", "7290", "8110", "8120",
    "8910", "8920", "8930", "8990", "9100", "9900", "10110", "10120", "10130", "10200",
    "10310", "10320", "10390", "10410", "10420", "10511", "10512", "10519", "10520", "10611",
    "10612", "10620", "10710", "10720", "10730", "10810", "10821", "10822", "10831", "10832",
    "10840", "10850", "10860", "10890", "10910", "10920", "11010", "11020", "11030", "11040",
    "11050", "11060", "11070", "12000", "13100", "13200", "13300", "13910", "13921", "13922",
    "13923", "13931", "13939", "13940", "13950", "13960", "13990", "14110", "14120", "14131",
    "14132", "14141", "14142", "14190", "14200", "14310", "14390", "15110", "15120", "15200",
    "16100", "16210", "16220", "16230", "16240", "16290", "17110", "17120", "17211", "17219",
    "17220", "17230", "17240", "17290", "18110", "18121", "18129", "18130", "18140", "18201",
    "18202", "18203", "19100", "19201", "19209", "20110", "20120", "20130", "20140", "20150",
    "20160", "20170", "20200", "20301", "20302", "20411", "20412", "20420", "20510", "20520",
    "20530", "20590", "20600", "21100", "21200", "22110", "22190", "22210", "22220", "22230",
    "22290", "23110", "23120", "23130", "23140", "23190", "23200", "23310", "23320", "23410",
    "23420", "23430", "23440", "23490", "23510", "23520", "23610", "23620", "23630", "23640",
    "23650", "23690", "23700", "23910", "23990", "24100", "24200", "24310", "24320", "24330",
    "24340", "24410", "24420", "24430", "24440", "24450", "24460", "24510", "24520", "24530",
    "24540", "25110", "25120", "25210", "25290", "25300", "25400", "25500", "25610", "25620",
    "25710", "25720", "25730", "25910", "25920", "25930", "25940", "25990", "26110", "26120",
    "26200", "26301", "26309", "26400", "26511", "26512", "26513", "26514", "26520", "26600",
    "26701", "26702", "26800", "27110", "27120", "27200", "27310", "27320", "27330", "27400",
    "27510", "27520", "27900", "28110", "28120", "28131", "28132", "28140", "28150", "28210",
    "28220", "28230", "28240", "28250", "28290", "28301", "28302", "28410", "28490", "28910",
    "28921", "28922", "28923", "28930", "28940", "28950", "28960", "28990", "29100", "29201",
    "29202", "29203", "29310", "29320", "30110", "30120", "30200", "30300", "30400", "30910",
    "30920", "30990", "31010", "31020", "31030", "31090", "32110", "32120", "32130", "32200",
    "32300", "32401", "32409", "32500", "32910", "32990", "33110", "33120", "33130", "33140",
    "33150", "33160", "33170", "33190", "33200", "35110", "35120", "35130", "35140", "35210",
    "35220", "35230", "35300", "36000", "37000", "38110", "38120", "38210", "38220", "38310",
    "38320", "39000", "41100", "41201", "41202", "42110", "42120", "42130", "42210", "42220",
    "42910", "42990", "43110", "43120", "43130", "43210", "43220", "43290", "43310", "43320",
    "43330", "43341", "43342", "43390", "43910", "43991", "43999", "45111", "45112", "45190",
    "45200", "45310", "45320", "45400", "46110", "46120", "46130", "46140", "46150", "46160",
    "46170", "46180", "46190", "46210", "46220", "46230", "46240", "46310", "46320", "46330",
    "46341", "46342", "46350", "46360", "46370", "46380", "46390", "46410", "46420", "46431",
    "46439", "46440", "46450", "46460", "46470", "46480", "46491", "46499", "46510", "46520",
    "46610", "46620", "46630", "46640", "46650", "46660", "46690", "46711", "46719", "46720",
    "46730", "46740", "46750", "46760", "46770", "46900", "47110", "47190", "47210", "47220",
    "47230", "47240", "47250", "47260", "47290", "47300", "47410", "47421", "47429", "47430",
    "47510", "47520", "47530", "47540", "47591", "47599", "47610", "47620", "47630", "47640",
    "47650", "47710", "47721", "47722", "47730", "47741", "47749", "47750", "47760", "47770",
    "47781", "47782", "47789", "47791", "47799", "47810", "47820", "47890", "47910", "47990",
    "49100", "49200", "49311", "49319", "49320", "49390", "49410", "49420", "49500", "50100",
    "50200", "50300", "50400", "51101", "51102", "51210", "51220", "52101", "52102", "52103",
    "52211", "52212", "52213", "52219", "52220", "52230", "52241", "52242", "52243", "52290",
    "53100", "53201", "53202", "55100", "55201", "55202", "55209", "55300", "55900", "56101",
    "56102", "56103", "56210", "56290", "56301", "56302", "58110", "58120", "58130", "58141",
    "58142", "58190", "58210", "58290", "59111", "59112", "59113", "59120", "59131", "59132",
    "59133", "59140", "59200", "60100", "60200", "61100", "61200", "61300", "61900", "62011",
    "62012", "62020", "62030", "62090", "63110", "63120", "63910", "63990", "64110", "64191",
    "64192", "64201", "64202", "64203", "64204", "64205", "64209", "64301", "64302", "64303",
    "64304", "64305", "64306", "64910", "64921", "64922", "64929", "64991", "64992", "64999",
    "65110", "65120", "65201", "65202", "65300", "66110", "66120", "66190", "66210", "66220",
    "66290", "66300", "68100", "68201", "68202", "68209", "68310", "68320", "69101", "69102",
    "69109", "69201", "69202", "69203", "70100", "70210", "70221", "70229", "71111", "71112",
    "71121", "71122", "71129", "71200", "72110", "72190", "72200", "73110", "73120", "73200",
    "74100", "74201", "74202", "74203", "74209", "74300", "74901", "74902", "74909", "74990",
    "75000", "77110", "77120", "77210", "77220", "77291", "77299", "77310", "77320", "77330",
    "77341", "77342", "77351", "77352", "77390", "77400", "78101", "78109", "78200", "78300",
    "79110", "79120", "79901", "79909", "80100", "80200", "80300", "81100", "81210", "81221",
    "81222", "81223", "81229", "81291", "81299", "81300", "82110", "82190", "82200", "82301",
    "82302", "82911", "82912", "82920", "82990", "84110", "84120", "84130", "84210", "84220",
    "84230", "84240", "84250", "84300", "85100", "85200", "85310", "85320", "85410", "85421",
    "85422", "85510", "85520", "85530", "85590", "85600", "86101", "86102", "86210", "86220",
    "86230", "86900", "87100", "87200", "87300", "87900", "88100", "88910", "88990", "90010",
    "90020", "90030", "90040", "91011", "91012", "91020", "91030", "91040", "92000", "93110",
    "93120", "93130", "93191", "93199", "93210", "93290", "94110", "94120", "94200", "94910",
    "94920", "94990", "95110", "95120", "95210", "95220", "95230", "95240", "95250", "95290",
    "96010", "96020", "96030", "96040", "96090", "97000", "98000", "98100", "98200", "99000",
    "99999",
]
# Running total of SIC codes handed to ``processing``. ``main`` prints it so
# it can be compared against ``len(SIC_CODES)``.
sic_code_count = 0


async def processing(
    sic_codes: List[str],
    start_date: datetime.datetime,
    final_date: datetime.datetime,
) -> Any:
    """Stand-in for the real database work on one batch of SIC codes.

    Database processing goes here. This demo stub only adds the batch size to
    the module-level ``sic_code_count``, so the caller can check that every
    code was handed over.

    Args:
        sic_codes: One batch of SIC codes.
        start_date: Start of the date window (unused by this stub).
        final_date: End of the date window (unused by this stub).
    """
    # ``datetime`` is imported as a module in this file, so the annotations
    # must name the ``datetime.datetime`` class, not the module itself.
    global sic_code_count
    sic_code_count += len(sic_codes)
async def pre_processing_list_sic_codes(
    start_date: datetime.datetime,
    final_date: datetime.datetime,
    sic_codes: List[str],
    *,
    batch_length: int = 80,
    db_concurrency: int = 3,
    worker: Optional[Callable[..., Awaitable[Any]]] = None,
) -> Any:
    """Process ``sic_codes`` in batches with at most ``db_concurrency`` in flight.

    The codes are split into consecutive batches of ``batch_length``. Every
    batch is scheduled up front, but an ``asyncio.Semaphore`` lets only
    ``db_concurrency`` of them run at once. This bounds the number of
    simultaneous database operations. As soon as one batch finishes, the next
    waiting batch starts, so slots never sit idle waiting for the slowest
    member of a fixed group.

    Args:
        start_date: Start of the date window, forwarded to every batch.
        final_date: End of the date window, forwarded to every batch.
        sic_codes: All SIC codes to process.
        batch_length: Maximum number of codes per batch.
        db_concurrency: Maximum number of batches running at the same time.
        worker: Coroutine function awaited as
            ``worker(sic_codes=..., start_date=..., final_date=...)`` for each
            batch. Defaults to the module-level ``processing``.

    Returns:
        The per-batch results returned by ``worker``, in batch order.

    Raises:
        ValueError: If ``batch_length`` or ``db_concurrency`` is less than 1.
        Exception: The first exception raised by any batch. It is re-raised
            only after every batch has finished.
    """
    if batch_length < 1:
        raise ValueError(f"batch_length must be >= 1, got {batch_length}")
    if db_concurrency < 1:
        # A semaphore with zero permits would block every batch forever.
        raise ValueError(f"db_concurrency must be >= 1, got {db_concurrency}")

    run_batch = processing if worker is None else worker
    batches = [
        sic_codes[i:i + batch_length]
        for i in range(0, len(sic_codes), batch_length)
    ]
    slots = asyncio.Semaphore(db_concurrency)

    async def _run_limited(batch: List[str]) -> Any:
        # Hold one permit for the whole database operation of this batch.
        async with slots:
            return await run_batch(
                sic_codes=batch,
                start_date=start_date,
                final_date=final_date,
            )

    # return_exceptions=True lets every batch run to completion, so a failure
    # does not leave the other batches running unattended. Failures are then
    # re-raised below instead of being silently dropped.
    results = await asyncio.gather(
        *(_run_limited(batch) for batch in batches),
        return_exceptions=True,
    )
    for outcome in results:
        if isinstance(outcome, BaseException):
            raise outcome
    return results
async def main():
    """Drive the demo over every SIC code, then print total vs. processed counts."""
    window_start = datetime.datetime(2021, 1, 1)
    window_end = datetime.datetime(2023, 8, 1)
    await pre_processing_list_sic_codes(
        sic_codes=SIC_CODES,
        start_date=window_start,
        final_date=window_end,
    )
    # Both numbers should agree if every batch reached ``processing``.
    total_codes = len(SIC_CODES)
    print(f"Total SIC Codes: {total_codes}")
    print(f"Processed SIC Codes: {sic_code_count}")


if __name__ == "__main__":
    # Script entry point: run the async demo on a fresh event loop.
    asyncio.run(main())