Axios客户端批量异步调用一直在等待

问题描述 投票:0回答:1

对于下面的代码,我遇到了一个问题,即我的代码没有收到来自 Axios API 调用的响应,即使请求已到达服务器并且服务器正在响应。结果,我的代码一直在等待响应,并且批处理未完成。我已经在 Postman 中使用相同的标头/正文和 URL 测试了该请求,它正在按预期工作。

我认为这与 Async/Await 有关,但由于我不擅长 javascript/nodejs 语言,所以我无法找出下面代码中的问题。

我正在寻找一种解决方案来处理大型 CSV 文件并将请求批量发送到外部 API,并且对于单个批次同时发送请求。

var express = require('express');
var router = express.Router();

const fs = require('fs');
const csv = require('csv-stream');
const through2 = require('through2');
const { performance } = require('perf_hooks');
var axios = require('axios');

var configProcessed = 0;
var configList = null;

const MAX_THREADS = 5;
const workerPool = Array.from({ length: MAX_THREADS }, () => false);

// Create a task queue
const taskQueue = [];

// Function to find an available worker thread
function findAvailableWorker() {
    return workerPool.findIndex(worker => !worker);
}

// Function to call the the API for processing of confiurations
async function callAPI(data) {
  try{
        let response = await axios.request({
            method: 'PUT',
            url: url, 
            data: JSON.stringify(data), 
            headers:{'Content-Type': 'application/json', 
                    'Authorization': 'Basic '+ user}
        });
        
        configProcessed++;
        return response.data;
    }
    catch(error) {
        console.log(error);
        let message = '';
        if (error.response) {
        message = JSON.stringify(error.response.data);
        } else {
        message = "Unkown Error:" + error.message;
        }
        
        configProcessed++;
        return Promise.reject(message);
    }
}

// Function to process the single batch of concurrent requests
async function processConfigurations(taskData) {
    try {
      return await Promise.all(taskData.configList.map(config => {
        return callAPI(config);
      }));    
    } catch (error) {
      console.error("Error processing configurations:", error);
    }
}

// Function to run a task using an available worker
async function runTaskInPool(taskData) {
    let result = null;
    const workerIndex = findAvailableWorker();
    if (workerIndex !== -1) {
        workerPool[workerIndex] = true;
        try {
            result = await processConfigurations(taskData);
            workerPool[workerIndex] = false;
        } catch (error) {
            workerPool[workerIndex] = false;
            throw error;
        }
    } else {
        // Add task to the queue
        taskQueue.push(taskData);
    }
    return result;
}

// Function to process the bootstrapping for all the configurations available within CSV file
async function bootstrapConfig(params) {
  const stream = fs.createReadStream('./' + fileName +'.csv'); // Stream to process the large file line-by-line
    
  stream
  .pipe(csv.createStream({
        endLine : '\n',
        columns : ['COL1', 'COL2', 'COL3', 'COL4'],
        escapeChar : '"',
        enclosedChar : '"'
    }))
    .pipe(through2({ objectMode: true }, (row, enc, cb) => {
        
        config = //prepare config with row

        configList.push(config);
        
        cb(null, true); // Process next row in CSV  
    }))
    .on('data', async function (data) { // For every row of CSV file, when data is found
        if(configList.length == batchSize) { // Check if the size of the config list match with the concurrent requests
            let tempConfigList = [...configList];
            configList.length = 0;
            let response = await runTaskInPool({configList:tempConfigList});
        }
    })
    .on('end', async function() { // on last row of the CSV fie
        if(configList.length > 0) {
            let tempConfigList = [...configList];
            configList.length = 0;
            let response = await runTaskInPool({configList:tempConfigList});
            processQueue();
        } else {
            processQueue();
        }
        
    })
    .on('error', err => {
        console.error("Unhandled error while reading CSV file and processing");
    })
 
}

// Function to process pending tasks from the queue
async function processQueue() {
    if (taskQueue.length > 0) {
        const taskData = taskQueue.shift();
        try {
            await runTaskInPool(taskData);
        } catch (error) {
            console.error('Error processing task from queue:', error);
        }
        processQueue();
    }
}

// API to BootStrap the configuraitons
router.get('/', function(req, res, next) {

        configProcessed = 0;
        configList = new Array();
        bootstrapConfig(); // pass all requests params
        res.json("Success");
    }
    
});

module.exports = router;

请帮忙。

javascript node.js csv async-await axios
1个回答
0
投票

我尝试过的一个修复,其工作原理如下

.on('end', async function() { // on last row of the CSV fie
    if(configList.length > 0) {
        let tempConfigList = [...configList];
        configList.length = 0;
        
        const release = await semaphore.acquire();
        try {
            await processConfigurations({configList:tempConfigList});
        }catch(error) {
            console.log(error);
        }finally {
            // Release the semaphore after processing each request
            semaphore.release();
        }
    } 
    
})

因此,现在我不再批量调用,而是同时处理配置,并且一批并发请求将等待另一批使用获得的锁机制完成。

我还删除了将任务添加到队列中的代码,然后进一步处理,现在不需要了。

© www.soinside.com 2019 - 2024. All rights reserved.