Running the Anthropic API in parallel


I successfully run OpenAI GPT-4o in parallel with multiprocessing:

from multiprocessing import Pool, cpu_count, freeze_support

import pandas as pd
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

def llm_query(chunk):
    context, query = get_prompt_synonyms()
    input1, output1 = get_example()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": context},
            {"role": "user", "content": f'Input data is {input1}' + ' ' + query},
            {"role": "assistant", "content": output1},
            {"role": "user", "content": f'Input data is {chunk}' + ' ' + query}
        ],
        temperature=0,
        max_tokens=4090
    )
    reply = response.choices[0].message.content
    return reply

def description_from_llm(chunk):
    reply = llm_query(chunk)
    df_synonyms_codes = get_frame_llm(reply) # reply to dataframe
    return df_synonyms_codes



if __name__ == '__main__':
    freeze_support()  # must run right after the __main__ check, before the Pool
    # some stuff
    with Pool(processes=cpu_count()) as pool:
        dfs_syn = pool.map(description_from_llm, list_chunks)
    df_final = pd.concat(dfs_syn)  # the with-block closes the pool automatically

It runs very fast (locally) without any issues. However, when I try to do the same thing with Anthropic Claude 3.5 (I made sure to import all the required, up-to-date packages, have a valid key, and so on):

def llm_query(chunk, temperature=0, max_tokens=4096):
    model = "claude-3-5-sonnet-20240620"
    # use the chunk argument here, not a global, so each worker gets its own data
    data = "Input data for analysis and enrichment: {x}".format(x=chunk)
    context, query = get_query()
    examples = get_few_shot_learning()
    messages = get_messages(context, data, query, examples)
    response = client.messages.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens
    )
    return response

it fails with:

TypeError: APIStatusError.__init__() missing 2 required keyword-only arguments: 'response' and 'body'
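This TypeError usually does not come from the API call itself: when a worker process raises anthropic's APIStatusError, multiprocessing pickles the exception to send it back to the parent, and unpickling calls __init__ without the keyword-only arguments response and body, producing exactly this message. A minimal sketch of a workaround under that assumption is to catch API errors inside the worker so that only plain, picklable values cross the process boundary (description_from_llm_safe is a hypothetical wrapper name; llm_query and get_frame_llm are the helpers from above):

import anthropic

def description_from_llm_safe(chunk):
    # Catch the API exception inside the worker and return only
    # picklable values: a DataFrame on success, None on failure.
    try:
        response = llm_query(chunk)
        reply = response.content[0].text  # the Messages API returns a list of content blocks
        return get_frame_llm(reply)
    except anthropic.APIError as e:
        print(f"Chunk failed: {e}")
        return None

Mapping description_from_llm_safe with pool.map then lets the parent filter out None results instead of crashing while unpickling the exception.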

It does work in a plain loop:

df_all = pd.DataFrame()
for chunk in list_chunks:
    df = llm_query(chunk)  # assumes the reply is converted to a DataFrame here
    df_all = pd.concat([df_all, df], axis=0)

But it is far too slow!

Is there a way to call the Anthropic API in parallel? Or some other solution that would cut the runtime by 7x-10x (as multiprocessing does for GPT-4o)?

python parallel-processing multiprocessing large-language-model anthropic
1 Answer

I usually use ThreadPoolExecutor.

A minimal example:

import os
from concurrent.futures import ThreadPoolExecutor

from anthropic import Anthropic

TEMPERATURE = 0.5
CLAUDE_SYSTEM_MESSAGE = "You are a helpful AI assistant."

ANTHROPIC_API_KEY = os.environ["ANTHROPIC_API_KEY"]  # assumes the key is in the environment
anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)

def call_anthropic(
    prompt,
    model_id="claude-3-haiku-20240307",
    temperature=TEMPERATURE,
    system=CLAUDE_SYSTEM_MESSAGE,
):
    try:
        message = anthropic_client.messages.create(
            model=model_id,
            temperature=temperature,
            max_tokens=4096,
            system=system,
            messages=[
                {
                    "role": "user",
                    "content": prompt,
                }
            ],
        )
        return message.content[0].text

    except Exception as e:
        print(f"Error: {e}")
        return None

BASE_PROMPT = "What is the capital of {country}?"
COUNTRIES = ["Switzerland", "Sweden", "Sri Lanka", "Spain"]

prompts = [BASE_PROMPT.format(country=country) for country in COUNTRIES]
with ThreadPoolExecutor(max_workers=4) as executor:
    responses = list(executor.map(call_anthropic, prompts))

print(responses)

Output:

['The capital of Switzerland is Bern.',
 'The capital of Sweden is Stockholm.',
 'The capital of Sri Lanka is Colombo.',
 'The capital of Spain is Madrid.']
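Applied to the chunked workflow from the question, the same pattern drops in directly. A sketch, assuming the description_from_llm and list_chunks from above, with max_workers=8 as a placeholder to tune against your Anthropic rate limits:

import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Threads fit this workload: each worker just waits on network I/O,
# so the GIL is not a bottleneck, and nothing has to be pickled,
# which also sidesteps the exception-unpickling failure from the question.
with ThreadPoolExecutor(max_workers=8) as executor:
    dfs_syn = list(executor.map(description_from_llm, list_chunks))

df_final = pd.concat(dfs_syn, axis=0)

Unlike multiprocessing, the threads share one client instance, and executor.map preserves the input order of the chunks.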