我无法使用 OpenAI Azure 的 StreamingResponse 从 HTTP Post API 获取所需的响应

问题描述 投票:0回答:1

我正在使用 Azure OpenAI 根据查询生成响应,我使用 PostAPI 中的邮递员与男孩一起发送它,如下所示:

{"query":"what is de-iceing"}

现在,一旦它运行,我就可以看到我得到的终端是

Azure Function runtime version: 3.11 [2024-10-18T] Request body: b'{"query":"what is de-iceing"}' [2024-10-18T] data
但我没有在 StreamingResponse 中获得 OpenAI 生成的响应。 附件是我的azure函数的完整代码。

# importing the libraies
import azure.functions as func
import logging
# from langchain_text_splitters import CharacterTextSplitter
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_openai import AzureOpenAIEmbeddings
import os
from openai import AzureOpenAI
import openai
# from dotenv import load_dotenv
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
# load_dotenv()

# create instance of an Azure dunction 
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Get data from Azure Open AI
# async def stream_processor(response):
#     async for chunk in response:
#         if len(chunk.choices) > 0:
#             delta = chunk.choices[0].delta
#             if delta.content: # Get remaining generated response if applicable
#                 await asyncio.sleep(0.1)
#                 yield delta.content

async def stream_processor(response):
    if hasattr(response, '__aiter__'):
        async for chunk in response:
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    await asyncio.sleep(0.1)
                    yield delta.content
    else:
        # Handle non-async iterable case if necessary
        for chunk in response:
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    yield delta.content

@app.route(route="v2_AMMBackend")
async def v2_AMMBackend(req: Request) -> StreamingResponse:
    logging.info('Python HTTP trigger function processed a request.')

    runtime_version = os.environ.get('FUNCTIONS_WORKER_RUNTIME_VERSION')
    logging.info(f"Azure Function runtime version: {runtime_version}")
    logging.info(req)
    req_body = await req.body()  # Use await to get the body asynchronously
    logging.info(f'Request body: {req_body}')
    
    # Parse the request body as JSON
    data = json.loads(req_body.decode('utf-8'))
    logging.info('data')
    logging.info(data)

    # req_body = req.get_body().decode('utf-8')
    # # Parse the request body as JSON
    # data = json.loads(req_body)
    # Access the variables from the request body
    variable1 = data.get("query")
    logging.info(variable1)

    # Use AzureOpenAIEmbeddings with an Azure account
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )

    # Calling the AzureSearch with index_name
    index_name= "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )

    # Calling the AzureOpenAI
    client = AzureOpenAI(
    api_key = os.getenv("GPT_AZURE_OPENAI_API_KEY"),  
    api_version = "2023-07-01-preview",
    azure_endpoint = os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

    # Perform a similarity search
    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk
    
    context = userchat(variable1)
    prompt = fr"Context: {context} \
                Based on the above context please provide a clear and concise answer to the user's query/question mentioned below.\
                Query: {variable1}.\
                If the query is about the procedue or the method then generate the text in bullet points."
    prompt1 = fr"Here is the query from the user, Query: {variable1} \
                Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\
                Content: {context}"

    # async def get_chat_completion(prompt):
    response = client.chat.completions.create(
        model="cbothr-gpt-4-001",
        stream=True,
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ]
    )
    logging.info('response')
    logging.info(response)

# Make sure to call the function within an async context
    if hasattr(response, '__aiter__'):
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
        "Error: Response is not iterable.",
        status_code=500
    )

我无法获得代码最后几行中提到的响应,而我得到的是:

{
    "_HttpResponse__status_code": 500,
    "_HttpResponse__mimetype": "text/plain",
    "_HttpResponse__charset": "utf-8",
    "_HttpResponse__headers": {},
    "_HttpResponse__body": "Error: Response is not iterable."
}

我该怎么办?

azure azure-functions postman openai-api streamingresponsebody
1个回答
0
投票

您需要使用 AsyncAzureOpenAI 创建 Azure OpenAI 客户端,如下所示。

client = openai.AsyncAzureOpenAI(
    api_key = os.getenv("GPT_AZURE_OPENAI_API_KEY"),  
    api_version = "2024-08-01-preview",
    azure_endpoint = os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

您的完整代码应如下所示。

import azure.functions as func
import logging
from langchain_openai import AzureOpenAIEmbeddings
import os
import openai
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content: 
                await asyncio.sleep(0.1)
                yield delta.content


@app.route(route="http_trigger", methods=[func.HttpMethod.POST])
async def http_trigger(req: Request) -> StreamingResponse:
    logging.info('Python HTTP trigger function processed a request.')
    
    req_body = await req.body()  
    logging.info(f'Request body: {req_body}') 
    data = json.loads(req_body.decode('utf-8'))
    variable1 = data.get("query")
    
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )

    index_name= "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )

    client = openai.AsyncAzureOpenAI(
    api_key = os.getenv("GPT_AZURE_OPENAI_API_KEY"),  
    api_version = "2024-08-01-preview",
    azure_endpoint = os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk
    
    context = userchat(variable1)
    prompt = fr"Content: {context} \
                please provide a clear and concise answer to the user's query/question mentioned below.\
                Query: {variable1}.\
                If the query is about the procedue or the method then generate the text in bullet points."
    prompt1 = fr"Here is the query from the user, Query: {variable1} \
                Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\
                Content: {context}"
    
    logging.info(prompt)

    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ],
        stream=True
    )
    logging.info(f"OpenAI API Response Object: {response}")

    if response is not None:
        logging.info("Streaming started...")
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable or streaming not supported.",
            status_code=500
    )

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.