在异步移动块中使用 `&mut self`

问题描述 投票:0回答:2

我有一个

WriterJob
结构。它的功能并不重要;它的作用是产生一个长时间运行的任务并为调用者提供一个 api 来检查作业的状态。

pub(crate) struct WriterJob {
    ...,
    status: Status,
}

impl WriterJob {
    async fn writer_job_impl(&mut self, rx: Receiver<WriteCommand>) {
       // Job implementation details. It's important that `self` is mutuable
       // since I update the job status
       // eg.
       // self.status = Status::Ready;
    }

    pub(crate) fn status(&self) -> Status {
       self.status
    }

    pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
        let (tx, rx) = mpsc::channel(10);

        let handle = tokio::spawn(async move {
            self.writer_job_impl(rx).await;
        });

        self.status = Status::Spawned;

        tx
    }

我得到这个错误:

error[E0521]: borrowed data escapes outside of associated function
  --> src/io/buffered_write.rs:92:22
   |
89 |       pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
   |                                      ---------
   |                                      |
   |                                      `self` is a reference that is only valid in the associated function body
   |                                      let's call the lifetime of this reference `'1`
...
92 |           let handle = tokio::spawn(async move {
   |  ______________________^
93 | |             self.writer_job_impl(rx).await;
94 | |         });
   | |          ^
   | |          |
   | |__________`self` escapes the associated function body here
   |            argument requires that `'1` must outlive `'static`

我想我明白编译器抱怨说它不知道

self
是否会与生成的任务一样长,因此会出现生命周期错误。但我不确定这样做的好方法是什么。一种可能性是使用
Arc<Mutex<Status>>
Arc<RwLock<Status>>
但我不喜欢这种方法,因为我可能需要在
self
中添加更多可变字段。有没有更清洁的方法来做到这一点?

asynchronous rust rust-tokio
2个回答
0
投票

一个可能的解决方案是使用内部可变性。通过将

Status
字段更改为
Cell<Status>
- 或原子类型,crossbeam 的
crossbeam::atomic::AtomicCell
浮现在脑海中 - 您可以删除
mut
和生命周期限制。

假设您将

status
字段移动到内部类型;我叫它
WriterJobInner
WriterJob
现在只拥有一个
Arc<WriterJobInner>

pub(crate) struct WriterJob {
    inner: std::sync::Arc<WriterJobInner>,
}

struct WriterJobImpl {
    pub status: std::cell::Cell<Status>,
}

struct WriteCommand;

#[derive(Copy, Clone)]
enum Status {
    None,
    Spawned,
    Ready,
}

如果你愿意,你可以为

Deref
实现
WriterJob
来简化一些访问。

您的

WriterJob
实施现在将更改为:

impl WriterJob {
    pub(crate) fn spawn_writer_job(&self) -> tokio::sync::mpsc::Sender<WriteCommand> {
        let (tx, rx) = tokio::sync::mpsc::channel(10);

        // Clone the Arc and move it to the background thread.
        let inner = self.inner.clone();
        let handle = tokio::spawn(async move {
            inner.writer_job_impl(rx).await;
        });

        // TODO: Set only to Spawned if still None
        self.inner.status.set(Status::Spawned);
        tx
    }

    pub(crate) fn status(&self) -> Status {
        self.inner.status()
    }
}

由于线程不需要传递

self
——既不是可变的也不是不可变的——错误消失了。此外,由于
status
现在是
Cell
,它也不需要
mut self

同样,你的

WriterJobInner
也只需要
&self

impl WriterJobInner {
    pub async fn writer_job_impl(&self, rx: tokio::sync::mpsc::Receiver<WriteCommand>) {
        // ...
        self.status.set(Status::Ready);
    }

    pub fn status(&self) -> Status {
        self.status.get()
    }
}

unsafe impl Send for WriterJobInner {}
unsafe impl Sync for WriterJobInner {}

不利的一面是,

WriterJobInner
类型必须同时是
Send
Sync
才能与
Arc<T>
一起使用,但无论如何您都在跨线程使用它。

请注意,在创建线程后将

status
设置为
Spawned
是竞争条件。你想尝试原子地设置值,只有当它还没有被设置为其他东西时。


在这种方法中,最有用的组合可能是将所有需要一起更改的东西捆绑在一起,然后为其使用外部可变性。

在此处使用

RwLock
(或类似),例如作为
inner: Arc<RwLock<..>>
,要求您将同步与异步代码混合(
std::sync::RwLock
async
方法中)或使您的访问器
async
(使用
tokio::sync::RwLock
时)。


0
投票

您不能假设

self
在执行
handle
期间没有被丢弃,这就是它抛出错误的原因。相反,您可以更改结构的设计以使其工作

  1. 使用需要在线程之间共享的数据创建子结构
struct WriteJobInner {
    ...,
    status: Status,
}
  1. 创建结构以对着色器数据执行操作
// you can use 'std::sync::Mutex' if you need to touch the data in non-async
// functions
use tokio::sync::Mutex;
use std::sync::Arc;

#[derive(Clone)]
pub(crate) struct Writejob {
    inner: Arc<Mutex<WriteJobInner>>,
}

另外,也许你可以用

tokio::sync::RwLock
替换 tokio 的互斥锁,如果许多线程尝试对数据执行不可变操作,这会好得多,这样可以让许多线程读取数据而不会阻塞其他只读线程。

  1. 最后一步是实现
    WriteJob
    功能,您可以使用
    &self
    而不是
    &mut self
    因为您修改了受保护的互斥量的数据。
impl WriterJob {
    async fn writer_job_impl(&self, rx: Receiver<WriteCommand>) {
       // Job implementation details. It's important that `self` is mutuable
       // since I update the job status
       // eg.
       // *self.inner.lock().await.status = Status::Ready;
    }

    pub(crate) async fn status(&self) -> Status {
       // make sure the `status` implements copy trait
       *self.inner.lock().await.status
    }

    pub(crate) fn spawn_writer_job(&self) -> Sender<WriteCommand> {
        let (tx, rx) = mpsc::channel(10);
        // creates a new instance `Self` but all the instances of `self.inner`
        // references to the same shared state because it's wrapped by the
        // shader pointer `Arc`
        let writer = self.clone();

        let handle = tokio::spawn(async move {
            writer.writer_job_impl(rx).await;
        });

        *self.inner.status.lock().await = Status::Spawned;

        tx
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.