How do we stream LangGraph's final generation with Vercel's AI SDK? This works well with LangChain LCEL, as described in this blog, but how do we do the same with LangGraph and Vercel AI?
I run into an error when processing the graph stream:
TypeError: stream.pipeThrough is not a function
LangChainAdapter.toDataStreamResponse(final_generation)
When the user clicks submit, the POST handler below is invoked, which runs the graph stream. This is where the error occurs.
// src/routes/api/chat
import { LangChainAdapter } from 'ai';
import type { RequestHandler } from './$types';
import type { Message } from 'ai/svelte';
import { Workflow } from '$lib/server/graph/workflow';

// Server endpoint for the streaming chat
export const POST: RequestHandler = async ({ request }) => {
  const { messages }: { messages: Message[] } = await request.json();
  let final_generation = null;
  const eventStream = await Workflow.getCompiledStateGraph().streamEvents(
    { question: messages.pop(), chat_history: messages },
    { version: 'v2' }
  );
  for await (const { event, tags, data } of eventStream) {
    if (event === 'on_chat_model_stream') {
      console.log('tags:', tags);
      console.log('data', data);
      console.log('event', event);
      if (data.chunk.content) {
        final_generation = data.chunk;
      }
    }
  }
  return LangChainAdapter.toDataStreamResponse(final_generation);
};
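The TypeError comes from handing LangChainAdapter.toDataStreamResponse a single AIMessageChunk (final_generation) rather than a stream; the adapter calls pipeThrough on its argument, which only exists on a ReadableStream. For comparison, the LCEL case that works passes the ReadableStream returned by .stream() straight through (a minimal sketch; the prompt, model, and userQuery setup are assumed):

const chain = prompt.pipe(model).pipe(new StringOutputParser());
const stream = await chain.stream({ question: userQuery });
// chain.stream() yields a ReadableStream of string chunks,
// which is exactly what toDataStreamResponse knows how to pipe.
return LangChainAdapter.toDataStreamResponse(stream);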
A simple CompiledStateGraph:
import { StateGraph, START, END, type CompiledStateGraph } from '@langchain/langgraph';
import { State } from '$lib/server/graph/state';
// retrieveDocuments and generate are node functions defined alongside the graph
// (generate is shown below).

export class Workflow {
  // @ts-ignore -- CompiledStateGraph's generic parameters are omitted for brevity
  private static COMPILED_STATE_GRAPH: CompiledStateGraph | null = null;

  private constructor() {}

  public static getCompiledStateGraph() {
    if (!Workflow.COMPILED_STATE_GRAPH) {
      const graph = new StateGraph(State)
        .addNode('retrieve', retrieveDocuments)
        .addNode('llm_search', generate)
        .addConditionalEdges(START, routeQuestion)
        .addEdge('llm_search', END)
        .addEdge('retrieve', END);
      Workflow.COMPILED_STATE_GRAPH = graph.compile();
    }
    return Workflow.COMPILED_STATE_GRAPH;
  }
}
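routeQuestion isn't shown in the post; a conditional edge is just a function that inspects the state and returns the name of the node to run next. A hypothetical sketch:

const routeQuestion = (state: typeof State.State): 'retrieve' | 'llm_search' => {
  // Hypothetical routing rule: send retrieval-style questions to the
  // 'retrieve' node, everything else straight to the LLM.
  return state.question.toLowerCase().includes('docs') ? 'retrieve' : 'llm_search';
};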
The generate node:
import { State } from '$lib/server/graph/state';
import { LLMClient } from '$lib/server/llm-client';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';

export const generate = async (state: typeof State.State): Promise<Partial<typeof State.State>> => {
  console.log('---LLM Inference---');
  const PROMPT_TEMPLATE = 'You are a helpful assistant!';
  const prompt = ChatPromptTemplate.fromMessages([
    ['system', PROMPT_TEMPLATE],
    ['human', '{question}'],
  ]);
  const generationChain = prompt.pipe(LLMClient.getClient()).pipe(new StringOutputParser());
  // invoke() resolves to the complete generation string
  const generation = await generationChain.invoke({ question: state.question });
  return { generation };
};
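Note that even though generate awaits invoke() and only returns the finished string, streamEvents still emits an on_chat_model_stream event for every token the underlying chat model produces (that's why the POST handler above sees data.chunk.content at all); this is what the fix below hooks into.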
The state:
import type { Document } from '@langchain/core/documents';
import { Annotation } from '@langchain/langgraph';
import type { Message } from 'ai/svelte';

export const State = Annotation.Root({
  messages: Annotation<{ messages: Message[] }>,
  question: Annotation<string>,
  generation: Annotation<string>,
  documents: Annotation<Document[]>({
    reducer: (_, y) => y,
    default: () => [],
  })
})
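Channels declared without an options object (question, generation) default to last-value-wins; the explicit reducer on documents does the same while also supplying a default. A node updates a channel simply by returning it in its partial result. A sketch of what retrieveDocuments might return (someRetriever is a stand-in for whatever retriever the graph actually uses):

const retrieveDocuments = async (state: typeof State.State): Promise<Partial<typeof State.State>> => {
  // someRetriever is hypothetical; any LangChain retriever's invoke() fits here.
  const documents: Document[] = await someRetriever.invoke(state.question);
  // Only the returned channels are merged into state via their reducers.
  return { documents };
};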
Input binding and user-message handling are done with Vercel AI's useChat().
The rendering part: the HumanInput component binds to the input and submits the user query via handleSubmit, which in turn calls our POST server function above.
<script lang="ts">
  import HumanInput from '$lib/components/HumanInput.svelte';
  import MaxWidthWrapper from '$lib/components/MaxWidthWrapper.svelte';
  import DisplayMessages from '$lib/components/DisplayMessages.svelte';
  import { useChat } from '@ai-sdk/svelte';

  const { input, handleSubmit, messages } = useChat();
</script>

<div class="flex flex-col h-screen">
  <div class="flex-grow overflow-hidden">
    <MaxWidthWrapper class_="h-full flex flex-col">
      <DisplayMessages {messages} />
      <HumanInput {input} {handleSubmit} />
    </MaxWidthWrapper>
  </div>
</div>
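HumanInput isn't shown in the post; it only needs to bind the input store and forward the submit event, roughly like this (hypothetical sketch):

<!-- HumanInput.svelte (hypothetical) -->
<script lang="ts">
  export let input;        // writable store from useChat()
  export let handleSubmit; // submit handler from useChat()
</script>

<form on:submit|preventDefault={handleSubmit}>
  <input bind:value={$input} placeholder="Ask a question..." />
  <button type="submit">Send</button>
</form>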
The main concept to understand here is how Vercel AI and LangChain handle messages. The AI SDK works with Message from the ai package, while LangChain works with BaseMessage subtypes from @langchain/core/messages.
The trick to solving this is to convert messages between the two formats.
// src/routes/api/chat/+server.ts
import { LangChainAdapter } from 'ai';
import type { Message } from 'ai/svelte';
import { Workflow } from '$lib/server/graph/workflow';
import {
  convertLangChainMessageToVercelMessage,
  convertVercelMessageToLangChainMessage
} from '$lib/utils/utility';

export const POST = async ({ request, params }) => {
  const config = { configurable: { thread_id: params.id }, version: 'v2' };
  const messages: { messages: Message[] } = await request.json();
  const userQuery = messages.messages[messages.messages.length - 1].content;
  const history = (messages.messages ?? [])
    .slice(0, -1)
    .filter(
      (message: Message) =>
        message.role === 'user' || message.role === 'assistant'
    )
    .map(convertVercelMessageToLangChainMessage);
  const compiledStateGraph = Workflow.getCompiledStateGraph();
  const stream = await compiledStateGraph.streamEvents({ question: userQuery, messages: history }, config);
  const transformStream = new ReadableStream({
    async start(controller) {
      for await (const { event, data, tags } of stream) {
        if (event === 'on_chat_model_stream') {
          // Only forward token chunks from the LLM tagged "llm_inference"
          if (!!data.chunk.content && tags.includes('llm_inference')) {
            const aiMessage = convertLangChainMessageToVercelMessage(data.chunk);
            controller.enqueue(aiMessage);
          }
        }
      }
      controller.close();
    }
  });
  return LangChainAdapter.toDataStreamResponse(transformStream);
};
Before invoking the graph stream, I extract the current user query from useChat()'s message array, then convert the remaining messages into a format LangChain understands with the convertVercelMessageToLangChainMessage function. Likewise, once I receive AIMessageChunks from the LangChain stream, I convert them back into a format Vercel AI understands with the convertLangChainMessageToVercelMessage function, and return the stream to useChat() to handle the new messages.
import type { Message } from 'ai/svelte';
import { AIMessage, BaseMessage, ChatMessage, HumanMessage } from '@langchain/core/messages';

/**
 * Converts a Vercel message to a LangChain message.
 * @param message - The message to convert.
 * @returns The converted LangChain message.
 */
export const convertVercelMessageToLangChainMessage = (message: Message): BaseMessage => {
  switch (message.role) {
    case 'user':
      return new HumanMessage({ content: message.content });
    case 'assistant':
      return new AIMessage({ content: message.content });
    default:
      return new ChatMessage({ content: message.content, role: message.role });
  }
};

/**
 * Converts a LangChain message to a Vercel message.
 * @param message - The message to convert.
 * @returns The converted Vercel message.
 */
export const convertLangChainMessageToVercelMessage = (message: BaseMessage) => {
  switch (message.getType()) {
    case 'human':
      return { content: message.content, role: 'user' };
    case 'ai':
      return {
        content: message.content,
        role: 'assistant',
        tool_calls: (message as AIMessage).tool_calls
      };
    default:
      return { content: message.content, role: message.getType() };
  }
};
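A quick round trip to illustrate (the id field is required by the Vercel Message type):

const lcMessage = convertVercelMessageToLangChainMessage({ id: '1', role: 'user', content: 'hi' });
// lcMessage is a HumanMessage
const vercelMessage = convertLangChainMessageToVercelMessage(lcMessage);
// vercelMessage => { content: 'hi', role: 'user' }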
Also note the check
if (!!data.chunk.content && tags.includes("llm_inference"))
This is how we filter for the final generation. When the graph's last node executes, we can tag its LLM through the config (tags), and later use that tag to pick out the output of exactly that node's execution.
import { ChatPromptTemplate } from '@langchain/core/prompts';
import { LLMClient } from '$lib/server/llm-client';

export const llmInference = async (state: typeof GraphState.State) => {
  console.log('---LLM Inference---');
  const PROMPT_TEMPLATE = 'You are a helpful assistant! Please answer the user query. Use the chat history to provide context.';
  const prompt = ChatPromptTemplate.fromMessages([
    ['system', PROMPT_TEMPLATE],
    ['human', '{question}'],
    ['human', 'Chat History: {messages}'],
  ]);
  // Tag the model so its stream events can be filtered out later
  const inferenceChain = prompt.pipe(LLMClient.getClient().withConfig({ tags: ['llm_inference'] }));
  const generation = await inferenceChain.invoke(state);
  return { messages: [generation] };
};
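For orientation, each event the handler iterates over from streamEvents (version: 'v2') looks roughly like this; data.chunk is an AIMessageChunk carrying one token's worth of content:

// Illustrative shape of one v2 stream event; fields like name, run_id
// and metadata are omitted:
{
  event: 'on_chat_model_stream',
  tags: ['llm_inference'],            // tags set with withConfig({ tags }) propagate here
  data: { chunk: { content: 'Hel' } } // an AIMessageChunk holding one token fragment
}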