我无法理解事件循环如何处理我的代码段。我想要实现的是
const readAndUpload = () => {
fs.createReadStream('filename.csv')
.pipe(csv())
.on('data', ((row: any) => {
const file = fs.createWriteStream("file.jpg");
var url = new URL(row.imageURL)
// choose whether to make an http or https request
let client = (url.protocol=="https:") ? https : http
const request = client.get(row.imageURL, function(response:any) {
// file save
response.pipe(file);
console.log('file saved')
let filePath = "file.jpg";
let params = {
Bucket: 'bucket-name',
Body : fs.createReadStream(filePath),
Key : "filename.jpg"
};
// upload to s3
s3.upload(params, function (err: any, data: any) {
//handle error
if (err) {
console.log("Error", err);
}
//success
if (data) {
console.log("Uploaded in:", data.Location);
row.imageURL = data.Location
writeData.push(row)
// console.log(writeData)
}
});
});
}))
.on('end', () => {
console.log("done reading")
const csvWriter = createCsvWriter({
path: 'out.csv',
header: [
{id: 'id', title: 'some title'}
]
});
csvWriter
.writeRecords(writeData)
.then(() => console.log("The CSV file was written successfully"))
})
}
按照我添加的日志语句,done reading
和The CSV file was written successfully
在file saved
之前打印。我的理解是end
事件是在data
事件之后调用的,所以我不确定我要去哪里。
谢谢您的阅读!
我不确定这是否是问题的一部分,但是在这部分代码中,您还有另外一组parens。更改此:
.on('data', ((row: any) => {
.....
})).on('end', () => {
至此:
.on('data', (row: any) => {
.....
}).on('end', () => {
并且,如果正确设置了事件处理程序,则对于同一流,将在.on('data', ...)
之前调用.on('end', ....)
事件处理程序。如果你这样说:
console.log('at start of data event handler');
作为该事件处理程序的第一行,您将看到它首先被调用。
但是,您的数据事件处理程序使用多个异步调用,您的代码中没有任何内容使end
事件等待在data
事件处理程序中完成所有处理。因此,由于该处理需要一段时间,因此很自然会在完成对end
事件的所有异步代码之前,会发生data
事件。
此外,如果您可以有多个data
事件(通常会发生这种情况),那么由于要使用固定的文件名,因此您将同时运行多个data
事件,它们可能会互相覆盖。
解决此类问题的常用方法是在stream.pause()
事件处理开始时先data
暂停读取流,然后在完成所有异步工作后,再按stream.resume()
让它开始再去一次。
您需要获得正确的流才能暂停和继续。您可以执行以下操作:
let stream = fs.createReadStream('filename.csv')
.pipe(csv());
stream.on('data', ((row: any) => {
stream.pause();
....
});
然后,在s3.upload()
回调中,您可以调用stream.resume()
。您还将需要更好得多的错误处理,否则,如果遇到错误,事情只会被卡住。
看来您在呼叫时也遇到了其他并发问题:
response.pipe(file);
然后您尝试使用file
,而没有实际等待该.pipe()
操作完成(这也是异步的)。总体而言,这整个逻辑确实需要进行重大清理。我不明白您在所有不同步骤中到底想做什么,以了解如何编写完全干净和简单的版本。