如何使用Mongoose使用MongoDB事务?

问题描述 投票:0回答:3

我正在使用 MongoDB Atlas 云(https://cloud.mongodb.com/)和 Mongoose 库。

我尝试使用事务概念创建多个文档,但它不起作用。 我没有收到任何错误。但是,回滚似乎无法正常工作。

app.js

//*** more code here

var app = express();

require('./models/db');

//*** more code here

模型/db.js

var mongoose = require( 'mongoose' );

// Build the connection string
var dbURI = 'mongodb+srv://mydb:pass@cluster0-****.mongodb.net/mydb?retryWrites=true';

// Create the database connection
mongoose.connect(dbURI, {
  useCreateIndex: true,
  useNewUrlParser: true,
});

// Get Mongoose to use the global promise library
mongoose.Promise = global.Promise;

模型/user.js

const mongoose = require("mongoose");

const UserSchema = new mongoose.Schema({
  userName: {
    type: String,
    required: true
  },
  pass: {
    type: String,
    select: false
  }
});

module.exports = mongoose.model("User", UserSchema, "user");

myroute.js

const db = require("mongoose");
const User = require("./models/user");

router.post("/addusers", async (req, res, next) => {

    const SESSION = await db.startSession();

    await SESSION.startTransaction();

    try {

          const newUser = new User({
            //*** data for user ***
          });
          await newUser.save();

          //*** for test purpose, trigger some error ***
          throw new Error("some error");

          await SESSION.commitTransaction();

          //*** return data 

    } catch (error) {
            await SESSION.abortTransaction();
    } finally {
            SESSION.endSession();
    }    

 });

上面的代码工作没有错误,但它仍然在数据库中创建用户。它假设回滚创建的用户并且集合应该为空。

我不知道我在这里错过了什么。谁能告诉我这里出了什么问题吗?

应用程序、模型、架构和路由器位于不同的文件中。

javascript node.js mongodb mongoose mongodb-query
3个回答
23
投票

您需要在事务期间处于活动状态的所有读/写操作的选项中包含

session
。只有这样它们才真正应用于事务范围,您可以在其中回滚它们。

作为更完整的列表,并且仅使用更经典的

Order/OrderItems
建模,对于大多数具有一定关系事务经验的人来说应该非常熟悉:

const { Schema } = mongoose = require('mongoose');

// URI including the name of the replicaSet connecting to
const uri = 'mongodb://localhost:27017/trandemo?replicaSet=fresh';
const opts = { useNewUrlParser: true };

// sensible defaults
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
mongoose.set('useFindAndModify', false);
mongoose.set('useCreateIndex', true);

// schema defs

const orderSchema = new Schema({
  name: String
});

const orderItemsSchema = new Schema({
  order: { type: Schema.Types.ObjectId, ref: 'Order' },
  itemName: String,
  price: Number
});

const Order = mongoose.model('Order', orderSchema);
const OrderItems = mongoose.model('OrderItems', orderItemsSchema);

// log helper

const log = data => console.log(JSON.stringify(data, undefined, 2));

// main

(async function() {

  try {

    const conn = await mongoose.connect(uri, opts);

    // clean models
    await Promise.all(
      Object.entries(conn.models).map(([k,m]) => m.deleteMany())
    )

    let session = await conn.startSession();
    session.startTransaction();

    // Collections must exist in transactions
    await Promise.all(
      Object.entries(conn.models).map(([k,m]) => m.createCollection())
    );

    let [order, other] = await Order.insertMany([
      { name: 'Bill' },
      { name: 'Ted' }
    ], { session });

    let fred = new Order({ name: 'Fred' });
    await fred.save({ session });

    let items = await OrderItems.insertMany(
      [
        { order: order._id, itemName: 'Cheese', price: 1 },
        { order: order._id, itemName: 'Bread', price: 2 },
        { order: order._id, itemName: 'Milk', price: 3 }
      ],
      { session }
    );

    // update an item
    let result1 = await OrderItems.updateOne(
      { order: order._id, itemName: 'Milk' },
      { $inc: { price: 1 } },
      { session }
    );
    log(result1);

    // commit
    await session.commitTransaction();

    // start another
    session.startTransaction();

    // Update and abort
    let result2 = await OrderItems.findOneAndUpdate(
      { order: order._id, itemName: 'Milk' },
      { $inc: { price: 1 } },
      { 'new': true, session }
    );
    log(result2);

    await session.abortTransaction();

    /*
     * $lookup join - expect Milk to be price: 4
     *
     */

    let joined = await Order.aggregate([
      { '$match': { _id: order._id } },
      { '$lookup': {
        'from': OrderItems.collection.name,
        'foreignField': 'order',
        'localField': '_id',
        'as': 'orderitems'
      }}
    ]);
    log(joined);


  } catch(e) {
    console.error(e)
  } finally {
    mongoose.disconnect()
  }

})()

因此,我通常建议以小写形式调用变量

session
,因为这是所有操作都需要的“选项”对象的键名称。保持小写约定也允许使用 ES6 对象分配之类的东西:

const conn = await mongoose.connect(uri, opts);

...

let session = await conn.startSession();
session.startTransaction();

此外,猫鼬交易文档有点误导,或者至少它可以更具描述性。示例中所指的

db
实际上是 Mongoose Connection 实例,而不是底层
Db
甚至是
mongoose
全局导入,因为有些人可能会误解这一点。请注意,在清单和上面的摘录中,这是从
mongoose.connect()
获得的,并且应该保存在您的代码中,作为您可以从共享导入访问的内容。

或者,您甚至可以在建立连接后的任何时间,通过

mongoose.connection
属性在模块化代码中获取此信息。这在服务器路由处理程序等内部通常是安全的,因为调用代码时将会有一个数据库连接。 代码还演示了

session

在不同模型方法中的用法:


let [order, other] = await Order.insertMany([ { name: 'Bill' }, { name: 'Ted' } ], { session }); let fred = new Order({ name: 'Fred' }); await fred.save({ session });

所有基于 
find()

的方法以及基于

update()
insert()
delete()
的方法都有一个最终的“选项块”,其中需要此会话键和值。
save()
方法的唯一参数是这个选项块。这就是告诉 MongoDB 将这些操作应用于该引用会话上的当前事务。

以大致相同的方式,在提交事务之前,对

find()

或类似的任何未指定

session
选项的请求在该事务正在进行时看不到数据的状态。修改后的数据状态仅在事务完成后可用于其他操作。请注意,这会对写入产生影响,如
文档
中所述。 发出“中止”命令时:

// Update and abort let result2 = await OrderItems.findOneAndUpdate( { order: order._id, itemName: 'Milk' }, { $inc: { price: 1 } }, { 'new': true, session } ); log(result2); await session.abortTransaction();

活动事务上的任何操作都会从状态中删除并且不会应用。因此,它们对于之后的结果操作是不可见的。在此示例中,文档中的值会递增,并将显示当前会话中检索到的值 
5

。但是,在

session.abortTransaction()
之后,文档的先前状态将恢复。请注意,任何不在同一会话上读取数据的全局上下文,除非提交,否则不会看到状态更改。

这应该给出总体概述。可以添加更多的复杂性来处理不同级别的写入失败和重试,但这已经在文档和许多示例中广泛涵盖,或者可以回答更具体的问题。

输出

作为参考,所包含列表的输出如下所示:

Mongoose: orders.deleteMany({}, {}) Mongoose: orderitems.deleteMany({}, {}) Mongoose: orders.insertMany([ { _id: 5bf775986c7c1a61d12137dd, name: 'Bill', __v: 0 }, { _id: 5bf775986c7c1a61d12137de, name: 'Ted', __v: 0 } ], { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") }) Mongoose: orders.insertOne({ _id: ObjectId("5bf775986c7c1a61d12137df"), name: 'Fred', __v: 0 }, { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") }) Mongoose: orderitems.insertMany([ { _id: 5bf775986c7c1a61d12137e0, order: 5bf775986c7c1a61d12137dd, itemName: 'Cheese', price: 1, __v: 0 }, { _id: 5bf775986c7c1a61d12137e1, order: 5bf775986c7c1a61d12137dd, itemName: 'Bread', price: 2, __v: 0 }, { _id: 5bf775986c7c1a61d12137e2, order: 5bf775986c7c1a61d12137dd, itemName: 'Milk', price: 3, __v: 0 } ], { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") }) Mongoose: orderitems.updateOne({ order: ObjectId("5bf775986c7c1a61d12137dd"), itemName: 'Milk' }, { '$inc': { price: 1 } }, { session: ClientSession("80f827fe077044c8b6c0547b34605cb2") }) { "n": 1, "nModified": 1, "opTime": { "ts": "6626894672394452998", "t": 139 }, "electionId": "7fffffff000000000000008b", "ok": 1, "operationTime": "6626894672394452998", "$clusterTime": { "clusterTime": "6626894672394452998", "signature": { "hash": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "keyId": 0 } } } Mongoose: orderitems.findOneAndUpdate({ order: ObjectId("5bf775986c7c1a61d12137dd"), itemName: 'Milk' }, { '$inc': { price: 1 } }, { session: ClientSession("80f827fe077044c8b6c0547b34605cb2"), upsert: false, remove: false, projection: {}, returnOriginal: false }) { "_id": "5bf775986c7c1a61d12137e2", "order": "5bf775986c7c1a61d12137dd", "itemName": "Milk", "price": 5, "__v": 0 } Mongoose: orders.aggregate([ { '$match': { _id: 5bf775986c7c1a61d12137dd } }, { '$lookup': { from: 'orderitems', foreignField: 'order', localField: '_id', as: 'orderitems' } } ], {}) [ { "_id": "5bf775986c7c1a61d12137dd", "name": "Bill", "__v": 0, "orderitems": [ { "_id": "5bf775986c7c1a61d12137e0", "order": "5bf775986c7c1a61d12137dd", "itemName": "Cheese", "price": 1, "__v": 0 }, { "_id": "5bf775986c7c1a61d12137e1", "order": "5bf775986c7c1a61d12137dd", "itemName": "Bread", "price": 2, "__v": 0 }, { "_id": "5bf775986c7c1a61d12137e2", "order": "5bf775986c7c1a61d12137dd", "itemName": "Milk", "price": 4, "__v": 0 } ] } ]



0
投票
我认为这是开始使用 mongoose 执行事务的最快方法

const mongoose = require("mongoose"); // starting session on mongoose default connection const session = await mongoose.startSession(); mongoose.connection.transaction(async function executor(session) { try { // creating 3 collections in isolation with atomicity const price = new Price(priceSchema); const variant = new Variant(variantSchema); const item = new Item(itemSchema); await price.save({ session }); await variant.save({ session }); // throw new Error("opps some error in transaction"); return await item.save({ session }); } catch (err) { console.log(err); } });



0
投票

const mongooseConnection = mongoose.createConnection(url) const session = await mongooseConnection.startSession() try { session.startSession() // <Make DB calls> with providing session await User.create([{ name: 'a' }], { session }) await User.insertMany([{ name: 'b' }, { name: 'c' }], { session }) const userD = new User({ name: 'd' }) await userD.save({ session }) await User.updateOne({ name: 'e' }, { name: 'f' }, { session }) // </Make DB calls> // commit your changes to the database if everything went fine await session.commitTransaction() } catch(err) { // Make sure you "undo" everything by aborting the transaction await session.abortTransaction() console.error(err) } finally { // In all cases we end the session properly await session.end() }

	
© www.soinside.com 2019 - 2024. All rights reserved.