异步 Rust:Future 无法使用 Redis 连接管理器在线程之间安全地共享

问题描述 投票:0回答:1

我正在使用 rust redis 库对数据库执行一些操作。

我有一个来自外部库的特征,我需要将其实现到我的结构中,以便按照我应该从 Redis 读取/写入值的方式。

这是有问题的函数如何定义的简化版本:

    /// Address space.
    type Address;

    /// Values space.
    type LocalValue;

    /// Memory error.
    type Error: Send + Sync + std::error::Error;


/// Address, LocalValue and Error are defined
    
/// Reads the words from the given addresses.
    fn read_from_db(
        &self,
        a: Vec<Self::Address>,
    ) -> impl Send + Sync + Future<Output = Result<Vec<Option<Self::LocalValue>>, Self::Error>>;

当我实现此功能时,如下所示:


    async fn read_from_db(
        &self,
        addresses: Vec<Address>,
    ) -> Result<Vec<Option<Self::LocalValue>>, Self::Error> {
    
        let refs: Vec<&Address> = addresses.iter().collect();
        let value = self.clone().connection.mget::<_, Vec<_>>(&refs).await?;
        Ok(value)
      
    }    

我有错误:

error: future cannot be shared between threads safely

|
119 | / async fn batch_read(
120 | | &self,
121 | | addresses: Vec<Address>,
122 | | ) -> Result<Vec<Option<Self::LocalValue>>, Self::Error> {
| |_____________________________________________________^ future returned by \read_from_db` is not `Sync``
note: future is not \Sync` as it awaits another future which is not `Sync``
let value = self.clone().connection.mget::<_, Vec<_>>(&refs).await?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type \Pin<Box<dyn Future<Output = Result<Vec<Option</* some type */>>, RedisError>> + Send>>`, which is not `Sync`
note: required by a bound in /* some trait */
|
70 | ) -> impl Send + Sync + Future<Output = Result<Vec<Option</* some type */>>, Self::Error>>;
| ^^^^ required by this bound in /* some trait */

我尝试过使用锁、使用副本、释放内存,我也尝试过更改数据类型,但不起作用

asynchronous rust redis future
1个回答
0
投票

一种选择是使用

SyncWrapper
板条箱,如下所示:

use std::future::Future;
use std::pin::Pin;
use sync_wrapper::SyncFuture;

trait DbReader {
    type Address;
    type LocalValue;
    type Error: Send + Sync + std::error::Error;

    fn read_from_db(
        &self,
        a: Vec<Self::Address>,
    ) -> impl Send + Sync + Future<Output = Result<Vec<Option<Self::LocalValue>>, Self::Error>>;
}

fn problematic_future() -> Pin<Box<dyn Future<Output = ()> + Send>> {
    todo!()
}

struct MyHandler;
impl DbReader for MyHandler {
    type Address = ();
    type LocalValue = ();
    type Error = std::io::Error;

    async fn read_from_db(&self, a: Vec<Self::Address>) -> Result<Vec<Option<Self::LocalValue>>, Self::Error> {
        let better_future = SyncFuture::new(problematic_future());
        better_future.await;

        todo!()
    }
}

但是,正如对您问题的评论中所述,不需要这样做。

Future
只需要是
Send
,而不是
Sync
,所以真正的问题在于您的特质已经提供了。

  1. 如果
    read_from_db
    方法来自您自己的代码,您应该能够重构它并删除
    Sync
  2. 如果它来自某个外部库,您应该将其报告给维护人员,因为它实际上不太可能被需要。

小免责声明:

SyncWrapper
解决方案有效的部分原因正是因为
Future
实际上不需要是
Sync
。 Future 没有任何
&self
方法(您需要有一个可变引用来轮询它),因此
SyncWrapper
可以安全地包装这样的 future,而无需
Arc
或类似的共享所有权结构。

© www.soinside.com 2019 - 2024. All rights reserved.