I successfully run OpenAI GPT4o in parallel with multiprocessing:
def llm_query(chunk):
    context, query = get_prompt_synonyms()
    input1, output1 = get_example()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": context},
            {"role": "user", "content": f'Input data is {input1}' + ' ' + query},
            {"role": "assistant", "content": output1},
            {"role": "user", "content": f'Input data is {chunk}' + ' ' + query}
        ],
        temperature=0,
        max_tokens=4090
    )
    reply = response.choices[0].message.content
    return reply
def description_from_llm(chunk):
    reply = llm_query(chunk)
    df_synonyms_codes = get_frame_llm(reply)  # reply to dataframe
    return df_synonyms_codes
if __name__ == '__main__':
    freeze_support()  # must run before the pool is created
    # some stuff
    with Pool(processes=cpu_count()) as pool:
        dfs_syn = pool.map(description_from_llm, list_chunks)
    df_final = pd.concat(dfs_syn)
It runs very fast (locally) without any problems. However, when I try to do the same thing with Anthropic Claude 3.5 (I made sure to import all the required up-to-date packages, have a valid key, etc.):
def llm_query(chunk, temperature=0, max_tokens=4096):
    model = "claude-3-5-sonnet-20240620"
    data = "Input data for analysis and enrichment: {x}".format(x=chunk)
    context, query = get_query()
    examples = get_few_shot_learning()
    messages = get_messages(context, data, query, examples)
    response = client.messages.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens
    )
    return response
it does not work properly:
TypeError: APIStatusError.__init__() missing 2 required keyword-only arguments: 'response' and 'body'
It does work in a loop:
df_all = pd.DataFrame()
for chunk in list_chunks:
    response = llm_query(chunk)
    df = get_frame_llm(response.content[0].text)  # reply to dataframe
    df_all = pd.concat([df_all, df], axis=0)
But it is far too slow!
Is there a way to call the Anthropic API in parallel? Or some other solution that reduces the time by a factor of 7-10 (the way multiprocessing did for GPT4o)?
I usually use ThreadPoolExecutor. API calls are I/O-bound, so threads parallelize them well, and, unlike multiprocessing, nothing has to be pickled between processes. That pickling step is likely what raised the APIStatusError above: Anthropic's exceptions take keyword-only arguments that do not survive a pickle round trip.
Minimal example:
from anthropic import Anthropic
from concurrent.futures import ThreadPoolExecutor

TEMPERATURE = 0.5
CLAUDE_SYSTEM_MESSAGE = "You are a helpful AI assistant."

anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)

def call_anthropic(
    prompt,
    model_id="claude-3-haiku-20240307",
    temperature=TEMPERATURE,
    system=CLAUDE_SYSTEM_MESSAGE,
):
    try:
        message = anthropic_client.messages.create(
            model=model_id,
            temperature=temperature,
            max_tokens=4096,
            system=system,
            messages=[
                {
                    "role": "user",
                    "content": prompt,
                }
            ],
        )
        return message.content[0].text
    except Exception as e:
        print(f"Error: {e}")
        return None

BASE_PROMPT = "What is the capital of {country}?"
COUNTRIES = ["Switzerland", "Sweden", "Sri Lanka", "Spain"]
prompts = [BASE_PROMPT.format(country=country) for country in COUNTRIES]

with ThreadPoolExecutor(max_workers=4) as executor:
    responses = list(executor.map(call_anthropic, prompts))

print(responses)
Output:
['The capital of Switzerland is Bern.',
'The capital of Sweden is Stockholm.',
'The capital of Sri Lanka is Colombo.',
'The capital of Spain is Madrid.']
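Applied to your workflow, the same pattern parallelizes the chunked calls. A minimal sketch, assuming the llm_query and get_frame_llm helpers from your question are in scope along with list_chunks (max_workers=8 is an arbitrary starting point):

from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def description_from_llm(chunk):
    response = llm_query(chunk)        # Anthropic Message object
    reply = response.content[0].text   # extract the text completion
    return get_frame_llm(reply)        # reply to dataframe

# Threads share memory, so the client, the chunks, and the returned
# DataFrames are never pickled, unlike with multiprocessing.Pool.
with ThreadPoolExecutor(max_workers=8) as executor:
    dfs_syn = list(executor.map(description_from_llm, list_chunks))

df_final = pd.concat(dfs_syn)

Tune max_workers against your account's rate limits; too many concurrent requests will surface 429 errors from the API, which you may want to catch and retry.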