我有一个在后台使用 RAG 管道的 Flask 应用程序,并且我从 vLLM 流式传输响应。我目前正在使用一个函数来解析并生成流式 JSON 响应。但是,我面临一个问题,即流输出包含以前生成的文本,而不仅仅是最新的内容
def parse_json_stream(line):
decoded_line = line.decode('utf-8')
decoder = json.JSONDecoder()
pos = 0
while pos < len(decoded_line):
try:
result, json_end = decoder.raw_decode(decoded_line[pos:])
if "text" in result:
print(result["text"]) # debugging
if result["text"]:
yield result["text"][0].encode("utf-8")
pos += json_end
except json.JSONDecodeError:
# if can't decode JSON, go next character
pos += 1
以下是我如何使用此函数在 Flask 中打印响应: 我想确保只生成法学硕士的最新文本,不包括任何以前生成的文本。
def generate():
try:
# We can use prompt_len to slice down the text, not using it right now
prompt_len, response = process_and_respond(file_path, question)
print("********* Generate Function **********")
for line in response.iter_lines():
if line:
generator = parse_json_stream(line)
for parsed_text in generator:
if parsed_text:
yield parsed_text
finally:
print(f"Deleting File {file_path}")
os.remove(file_path)
我不确定您从
process_and_respond
收到的有效负载是什么,所以我只是假设之前生成的文本包含在该有效负载中。
您可以尝试的是跟踪您在
parse_json_stream
中生成的文本,以便您可以将其与后续文本进行比较,并仅返回之前未生成的文本。
def parse_json_stream(line):
decoded_line = line.decode('utf-8')
decoder = json.JSONDecoder()
pos = 0
### to track yielded text
yielded_text = ''
while pos < len(decoded_line):
try:
result, json_end = decoder.raw_decode(decoded_line[pos:])
if "text" in result:
print(result["text"]) # debugging
if result["text"]:
### remove yielded text from current text
to_yield = result["text"][0].encode("utf-8").replace(yielded_text, '')
yield to_yield
### update yielded text
yielded_text += to_yield
pos += json_end
except json.JSONDecodeError:
# if can't decode JSON, go next character
pos += 1