我正在使用 Azure OpenAI 根据查询生成响应,并通过 Postman 以 POST 请求附带如下请求体(body)发送:
{"query":"what is de-iceing"}
现在运行后,我在终端中看到的输出是:
Azure Function runtime version: 3.11 [2024-10-18T] Request body: b'{"query":"what is de-iceing"}' [2024-10-18T] data
但我没有在 StreamingResponse 中获得 OpenAI 生成的响应。
附件是我的azure函数的完整代码。
# importing the libraries
import azure.functions as func
import logging
# from langchain_text_splitters import CharacterTextSplitter
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_openai import AzureOpenAIEmbeddings
import os
from openai import AzureOpenAI
import openai
# from dotenv import load_dotenv
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
# load_dotenv()
# Create an instance of the Azure Function app (anonymous auth: no function key required)
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
# Get data from Azure Open AI
# async def stream_processor(response):
# async for chunk in response:
# if len(chunk.choices) > 0:
# delta = chunk.choices[0].delta
# if delta.content: # Get remaining generated response if applicable
# await asyncio.sleep(0.1)
# yield delta.content
async def stream_processor(response):
    """Yield the text content of each streamed chat-completion chunk.

    Accepts either an async iterable (e.g. an AsyncAzureOpenAI stream) or a
    plain sync iterable of chunks. Chunks with no choices, or whose delta
    carries no content, are skipped.
    """
    if not hasattr(response, '__aiter__'):
        # Fallback: plain synchronous iterable of chunks.
        for part in response:
            if len(part.choices) > 0 and part.choices[0].delta.content:
                yield part.choices[0].delta.content
        return
    async for part in response:
        if not len(part.choices) > 0:
            continue
        text = part.choices[0].delta.content
        if text:
            # Brief pause so the client receives chunks at a steady pace.
            await asyncio.sleep(0.1)
            yield text
@app.route(route="v2_AMMBackend")
async def v2_AMMBackend(req: Request) -> StreamingResponse:
    """HTTP trigger: stream a RAG-grounded chat answer for {"query": ...}.

    Embeds the query, retrieves the top-2 similar chunks from the
    "de-iceing-test" Azure AI Search index, then asks the chat model to
    answer from that context, streaming the reply as text/event-stream.
    """
    logging.info('Python HTTP trigger function processed a request.')
    runtime_version = os.environ.get('FUNCTIONS_WORKER_RUNTIME_VERSION')
    logging.info(f"Azure Function runtime version: {runtime_version}")
    logging.info(req)
    req_body = await req.body()  # Use await to get the body asynchronously
    logging.info(f'Request body: {req_body}')
    # Parse the request body as JSON
    data = json.loads(req_body.decode('utf-8'))
    logging.info('data')
    logging.info(data)
    # req_body = req.get_body().decode('utf-8')
    # # Parse the request body as JSON
    # data = json.loads(req_body)
    # Access the user's question from the request body
    variable1 = data.get("query")
    logging.info(variable1)
    # Embedding client used by the vector store to embed the query text
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )
    # Vector store backed by the Azure AI Search index below
    index_name = "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )
    # NOTE(review): this is the *synchronous* client, so the streamed
    # response created below is a sync Stream without __aiter__ — which is
    # why the hasattr check at the bottom falls into the 500 branch. Use
    # openai.AsyncAzureOpenAI for an awaitable, async-iterable stream.
    client = AzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2023-07-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )
    # Perform a similarity search; returns the matching page contents
    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk
    context = userchat(variable1)
    # Prompt grounding the model's answer in the retrieved context
    prompt = fr"Context: {context} \
Based on the above context please provide a clear and concise answer to the user's query/question mentioned below.\
Query: {variable1}.\
If the query is about the procedue or the method then generate the text in bullet points."
    # Alternative prompt (currently unused)
    prompt1 = fr"Here is the query from the user, Query: {variable1} \
Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\
Content: {context}"
    # async def get_chat_completion(prompt):
    # Kick off the streamed chat completion (a sync Stream object here)
    response = client.chat.completions.create(
        model="cbothr-gpt-4-001",
        stream=True,
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ]
    )
    logging.info('response')
    logging.info(response)
    # Make sure to call the function within an async context
    if hasattr(response, '__aiter__'):
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable.",
            status_code=500
        )
我没有得到代码最后几行应返回的 StreamingResponse,而实际得到的是:
{
"_HttpResponse__status_code": 500,
"_HttpResponse__mimetype": "text/plain",
"_HttpResponse__charset": "utf-8",
"_HttpResponse__headers": {},
"_HttpResponse__body": "Error: Response is not iterable."
}
我该怎么办?
您需要使用 AsyncAzureOpenAI 创建 Azure OpenAI 客户端,如下所示。
# Use the async client so .create(...) can be awaited and, with stream=True,
# the returned stream supports `async for` (i.e. it has __aiter__).
client = openai.AsyncAzureOpenAI(
    api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
    api_version="2024-08-01-preview",
    azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
)
您的完整代码应如下所示。
import azure.functions as func
import logging
from langchain_openai import AzureOpenAIEmbeddings
import os
import openai
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
# Function app with anonymous access (no function key required)
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
async def stream_processor(response):
    """Relay the text content of each streamed chat-completion chunk.

    Expects an async iterable of chunks (an AsyncAzureOpenAI stream);
    chunks with no choices or empty delta content are skipped.
    """
    async for part in response:
        if not len(part.choices) > 0:
            continue
        piece = part.choices[0].delta.content
        if piece:
            # Brief pause so the client receives chunks at a steady pace.
            await asyncio.sleep(0.1)
            yield piece
@app.route(route="http_trigger", methods=[func.HttpMethod.POST])
async def http_trigger(req: Request) -> StreamingResponse:
    """HTTP POST trigger: stream a RAG-grounded answer for {"query": ...}.

    Retrieves the top-2 similar chunks from the "de-iceing-test" index and
    streams the chat model's grounded answer back as text/event-stream.
    """
    logging.info('Python HTTP trigger function processed a request.')
    req_body = await req.body()
    logging.info(f'Request body: {req_body}')
    data = json.loads(req_body.decode('utf-8'))
    variable1 = data.get("query")
    # Embedding client used by the vector store to embed the query text
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )
    # Vector store backed by the Azure AI Search index below
    index_name = "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )
    # Async client: .create(...) is awaitable and its stream is async-iterable
    client = openai.AsyncAzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2024-08-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )
    # Perform a similarity search; returns the matching page contents
    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk
    context = userchat(variable1)
    # Prompt grounding the model's answer in the retrieved context
    prompt = fr"Content: {context} \
please provide a clear and concise answer to the user's query/question mentioned below.\
Query: {variable1}.\
If the query is about the procedue or the method then generate the text in bullet points."
    # Alternative prompt (currently unused)
    prompt1 = fr"Here is the query from the user, Query: {variable1} \
Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\
Content: {context}"
    logging.info(prompt)
    # Await the async create(); with stream=True this returns an AsyncStream
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ],
        stream=True
    )
    logging.info(f"OpenAI API Response Object: {response}")
    if response is not None:
        logging.info("Streaming started...")
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable or streaming not supported.",
            status_code=500
        )