问题用例很简单。
我想通过 REST API 从 Salesforce 查询超过 50M 条记录。从多个来源收集数据并将其融合在一起是工作的一部分,这些数据可以输入到机器学习模型中。 我想将这些数据存储到数据库或镶木地板文件中,以便我可以执行后续步骤。
现在,Salesforce 对一个对象的如此大的数据大小进行 REST API 查询,将返回类似的内容
{"totalsize":50000000, "done":false,
"nextrecordUrl":"/services/data/v49.0/a1000xxxxxx-2000",
"records":[here the data of query remains of the 2000 batch that was returned]}
nextRecordUrl 简单给出了查询下一批 od 2000 的 url(salesforce 可以返回的最大批量大小,但是对此数字没有保证)。 所以我简单地运行一个循环,直到不再有 nextRecordUrl 来获取整组数据。
当前的方法如下所示:
read the data = query(api)
while "done" is not True:
read the "records" part of the json
load it into Database ( using psycopg2 )
again read data = query( nextRecordUrl )
read value of "done"
我的问题是,如何加快此操作? 考虑到即使每个循环需要 2 秒才能完成,这最多仍需要 16-17 小时,并且我们可能需要每两周运行一次,以保持应用程序 ( salesforce ) 中反映的最新数据,更不用说数据库了超时错误或我必须处理的内存错误。
有什么建议吗?
编辑: 目前的方法是这样的:
sf = salesforce () # simple_salesforce library connection
df = pandas dataframe
sqlalc_engine = sqlalchemy engine
# query the first batch here then this loop start
while True:
if num_batches == 1:
current_df = df
else:
current_df = pd.DataFrame(lstRecords)
#print(current_df.columns)
current_df = current_df.drop(['attributes'], axis=1)
print(f"next record URL is :{nextRecordsUrl}, || batch no is : {num_batches}")
print(f"Loading batch : {num_batches} | records count : {current_df.shape[0]}")
#this line i am using to insert records into DB
current_df.to_sql(table_name, con=sqlalc_engine, schema='', if_exists='append', index=False)
print(f"{num_batches} is loaded to DB")
num_batches+=1
if completed:
#no more records
break
# get next batch of records
records = sf.query_more(nextRecordsUrl, identifier_is_url=True)
lstRecords = records.get('records')
nextRecordsUrl = records.get('nextRecordsUrl')
# set running check for next loop
completed = records.get('done')
在查询中添加 WHERE 子句,以免每次都获取所有内容。
SELECT Id FROM Account WHERE LastModifiedDate >= 2024-02-25T01:23:45T
(使用上次成功作业的开始时间戳。开始,而不是结束,以确保没有间隙!)。或者有特殊的 not-really-constants 让你写 WHERE LastModifiedDate >= LAST_WEEK
。
这应该会大大减少你的时间。数据多久更改一次?说5%?是否有任何其他过滤器可以应用,或者您确实需要一切。
另一件事是您正在使用同步 API(发送请求、等待、处理结果、发送下一个请求...)。有一个异步版本可能更适合:Bulk API 2.0。您发出请求并定期询问“完成了吗”。 SF 开始生成每个最多 10K 行的结果文件,您甚至可以在多个线程中提取它们(假设应用程序的其余部分可以很好地处理多线程,可能下一个瓶颈将是本地数据库中的锁定?)。
如果你真的想手工制作请求,有一些资源:https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough.htm但我会想要使用
simple salesforce
来完成繁重的工作吗?
另一个好处是批量 API 将允许您使用 PK 分块 进一步有效地将结果切割成文件。这意味着,如果给定的 100-250K 记录“块”中只有 5 个满足您的查询条件,则该块的结果文件将只有 5 个,而不是您指定为批量大小的 10K。但是嘿,生成速度会更快。