我有一个
WriterJob
结构。它的功能并不重要;它的作用是产生一个长时间运行的任务并为调用者提供一个 api 来检查作业的状态。
pub(crate) struct WriterJob {
...,
status: Status,
}
impl WriterJob {
async fn writer_job_impl(&mut self, rx: Receiver<WriteCommand>) {
// Job implementation details. It's important that `self` is mutuable
// since I update the job status
// eg.
// self.status = Status::Ready;
}
pub(crate) fn status(&self) -> Status {
self.status
}
pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
let (tx, rx) = mpsc::channel(10);
let handle = tokio::spawn(async move {
self.writer_job_impl(rx).await;
});
self.status = Status::Spawned;
tx
}
我得到这个错误:
error[E0521]: borrowed data escapes outside of associated function
--> src/io/buffered_write.rs:92:22
|
89 | pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
| ---------
| |
| `self` is a reference that is only valid in the associated function body
| let's call the lifetime of this reference `'1`
...
92 | let handle = tokio::spawn(async move {
| ______________________^
93 | | self.writer_job_impl(rx).await;
94 | | });
| | ^
| | |
| |__________`self` escapes the associated function body here
| argument requires that `'1` must outlive `'static`
我想我明白编译器抱怨说它不知道
self
是否会与生成的任务一样长,因此会出现生命周期错误。但我不确定这样做的好方法是什么。一种可能性是使用 Arc<Mutex<Status>>
或 Arc<RwLock<Status>>
但我不喜欢这种方法,因为我可能需要在 self
中添加更多可变字段。有没有更清洁的方法来做到这一点?
一个可能的解决方案是使用内部可变性。通过将
Status
字段更改为 Cell<Status>
- 或原子类型,crossbeam 的 crossbeam::atomic::AtomicCell
浮现在脑海中 - 您可以删除 mut
和生命周期限制。
假设您将
status
字段移动到内部类型;我叫它WriterJobInner
。 WriterJob
现在只拥有一个Arc<WriterJobInner>
:
pub(crate) struct WriterJob {
inner: std::sync::Arc<WriterJobInner>,
}
struct WriterJobImpl {
pub status: std::cell::Cell<Status>,
}
struct WriteCommand;
#[derive(Copy, Clone)]
enum Status {
None,
Spawned,
Ready,
}
如果你愿意,你可以为
Deref
实现WriterJob
来简化一些访问。
您的
WriterJob
实施现在将更改为:
impl WriterJob {
pub(crate) fn spawn_writer_job(&self) -> tokio::sync::mpsc::Sender<WriteCommand> {
let (tx, rx) = tokio::sync::mpsc::channel(10);
// Clone the Arc and move it to the background thread.
let inner = self.inner.clone();
let handle = tokio::spawn(async move {
inner.writer_job_impl(rx).await;
});
// TODO: Set only to Spawned if still None
self.inner.status.set(Status::Spawned);
tx
}
pub(crate) fn status(&self) -> Status {
self.inner.status()
}
}
由于线程不需要传递
self
——既不是可变的也不是不可变的——错误消失了。此外,由于status
现在是Cell
,它也不需要mut self
。
同样,你的
WriterJobInner
也只需要&self
:
impl WriterJobInner {
pub async fn writer_job_impl(&self, rx: tokio::sync::mpsc::Receiver<WriteCommand>) {
// ...
self.status.set(Status::Ready);
}
pub fn status(&self) -> Status {
self.status.get()
}
}
unsafe impl Send for WriterJobInner {}
unsafe impl Sync for WriterJobInner {}
不利的一面是,
WriterJobInner
类型必须同时是 Send
和 Sync
才能与 Arc<T>
一起使用,但无论如何您都在跨线程使用它。
请注意,在创建线程后将
status
设置为Spawned
是竞争条件。你想尝试原子地设置值,只有当它还没有被设置为其他东西时。
在这种方法中,最有用的组合可能是将所有需要一起更改的东西捆绑在一起,然后为其使用外部可变性。
在此处使用
RwLock
(或类似),例如作为inner: Arc<RwLock<..>>
,要求您将同步与异步代码混合(std::sync::RwLock
在async
方法中)或使您的访问器async
(使用tokio::sync::RwLock
时)。
您不能假设
self
在执行 handle
期间没有被丢弃,这就是它抛出错误的原因。相反,您可以更改结构的设计以使其工作
struct WriteJobInner {
...,
status: Status,
}
// you can use 'std::sync::Mutex' if you need to touch the data in non-async
// functions
use tokio::sync::Mutex;
use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct Writejob {
inner: Arc<Mutex<WriteJobInner>>,
}
另外,也许你可以用
tokio::sync::RwLock
替换 tokio 的互斥锁,如果许多线程尝试对数据执行不可变操作,这会好得多,这样可以让许多线程读取数据而不会阻塞其他只读线程。
WriteJob
功能,您可以使用 &self
而不是 &mut self
因为您修改了受保护的互斥量的数据。impl WriterJob {
async fn writer_job_impl(&self, rx: Receiver<WriteCommand>) {
// Job implementation details. It's important that `self` is mutuable
// since I update the job status
// eg.
// *self.inner.lock().await.status = Status::Ready;
}
pub(crate) async fn status(&self) -> Status {
// make sure the `status` implements copy trait
*self.inner.lock().await.status
}
pub(crate) fn spawn_writer_job(&self) -> Sender<WriteCommand> {
let (tx, rx) = mpsc::channel(10);
// creates a new instance `Self` but all the instances of `self.inner`
// references to the same shared state because it's wrapped by the
// shader pointer `Arc`
let writer = self.clone();
let handle = tokio::spawn(async move {
writer.writer_job_impl(rx).await;
});
*self.inner.status.lock().await = Status::Spawned;
tx
}
}