在服务器端:
我有一个在 docker 容器中运行的 fastapi。服务器有一个端点,它返回一个 FastAPI StreamingResponse ,其中包含一个 python 生成器作为响应,如下所示:
@router.post("/json/stream", summary="Open a stream of json objects to be collected")
def stream_json_as_received(opt_request: AllJsonRequest):
# makes a call to a data provider
def generate_json_data(args):
# do some processing on the received data
yield json.dumps(chunked_json_dict) + '\n'
# start the generator again if there are more
if "next_url" in json_response:
yield from generate_json_data(args)
elif "next_url" not in json_response:
print("All json chunks has been retrieved")
pass
return StreamingResponse(generate_json_data(args), media_type="application/x-ndjson"
这个想法是在客户端上显示 StreamingResponse 生成的 Json 对象。
在客户端:
这就是问题开始的地方。在我的ios客户端(SwiftUI)上,我尝试使用AlamoFire库使用这个端点,如下所示:
import SwiftUI
import Combine
import Alamofire
class ViewModel: ObservableObject {
@Published var jsonObjects: [AllJsonsStream] = []
fileprivate lazy var alamoSession: Session = {
let configuration = URLSessionConfiguration.default
configuration.timeoutIntervalForRequest = 20
configuration.urlCache?.removeAllCachedResponses()
return Alamofire.Session(configuration: configuration /*, interceptor: interceptor*/)
}()
fileprivate func alamofireChunked () {
guard let url = URL(string: "https://mydockerfastapiurl/json/stream")
else {
return
}
let parameters: [String: Any] = [
"arg1": val1
]
var request = URLRequest(url: url)
request.httpMethod = "POST"
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
request.addValue("application/json", forHTTPHeaderField: "Accept")
request.addValue("chunked", forHTTPHeaderField: "transfer-encoding")
print(request.headers)
do {
request.httpBody = try JSONSerialization.data(withJSONObject: parameters, jsonObjects: .prettyPrinted)
} catch {
print("ERROR")
}
alamoSession.streamRequest(request).validate().responseStream { response in
if let data = response.value {
print("received \(data.count) bytes")
if data.count > 0 {
// process received data here
print("data:", data.description)
let decoder = JSONDecoder()
do {
try JSONSerialization.jsonObject(with: data)
let decodedResponse = try decoder.decode(AllJsonsStream.self, from:data)
print("Decoded:", decodedResponse)
self.jsonObjects.append(decodedResponse)
} catch let error {
print(error)
}
}
} else {
switch response.event {
case .complete( _):
// perform any post processing here
print("stream is finished")
print("response:", response.result)
break;
default:
print("default...")
break;
}
}
}
}
}
struct ContentView: View {
private var urlString = ""
var viewModel = ViewModel()
var body: some View {
VStack {
Button {
viewModel.alamofireChunked()
} label: {
Text("Fetch stream")
}
}
}
}
当我运行此示例项目时,我收到一系列以下错误:来自每个返回的块的
Error Domain=NSCocoaErrorDomain Code=3840 "JSON text did not start with array or object and option to allow fragments not set. around line 1, column 0." UserInfo={NSDebugDescription=JSON text did not start with array or object and option to allow fragments not set. around line 1, column 0., NSJSONSerializationErrorIndex=0}
。
这显然指出了错误,即分块响应不是正确的 json 格式,因此显然无法将它们序列化到 AllJsonStreams 对象中。
这导致我更改“.responseStream”->“.responseStreamString”以查看来自端点的实际响应以及发生此错误的原因。我打印了每个块进来的时候,我发现每个响应都带回来:第一个生成的 json 和下一个生成的 json 中的一些,在随机点处切断,有点像这样:
'{"key1": "value1"}, {"Key2": "va'
显然这不是正确的 JSON,而且,我很困惑为什么它带回的数据不仅仅是每个块中 1 个正确生成的 json 对象。
然后我去验证 fastAPI 端点是否在每个收益上正确返回了良好的 json 对象。我在 Postman 和 FastAPI 服务器的 SwaggerUI 中测试了响应,这两个响应看起来都很好且正确。
这就是它变得有点时髦的地方。
然后我有了在本地运行我的 fastAPI 服务器并测试所有相同内容的想法,一切看起来都不错。因此,我将 localhost url 插入到我的 Swift 项目“http://127.0.0.1:8000/json/stream”,令我完全惊讶的是,每个块都有完全正确的 json(没有额外的半 json 对象)和所有内容连载完美。唯一改变的是从指向 docker 容器的 url 切换到指向我的本地主机版本的服务器的 url。
更让我困惑的是,我们的 Android 团队可以从 docker 容器服务器完美地使用这个端点。
我能想到它可以在本地主机上正常工作但不能在容器上正常工作的唯一原因是请求的速度更快或者docker配置中出现问题(这仍然没有完全意义,因为android版本)工作正常)
我尝试查看 alamo-fire 文档几个小时,试图在库中找到可能发生这种情况的原因(作为参考,在使用 alamo fire 之前,我们仅使用合并库尝试了类似的设置,并得到了类似的结果)。
我尝试使用我的可解码数据类型添加 CustomSerializer,同样的错误。我尝试将内容类型标头更改为“application/x-ndjson”,就像 StreamingResponse 返回一样,并尝试将 python 生成器中的 media_type 更改为“text/event-stream”或“application/json”之类的内容,同样的错误。
因为我认为请求的速度以某种方式搞乱了它,所以我尝试更改 Swift 会话配置以允许更大的 timeoutIntervalForRequest,同样的错误。
这可能是我的 fastAPI 端点是 POST 而不是 GET 的问题吗?
此时我完全迷失了,我的整个团队也是如此,任何帮助将不胜感激。
DataStreamRequest
故意将缓冲和分块留给消费者。原始 HTTP 流使您无法(好吧,很少)控制客户端的数据分块,并且
URLSession
没有任何帮助。因此,可以采用多种标准和方法来进行分块和缓冲。但是,您可能正在使用可用于手动解析传入数据的标准响应类型。根据
标准,每个对象将用换行符分隔,这使得解析相对简单。您需要在
DataStreamRequest
之上构建一个缓冲层,然后在换行符之后分割并解析 JSON。一般来说,您会想要:
Data
并附加每个传入的卡盘。
Data
的第一个前缀,直到换行符(包括换行符)。将
Data
解析为 JSON。
一旦 Alamofire 的
WebSocketRequest
更加适合生产,我建议切换到 Websocket,它可以为您处理所有这些事情。