MongoDB BulkWrite Memory Cost


I am using the official mongodb driver for Go. I take a CSV and read it line by line until I reach 1000 rows, then I parse the data and insert it into the database. I assumed this would need roughly constant memory, since the data handed to each bulk write is always the same (1000 contacts). That is not the case, though: memory grows significantly. Here is some data on the query in question:

batchSize = 1000

Contacts    Memory consumed by BulkWrite
10K         14 MB
20K         30 MB
30K         59 MB
40K         137 MB
50K         241 MB

Can anyone explain why?
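For reference, here is a minimal sketch of one way numbers like these can be collected, assuming measurement via runtime.MemStats around the import call (the measureHeap helper below is illustrative and not from my actual code):

package main

import (
    "fmt"
    "runtime"
)

// measureHeap approximates the heap growth caused by fn by sampling
// runtime.MemStats before and after it runs. GC timing makes this a
// rough estimate, not an exact accounting.
func measureHeap(label string, fn func()) {
    var before, after runtime.MemStats
    runtime.GC() // settle the heap so the delta is meaningful
    runtime.ReadMemStats(&before)
    fn()
    runtime.ReadMemStats(&after)
    delta := int64(after.HeapAlloc) - int64(before.HeapAlloc)
    fmt.Printf("%s: %.1f MB\n", label, float64(delta)/(1<<20))
}

func main() {
    measureHeap("50K contacts", func() {
        // run the CSV import for 50K contacts here
    })
}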

The code is below:

func (c *csvProcessor) processCSV(r io.Reader, headerMap map[string]int, emailDB *mongo.Database) error {
    //some code...
    csvReader := csv.NewReader(r)
    for {
        eofReached, err := c.processCSVBatch(csvReader, emailHash, smsHash, headerMap, emailDB)
        if err != nil {
            return errors.Wrap(err, "process CSV batch")
        }
        if eofReached {
            break
        }
    }
    return nil
}

func (c *csvProcessor) processCSVBatch(csvReader *csv.Reader, emailHash map[string]*userData, smsHash map[string]*userData, headerMap map[string]int, emailDB *mongo.Database) (bool, error) {
    var insertUsers, updateUsers, deleteUsers []interface{}
    var isEOFReached bool
    for i := 0; i < processCSVBatchSize; i++ {
        line, err := csvReader.Read()
        if err != nil {
            if err != io.EOF {
                return false, errors.Wrap(err, "read from input")
            }
            isEOFReached = true
            break
        }
        //some code
        insert, update, delete := c.dataMerger.mergeData(
            c.parseUser(line, headerMap),
            emailHash[stringToMD5(line[headerMap["email"]])],
            smsHashVal,
        )
        if insert != nil {
            insertUsers = append(insertUsers, insert)
        }
        if update != nil {
            updateUsers = append(updateUsers, update)
        }
        if delete != nil {
            deleteUsers = append(deleteUsers, delete)
        }
    }
    //update DB
    err := c.mongoDBUserHandler.saveUsers(emailDB, insertUsers, updateUsers, deleteUsers)
    if err != nil {
        return false, errors.Wrap(err, "save users")
    }
    return isEOFReached, nil
}

func (m *mongoDBUserHandler) saveUsers(emailDB *mongo.Database, insert, update, delete []interface{}) error {
    ctx := context.Background()
    // create the slice of write models
    var writes []mongo.WriteModel
    if len(insert) > 0 {
        writes = append(writes, m.getInsertWrites(insert)...)
    }
    if len(update) > 0 {
        writes = append(writes, m.getUpdateWrites(update)...)
    }
    if len(delete) > 0 {
        writes = append(writes, m.getDeleteWrites(delete)...)
    }
    if len(writes) == 0 {
        return nil
    }
    // run bulk write
    _, err := emailDB.
        Collection(userCollection).
        BulkWrite(ctx, writes, options.BulkWrite().SetOrdered(false))
    if err != nil {
        return errors.Wrap(err, "bulk write")
    }
    return nil
}
Tags: mongodb, go, memory-leaks, bulkinsert
1 Answer

The following is copying in disguise:

    if len(insert) > 0 {
        writes = append(writes, m.getInsertWrites(insert)...)
    }
    if len(update) > 0 {
        writes = append(writes, m.getUpdateWrites(update)...)
    }
    if len(delete) > 0 {
        writes = append(writes, m.getDeleteWrites(delete)...)
    }

An easy win: delete those lines and change saveUsers so that it works with a caller-supplied writes []mongo.WriteModel slice, instead of three []interface{} slices whose contents get copied in. Each getInsertWrites-style helper allocates a fresh slice, and append(writes, ...) then copies every element into writes, reallocating its backing array as it grows. Reusing the same backing array across batches should save some memory.
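A minimal sketch of that refactor, with its assumptions spelled out: appendInsertWrites, appendUpdateWrites, and appendDeleteWrites are hypothetical replacements for getInsertWrites and friends that append into a caller-supplied slice, and writes is assumed to be a field on mongoDBUserHandler so its backing array survives between batches. This is one way to apply the suggestion, not code from the original post:

func (m *mongoDBUserHandler) saveUsers(emailDB *mongo.Database, insert, update, delete []interface{}) error {
    // Truncate rather than reallocate: length drops to zero but the
    // backing array keeps its capacity for the next batch.
    m.writes = m.writes[:0]
    m.writes = m.appendInsertWrites(m.writes, insert) // hypothetical helper
    m.writes = m.appendUpdateWrites(m.writes, update) // hypothetical helper
    m.writes = m.appendDeleteWrites(m.writes, delete) // hypothetical helper
    if len(m.writes) == 0 {
        return nil
    }
    _, err := emailDB.
        Collection(userCollection).
        BulkWrite(context.Background(), m.writes, options.BulkWrite().SetOrdered(false))
    if err != nil {
        return errors.Wrap(err, "bulk write")
    }
    return nil
}

The same idea applies to insertUsers, updateUsers, and deleteUsers in processCSVBatch, which are also freshly allocated on every batch. Reusing m.writes like this assumes saveUsers is never called concurrently and that the driver does not hold on to the slice after BulkWrite returns.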
