我尝试使用流式传输从 JS 中的数据库中获取大量数据,以防止一次将所有数据加载到内存中。我使用express作为我的服务器和使用Axios获取数据的nodeJS客户端。我已成功通过流式传输获取数据,但不知道如何处理流式传输时发生的错误。
特快服务器:
app.get('/stream', async (req, res) => {
try {
const cursor = //fetch data with a limit & skip (MongoDB)
while(cursor.hasNext()) {
const data = await cursor.next()
const writeToStream = new Promise((resolve) => {
res.write(data, 'utf8', () => {
console.log("batch sent");
resolve()
})
})
await writeToStream
}
res.end()
} catch(error) {
console.log(`error: ${error.message}`)
//How do i send the status & error message to the requestor?
//return res.status(400).end(error.message) // <-- wanted behavior
})
客户:
try {
const res = await axios({
url: 'http://localhost:3000/test',
responseType: 'stream'
})
const { data } = res
data.pipe(someStream)
data.on('end', () => {
//stream finished
})
data.on('error', (error) => { // First Option
//error without a status or message
//res.status(error.status).send(error.message) // <-- wanted behavior
})
} catch(error) { // Second Option
//error without a status or message
//return res.status(error.status).send(error.message) // <-- wanted behavior
}
客户端上的错误处理有效(代码运行),但我无法弄清楚如何从服务器向客户端发送状态和消息,指示错误并指定它。
版本: “axios”:“^1.5.1”,“express”:“^4.18.2”
希望得到一些帮助。 提前谢谢!
问题是,在将标头发送到客户端后,您无法设置标头。因此,当您使用
res.write
启动流时,标头已作为 200
发送。
你能做的就是使用一个技巧。您可以为
DATA
和 ERROR
设置固定前缀。通过这种方式,您可以区分什么是真实数据和什么是真实错误。我知道这不是最有效的方法,但是可行并且似乎合理,因为流堆栈本身根本不提供错误管理机制。
服务器.ts:
import express from 'express'
const app = express()
const port = 3000
const DATA_PREFIX = "DATA:"
const ERROR_PREFIX = "ERROR:"
app.get('/stream', async (req, res) => {
try {
// I have replaced the query to MongoDB with some sample data.
const sample_datas = ["data1", "data2", "data3", "data4", "data5"]
for (let dt of sample_datas) {
const writeToStream = new Promise((resolve) => {
// if dt == "data4", then simulate an error
if (dt == "data4") {
throw new Error("data4 has problems.")
}
res.write(DATA_PREFIX + dt, 'utf8', () => {
console.log("batch sent");
resolve(true)
})
})
await writeToStream
}
res.end()
} catch (error) {
console.log(`error: ${error.message}`)
//How do i send the status & error message to the requestor?
res.write(ERROR_PREFIX + error.message)
res.end()
}
})
app.listen(port, () => {
console.log(`Example app listening on port ${port}`)
})
客户端.ts
import axios from 'axios'
import fs from 'fs'
import stream from 'stream'
const DATA_PREFIX = "DATA:"
const ERROR_PREFIX = "ERROR:"
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
const main = async () => {
await sleep(2000)
try {
const res = await axios({
url: 'http://localhost:3000/stream',
responseType: 'stream'
})
const { data } = res
const writableStream = fs.createWriteStream('__output__.txt');
const appendText = new stream.Transform({
transform(chunk: Buffer, encoding, callback) {
const chunk_str = chunk.toString('utf8')
if (chunk_str.startsWith(DATA_PREFIX)) {
this.push(Buffer.from(chunk_str.replace(DATA_PREFIX, ""), 'utf8'))
}
else if (chunk_str.startsWith(ERROR_PREFIX)) {
const error = chunk_str.replace(ERROR_PREFIX, "")
error_function(new Error(error))
}
callback();
}
});
data.pipe(appendText).pipe(writableStream);
data.on('end', () => {
console.log("stream finished")
})
const error_function = (error: Error) => {
console.log("data.on error:", error)
}
data.on('error', error_function)
} catch (error) {
console.log("catch error:", error)
}
}
main()