对于下面的代码,我遇到了一个问题,即我的代码没有收到来自 Axios API 调用的响应,即使请求已到达服务器并且服务器正在响应。结果,我的代码一直在等待响应,并且批处理未完成。我已经在 Postman 中使用相同的标头/正文和 URL 测试了该请求,它正在按预期工作。
我认为这与 Async/Await 有关,但由于我不擅长 javascript/nodejs 语言,所以我无法找出下面代码中的问题。
我正在寻找一种解决方案来处理大型 CSV 文件并将请求批量发送到外部 API,并且对于单个批次同时发送请求。
var express = require('express');
var router = express.Router();
const fs = require('fs');
const csv = require('csv-stream');
const through2 = require('through2');
const { performance } = require('perf_hooks');
var axios = require('axios');
var configProcessed = 0;
var configList = null;
const MAX_THREADS = 5;
const workerPool = Array.from({ length: MAX_THREADS }, () => false);
// Create a task queue
const taskQueue = [];
// Function to find an available worker thread
function findAvailableWorker() {
return workerPool.findIndex(worker => !worker);
}
// Function to call the the API for processing of confiurations
async function callAPI(data) {
try{
let response = await axios.request({
method: 'PUT',
url: url,
data: JSON.stringify(data),
headers:{'Content-Type': 'application/json',
'Authorization': 'Basic '+ user}
});
configProcessed++;
return response.data;
}
catch(error) {
console.log(error);
let message = '';
if (error.response) {
message = JSON.stringify(error.response.data);
} else {
message = "Unkown Error:" + error.message;
}
configProcessed++;
return Promise.reject(message);
}
}
// Function to process the single batch of concurrent requests
async function processConfigurations(taskData) {
try {
return await Promise.all(taskData.configList.map(config => {
return callAPI(config);
}));
} catch (error) {
console.error("Error processing configurations:", error);
}
}
// Function to run a task using an available worker
async function runTaskInPool(taskData) {
let result = null;
const workerIndex = findAvailableWorker();
if (workerIndex !== -1) {
workerPool[workerIndex] = true;
try {
result = await processConfigurations(taskData);
workerPool[workerIndex] = false;
} catch (error) {
workerPool[workerIndex] = false;
throw error;
}
} else {
// Add task to the queue
taskQueue.push(taskData);
}
return result;
}
// Function to process the bootstrapping for all the configurations available within CSV file
async function bootstrapConfig(params) {
const stream = fs.createReadStream('./' + fileName +'.csv'); // Stream to process the large file line-by-line
stream
.pipe(csv.createStream({
endLine : '\n',
columns : ['COL1', 'COL2', 'COL3', 'COL4'],
escapeChar : '"',
enclosedChar : '"'
}))
.pipe(through2({ objectMode: true }, (row, enc, cb) => {
config = //prepare config with row
configList.push(config);
cb(null, true); // Process next row in CSV
}))
.on('data', async function (data) { // For every row of CSV file, when data is found
if(configList.length == batchSize) { // Check if the size of the config list match with the concurrent requests
let tempConfigList = [...configList];
configList.length = 0;
let response = await runTaskInPool({configList:tempConfigList});
}
})
.on('end', async function() { // on last row of the CSV fie
if(configList.length > 0) {
let tempConfigList = [...configList];
configList.length = 0;
let response = await runTaskInPool({configList:tempConfigList});
processQueue();
} else {
processQueue();
}
})
.on('error', err => {
console.error("Unhandled error while reading CSV file and processing");
})
}
// Function to process pending tasks from the queue
async function processQueue() {
if (taskQueue.length > 0) {
const taskData = taskQueue.shift();
try {
await runTaskInPool(taskData);
} catch (error) {
console.error('Error processing task from queue:', error);
}
processQueue();
}
}
// API to BootStrap the configuraitons
router.get('/', function(req, res, next) {
configProcessed = 0;
configList = new Array();
bootstrapConfig(); // pass all requests params
res.json("Success");
}
});
module.exports = router;
请帮忙。
我尝试过的一个修复,其工作原理如下
.on('end', async function() { // on last row of the CSV fie
if(configList.length > 0) {
let tempConfigList = [...configList];
configList.length = 0;
const release = await semaphore.acquire();
try {
await processConfigurations({configList:tempConfigList});
}catch(error) {
console.log(error);
}finally {
// Release the semaphore after processing each request
semaphore.release();
}
}
})
因此,现在我不再批量调用,而是同时处理配置,并且一批并发请求将等待另一批使用获得的锁机制完成。
我还删除了将任务添加到队列中的代码,然后进一步处理,现在不需要了。