无法在循环内建立rusqlite事务:使用移动的值,并且一次不能多次借用可变变量

问题描述 投票:0回答:2

为了使用rusqlite加快插入SQLite数据库的速度,我想在for循环内构建事务,并且仅每N次迭代提交一次。

以下代码可编译,但它会构建一个事务并一次性提交所有事务:

use rusqlite::{Connection, Result, NO_PARAMS};

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;

    conn.execute(
        "CREATE TABLE entry (
            id   INTEGER PRIMARY KEY,
            data INTEGER
        )",
        NO_PARAMS,
    )?;

    let tx = conn.transaction()?;
    for i in 0..20 {
        tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
    }
    tx.commit()?;

    Ok(())
}

我的用例需要使用数百万个插入来构建事务,因此我想做的是积累事务,并在事务到达transaction_size时提交并重新开始新事务。 非编译版本如下所示:

let transaction_size = 5;
let tx = conn.transaction()?;
for i in 0..20 {
    if (i % transaction_size) == (transaction_size - 1) {
        tx.commit()?;
        let tx = conn.transaction()?;
    }
    tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
}

借阅检查器不允许这样做有两个原因。

error[E0382]: use of moved value: `tx`
  --> src/main.rs:18:13
   |
15 |     let tx = conn.transaction()?;
   |         -- move occurs because `tx` has type `rusqlite::transaction::Transaction<'_>`, which does not implement the `Copy` trait
...
18 |             tx.commit()?;
   |             ^^ value moved here, in previous iteration of loop

error[E0499]: cannot borrow `conn` as mutable more than once at a time
  --> src/main.rs:19:22
   |
15 |     let tx = conn.transaction()?;
   |              ---- first mutable borrow occurs here
...
19 |             let tx = conn.transaction()?;
   |                      ^^^^ second mutable borrow occurs here
20 |         }
21 |         tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
   |         -- first borrow later used here

第一个投诉对我来说很有意义。第二个不是那么多,因为将编译以下内容(但是我在每个事务中仅插入一行):

for i in 0..20 {
    let tx = conn.transaction()?;
    tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
    tx.commit()?;
}

我已经尝试在循环内使用let tx = if cond { tx.commit()?; conn.transaction()? },但是您需要else子句来进行检查。

我无法弄清楚如何使我的编译器满意的同时实现目标。也许有一些方法可以使用不安全的功能,但是我对Rust还是很陌生。

编辑

我忘了提到我想将我的迭代器视为一次性使用。

使用从@Sébastien中将用于将事务构建到do_batch中的逻辑分离的想法。我创建了这个版本,该版本将使用可变矢量累积必须添加到事务中的数据。然后,它将构建并提交大小为transaction_size的块的事务。

use rusqlite::{Connection, Result, Transaction, NO_PARAMS};
use std::vec::Vec;

fn do_batch<'a>(tx: &Transaction<'a>, transaction_accum: &Vec<i32>) -> Result<()> {
    for i in transaction_accum.iter() {
        tx.execute("INSERT INTO entry (data) values (?1)", &[i])?;
    }
    Ok(())
}

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;

    conn.execute(
        "CREATE TABLE entry (
            id   INTEGER PRIMARY KEY,
            data INTEGER
        )",
        NO_PARAMS,
    )?;

    let transaction_size = 5;
    let mut transaction_accum: Vec<i32> = Vec::new();
    for i in 1..20 {
        transaction_accum.push(i);

        if (i % transaction_size) == (transaction_size - 1) {
            let tx = conn.transaction()?;
            do_batch(&tx, &transaction_accum)?;
            transaction_accum.clear();
            tx.commit()?;
        }
    }
    Ok(())
}

编辑2

[@Sébastien的另一个建议后,我偶然发现了itertools板条箱,该板条箱可让您分块迭代器的输出,从而提供以下简洁的解决方案。我唯一担心的是,为了制作块,在调用chunks时,整个迭代器都在幕后实现。是这样吗?

use rusqlite::{Connection, Result, Transaction, NO_PARAMS};
use std::vec::Vec;
use itertools::Itertools;


fn do_batch<'a>(tx: &Transaction<'a>, transaction_accum: &Vec<i32>) -> Result<()> {
    for i in transaction_accum.iter() {
        tx.execute("INSERT INTO entry (data) values (?1)", &[i])?;
    }
    Ok(())
}

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;

    conn.execute(
        "CREATE TABLE entry (
            id   INTEGER PRIMARY KEY,
            data INTEGER
        )",
        NO_PARAMS,
    )?;

    let transaction_size = 5;
    let my_iter = 1..20; // this is really a WalkDir from the walkdir crate
    for chunk in &my_iter.into_iter().chunks(transaction_size) {
        let tx = conn.transaction()?;
        do_batch(&tx, &chunk.collect())?;
        tx.commit()?;
    }
    Ok(())
}
sqlite rust borrow-checker
2个回答
0
投票

以下代码段在第6行有一个重要的误解:

let transaction_size = 5;
let tx = conn.transaction()?;
for i in 0..20 {
    if (i % transaction_size) == (transaction_size - 1) {
        tx.commit()?;
        let tx = conn.transaction()?; // <-- HERE
    }
    tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
}

此行不会替换在第2行上创建的tx变量,而是会创建一个名为tx的新变量,该变量在if块的持续时间内隐藏第一个变量,并在结束了。因此,当您到达tx.execute时,您将尝试使用已经提交的事务而不是新事务。

您想要的是:

let transaction_size = 5;
let mut tx = conn.transaction()?; // <-- Note the `mut` so that we can change it later to a new one
for i in 0..20 {
    if (i % transaction_size) == (transaction_size - 1) {
        tx.commit()?;
        tx = conn.transaction()?; // <-- No `let` -> replace the existing `tx`
    }
    tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
}
tx.commit()?; // <- Don't forget to commit the last transaction.

0
投票

这是一个SQL问题,而不是Rust问题,但我将同时解释为什么会遇到此问题以及它在Rust中的显示方式。

这全都源于对事务数据库的基本误解,它适用于支持事务的每个RDBMS。事务的重点是打开服务器上可以视为单独的板块。然后,您要对此进行状态更改,例如添加或删除行,然后then将您的单独平板变成服务器的“真实”状态。根据您所使用的数据库引擎,实现的方式会有所不同,但是对于我们今天针对您的问题的目的,此类比可以实现。

而不是这样做,您是在打开交易,进行一次插入,然后立即用commit()交还给您。注意其签名:

commit()

正如我们所期望的,fn commit(self) -> Result<()> commit(),而不是self。通过提交(或回滚),您就是在告诉服务器您正在使用此事务完成

要解决此问题,您需要根据数据库确定如何进行处理。批处理是一个不错的主意,您已经发现了,但是您需要确保您可以承受一次批处理失败并重复的费用。因此,我们将进行一些拆分。

首先,我们将构建批处理生成器。我们将需要此,特别是如果我们打算重播一个批次:

&mut self

然后,我们围绕它构建结构:

fn do_batch<'a>(tx: &mut Transaction<'a>) -> Result<(), rusqlite::Error> {
    for i in 0..20 {
        tx.execute("INSERT INTO entry (data) values (?1", &[i])?;
    }
    Ok(())
}

如果可能的话,总是值得将关注点分离开,并且如果需要的话,总是值得进行一次交易;那就是他们在那里的目的。让您的函数构建批处理,然后以某种总体结构处理提交/回退行为。


正如您在评论中提到的,您正在走树。为此,我假设您已经展平了迭代器(即,您的N维树由一维迭代器表示),并且该迭代器位于fn do_tx(mut conn: Connection) -> Result<(), rusqlite::Error> { for i in 0..20 { // Open the TX let mut tx = conn.transaction()?; do_batch(&mut tx)?; // Do your error handling here. If the batch fails, you want to decide whether to retry or abort. tx.commit()?; } Ok(()) } 下。

当前没有在迭代器上定义的tree_walker方法,这是您所需要的。为简便起见,我们只需要转到chunks(),然后使用collect()。对于大多数工作负载来说,这应该不是问题,但是如果您发现此分配的大小太大,则可以相对容易地重新实现它。

Vec::chunks()
© www.soinside.com 2019 - 2024. All rights reserved.