Actix SyncArbiter注册表

问题描述 投票:-1回答:1

我正在尝试使用SyncArbiter为不同的演员实现10个Redis连接池。假设我们有一个名为Bob的演员必须使用Redis演员来完成它的任务。

虽然这可以通过以下方式实现:

// crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
    pub redis: Addr<Redis>,
    pub bob: Addr<Bob>
}

fn main() {
    let system = actix::System::new("theatre");

    server::new(move || {
        let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
        let bob_addr = SyncArbiter::start(10, || Bob::new());

        let state = AppState {
            redis: redis_addr,
            bob: bob_addr
        };

        App::with_state(state).resource("/bob/eat", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::bob::eat)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
  name: String,
  kcal: u64
}

pub fn eat(
    (req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
    state
        .bob
        .send(Eat::new(req.into_inner()))
        .from_err()
        .and_then(|res| match res {
            Ok(val) => {
                println!("==== BODY ==== {:?}", val);
                Ok(HttpResponse::Ok().into())
            }
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
    pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
    pub fn new(cmd: Cmd) -> Self {
        RunCommand(cmd)
    }
}

impl Message for RunCommand {
    type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
    type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
    type Result = Result<RedisResult<String>, ()>;

    fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
        println!("Redis received command!");
        Ok(Ok("OK".to_string()))
    }
}

impl Redis {
    pub fn new(url: &str) -> Result<Self, RedisError> {
        let client = match Client::open(url) {
            Ok(client) => client,
            Err(error) => return Err(error)
        };

        let redis = Redis {
            client: client,
        };

        Ok(redis)
    }
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
    type Result = Result<Bob, ()>;
}

impl Actor for Eat {
    type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        println!("Bob received {:?}", &msg);

        // How to get a Redis actor and pass data to it here?

        Ok(msg.datapoint)
    }
}

impl Bob {
    pub fn new() -> () {
        Bob {}
    }
}

从Bob的上述句柄实现来看,目前还不清楚Bob如何获得Redis演员的地址。或发送任何消息到Actor中运行的任何SyncArbiter

使用常规ArbiterRegistry也可以实现同样的效果,但据我所知,Actix不允许多个相同的演员(例如我们不能使用常规的Arbiter启动10个Redis演员)。

要正式化我的问题:

  • 是否有RegistrySyncArbiter演员
  • 我可以在常规Arbiter中启动多个相同类型的actor
  • 是否有更好/更规范的方法来实现连接池

编辑

版本:

  • actix 0.7.9
  • actix_web 0.7.19
  • 期货=“0.1.26”
  • 生锈1.33.0
rust rust-actix actix-web
1个回答
0
投票

我自己找到了答案。

开箱即用,没有办法从注册表中检索带有ActorSyncContext

鉴于我上面的例子。对于演员Bob发送任何类型的消息给Redis演员,它需要知道Redis演员的地址。 Bob可以明确地获取Redis的地址 - 包含在发送给它的消息中或从某种共享状态读取。

我想要一个类似于Erlang的系统,所以我决定不通过消息传递演员的地址,因为它似乎太费力,容易出错,并且在我看来它违背了基于演员的并发模型的目的(因为没有一个演员可以发信息给任何其他人演员)。

因此,我调查了共享状态的想法,并决定实现我自己的SyncRegistry,这将是Actix标准Registry的模拟 - 这正是我想要的,但不是对于带有SyncContext的Actors。

这是我编写的天真解决方案:https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

通过以下设置:

fn main() {
    let system = actix::System::new("theatre");

    let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
    SyncRegistry::set(addr);
    let addr = SyncArbiter::start(10, || Bob::new());
    SyncRegistry::set(addr);


    server::new(move || {
        let state = AppState {};

        App::with_state(state).resource("/foo", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::foo::create)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

演员Bob可以从程序中的任何一点以下列方式获取Redis的地址:

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        let redis = match SyncRegistry::<Redis>::get() {
            Some(redis) => redis,
            _ => return Err(())
        };

        let cmd = redis::cmd("XADD")
            .arg("things_to_eat")
            .arg("*")
            .arg("data")
            .arg(&msg.0)
            .to_owned();

        redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.