How to use stream.pipeline in an AWS Node.js Lambda?


I am trying to stream data from a MongoDB cursor into a file in S3 using a Node.js Lambda.

Below is a snippet of my code.

What I observe is that the Lambda exits without waiting for the pipeline to complete, so the file is never written to S3.

But the same code works fine when I run it as a standalone Node.js script.

const logger = require('./logger').logger;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline);

exports.handler = async (event, context) => {

    // client is a MongoDB client assumed to be connected elsewhere;
    // criteriaObj holds the aggregation pipeline stages
    await pipeline(
        client.db("somedb").collection("somecollection")
              .aggregate(criteriaObj)
              .stream({transform: x => `${JSON.stringify(x)}\n`}),
        uploadFromStream()
    );

};

let uploadFromStream = () => {

    let pass = new stream.PassThrough();
    let s3 = new s3Client();

    let params = {Bucket: "bucketname", Key: "filename", Body: pass};

    s3.upload(params, function(err, data) {
        if (err) {
            logger.error(`Error uploading file ${params.Key}`, err);
        } else {
            logger.info(`Successfully uploaded file: ${params.Key}, result: ${JSON.stringify(data)}`);
        }
    });

    return pass;
};
node.js mongodb amazon-web-services stream pipeline
1 Answer

I ended up doing it without the async/await style. `pipeline` resolves once the PassThrough has been fully written, while `s3.upload` can still be in flight, so the handler has to stay alive until the upload's completion callback fires; wiring the Lambda callback into that completion handler takes care of it.

My code ended up looking like the snippet below. I also wrote a blog post about it: https://dev.to/anandsunderraman/copying-over-data-from-mongodb-to-s3-3j4g

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;


//streams the aggregation results from MongoDB into S3 without buffering them in memory
exports.copyData = (event, context, callback) => {

    MongoClient.connect(getDBURI(), {
            useNewUrlParser: true,
            useUnifiedTopology: true
    }).then((dbConnection) => {

        pipeline(
            dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
                                        .stream({transform: x => convertToNDJSON(x)}),
            uploadDataToS3(callback),
            (err) => {
                if (err) {
                    console.log('Pipeline failed.', err);
                } else {
                    console.log('Pipeline succeeded.');
                }
            }
        )

    })


}
/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    return JSON.stringify(data) + "\n";
};

let uploadDataToS3 = (callback) => {
    //NODE_ENV (or a similar environment variable) is assumed to flag local runs
    let env = process.env.NODE_ENV;
    let s3 = null;
    let pass = new stream.PassThrough();
    if (env === 'local') {
        //local testing against a minio endpoint
        s3 = new s3Client({
            accessKeyId: 'minioadmin',
            secretAccessKey: 'minioadmin',
            endpoint: 'http://host.docker.internal:9000',
            s3ForcePathStyle: true, // needed with minio?
            signatureVersion: 'v4'
        });
    } else {
        s3 = new s3Client();
    }
    //the PassThrough stream is the upload body; multipart options speed up the process
    let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: pass};
    let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};

    s3.upload(params, opts, function(err, data) {
        if (err) {
            console.log(`Error uploading file ${params.Key}`, err);
        } else {
            console.log(`Successfully uploaded file: ${params.Key}, result: ${JSON.stringify(data)}`);
        }
        //signal Lambda that the work is done only after the upload completes
        callback(err);
    });
    return pass;

};
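For completeness, the async/await version from the question can also be made to work: the trick is to await the S3 upload itself in addition to the pipeline. A minimal sketch, assuming the AWS SDK v2 `.promise()` API on the managed upload, and the same pre-connected `client` and `criteriaObj` placeholders as in the question:

const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline);
const s3Client = require('aws-sdk/clients/s3');

exports.handler = async (event, context) => {

    let pass = new stream.PassThrough();
    let s3 = new s3Client();

    //s3.upload returns a managed upload object; .promise() resolves
    //only once the object has been fully written to S3 (AWS SDK v2)
    let uploadDone = s3.upload({Bucket: "bucketname", Key: "filename", Body: pass}).promise();

    //await BOTH the pipeline and the upload: the pipeline resolves as soon
    //as the PassThrough has been fully written, the upload some time later
    await Promise.all([
        pipeline(
            client.db("somedb").collection("somecollection")
                  .aggregate(criteriaObj)
                  .stream({transform: x => `${JSON.stringify(x)}\n`}),
            pass
        ),
        uploadDone
    ]);
};

Either way, the key point is the same: the handler must not return until the upload's completion has been observed, whether via the Lambda callback or an awaited promise.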