TestDriver ------ GracefulShutdowner ------- PosAccessor
                                         |
                                         --- PeerAccessor
                                         |
                                         --- DbAccessor

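The actors can be arranged as shown above. In this example, however, the goal is to demonstrate the actor broker, so the actors do not exchange messages directly; instead, they issue messages to the broker and subscribe to messages through it. In that case the topology looks like this: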

TestDriver      GracefulShutdowner   PosAccessor  PeerAccessor  DbAccessor
    |                     |               |             |            |
    ------------------------------------------------------------------
                                   |
                             ActixBroker
  • TestDriver -> GracefulShutdowner: after 3 seconds, sends a message requesting a graceful shutdown
  • GracefulShutdowner -> PosAccessor, PeerAccessor, DbAccessor: sends a message telling them to prepare for shutdown
  • PosAccessor, PeerAccessor, DbAccessor -> GracefulShutdowner: each sends a message reporting that it is ready for shutdown
  • GracefulShutdowner: stops the actor system
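
As a rough mental model of the issue/subscribe style used in this topology, a broker can be pictured as a table that maps a message type to the callbacks subscribed to it. The sketch below uses only the standard library. `MiniBroker`, `subscribe`, `issue`, and `fan_out_demo` are hypothetical names made up for illustration, and this is not how actix_broker itself is implemented (the real crate works with actors and their mailboxes rather than plain closures).

```rust
use std::any::{Any, TypeId};
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Hypothetical message type, named after the one used in the example.
struct PrepareShutdownMsg;

// Type-erased callback stored by the broker.
type Callback = Box<dyn Fn(&dyn Any)>;

// Hypothetical in-process broker: message type -> subscriber callbacks.
#[derive(Default)]
struct MiniBroker {
    subscribers: HashMap<TypeId, Vec<Callback>>,
}

impl MiniBroker {
    // Register `handler` for every message of type `M`.
    fn subscribe<M: Any>(&mut self, handler: impl Fn(&M) + 'static) {
        let erased: Callback = Box::new(move |msg: &dyn Any| {
            if let Some(typed) = msg.downcast_ref::<M>() {
                handler(typed);
            }
        });
        self.subscribers
            .entry(TypeId::of::<M>())
            .or_default()
            .push(erased);
    }

    // Deliver `msg` to each subscriber of `M`; returns how many were notified.
    fn issue<M: Any>(&self, msg: M) -> usize {
        let handlers = match self.subscribers.get(&TypeId::of::<M>()) {
            Some(handlers) => handlers,
            None => return 0,
        };
        let erased: &dyn Any = &msg;
        for handler in handlers {
            handler(erased);
        }
        handlers.len()
    }
}

// Subscribe the three workers, issue one message, and return who received it.
fn fan_out_demo() -> Vec<String> {
    let received = Rc::new(RefCell::new(Vec::new()));
    let mut broker = MiniBroker::default();
    for name in ["PosAccessor", "PeerAccessor", "DbAccessor"] {
        let received = Rc::clone(&received);
        broker.subscribe(move |_msg: &PrepareShutdownMsg| {
            received.borrow_mut().push(name.to_string());
        });
    }
    broker.issue(PrepareShutdownMsg);
    let result = received.borrow().clone();
    result
}

fn main() {
    println!("{:?}", fan_out_demo());
}
```

In this sketch the sender never names its receivers: one issued PrepareShutdownMsg reaches every subscriber of that type, which is the property the actual example below relies on.
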
use std::thread;
use std::time::Duration;

use actix::prelude::*;
use actix_broker::{BrokerIssue, BrokerSubscribe, SystemBroker};

type BrokerType = SystemBroker;

/***
Actors
 */
// GracefulShutdowner coordinates the shutdown:
//   1. broadcasts PrepareShutdownMsg to the workers,
//   2. counts the ReadyForShutdownMsg replies,
//   3. stops the actor system once all three replies have arrived.
struct GracefulShutdowner {
    num_of_workers_to_wait: u32,
    num_received_ready_for_shutdown: u32,
}

struct PosAccessor {
    shutdown_prepared: bool,
}
struct PeerAccessor {
    shutdown_prepared: bool,
}
struct DbAccessor {
    shutdown_prepared: bool,
}
struct TestDriver;

/***
Actor Messages
 */
#[derive(Debug, Clone, Message)]
#[rtype(result = "()")]
struct GracefulShutdownMsg;
#[derive(Debug, Clone, Message)]
#[rtype(result = "()")]
struct PrepareShutdownMsg;
#[derive(Debug, Clone, Message)]
#[rtype(result = "()")]
struct ReadyForShutdownMsg {
    who_am_i: String,
}

/***
Customizing Actor Init logic
 */
impl Actor for GracefulShutdowner {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        println!("GracefulShutdowner started");
        self.subscribe_sync::<BrokerType, GracefulShutdownMsg>(ctx);
        self.subscribe_sync::<BrokerType, ReadyForShutdownMsg>(ctx);
    }
}
impl Actor for PosAccessor {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        println!("PosAccessor started");
        self.subscribe_sync::<BrokerType, PrepareShutdownMsg>(ctx);
    }
}
impl Actor for PeerAccessor {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        println!("PeerAccessor started");
        self.subscribe_sync::<BrokerType, PrepareShutdownMsg>(ctx);
    }
}
impl Actor for DbAccessor {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        println!("DbAccessor started");
        self.subscribe_sync::<BrokerType, PrepareShutdownMsg>(ctx);
    }
}
impl Actor for TestDriver {
    type Context = Context<Self>;
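    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("TestDriver started. After 3 seconds, we inject shutdown message");
        // NOTE: thread::sleep blocks the thread this actor runs on, so no other
        // actor on that thread makes progress in the meantime. That is tolerable
        // in this demo; in real code a timer such as ctx.run_later avoids blocking.
        thread::sleep(Duration::from_secs(3));
        println!("TestDriver is injecting graceful shutdown request");
        self.issue_async::<BrokerType, _>(GracefulShutdownMsg);
    }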
}

/***
Actor Message Handlers
 */
impl Handler<GracefulShutdownMsg> for GracefulShutdowner {
    type Result = ();

    fn handle(&mut self, msg: GracefulShutdownMsg, _ctx: &mut Self::Context) {
        println!("GracefulShutdowner is starting to shut down... msg = {:?}", msg);
        self.num_received_ready_for_shutdown = 0;
        self.issue_async::<BrokerType, _>(PrepareShutdownMsg);
    }
}
impl Handler<ReadyForShutdownMsg> for GracefulShutdowner {
    type Result = ();

    fn handle(&mut self, msg: ReadyForShutdownMsg, _ctx: &mut Self::Context) -> Self::Result {
        println!("GracefulShutdowner has received a ready message from {}", msg.who_am_i);
        self.num_received_ready_for_shutdown += 1;
        if self.num_received_ready_for_shutdown < self.num_of_workers_to_wait {
            println!("GracefulShutdowner is still waiting for more ready message... {}/{}",
                     self.num_received_ready_for_shutdown, self.num_of_workers_to_wait);
        } else {
            println!("GracefulShutdowner has received ready messages from all participants... {}/{}. Stopping actor system...",
                     self.num_received_ready_for_shutdown, self.num_of_workers_to_wait);
            System::current().stop();
            println!("Bye!");
        }
    }
}

impl Handler<PrepareShutdownMsg> for PosAccessor {
    type Result = ();

    fn handle(&mut self, msg: PrepareShutdownMsg, _ctx: &mut Self::Context) -> Self::Result {
        if !self.shutdown_prepared {
            println!("PosAccessor is preparing for a shutdown... msg = {:?}", msg);
            println!("PosAccessor has finished its all outstanding I/Os to POS");
            self.issue_async::<BrokerType, _>(
                ReadyForShutdownMsg { who_am_i : "PosAccessor".to_string() }
            );
            self.shutdown_prepared = true;
        } else {
            println!("PosAccessor is already prepared for a shutdown... msg = {:?} will be discarded", msg);
        }
    }
}

impl Handler<PrepareShutdownMsg> for DbAccessor {
    type Result = ();

    fn handle(&mut self, msg: PrepareShutdownMsg, _ctx: &mut Self::Context) -> Self::Result {
        if !self.shutdown_prepared {
            println!("DbAccessor is preparing for a shutdown... msg = {:?}", msg);
            println!("DbAccessor has finished its all outstanding I/Os to DB");
            self.issue_async::<BrokerType, _>(
                ReadyForShutdownMsg { who_am_i : "DbAccessor".to_string() }
            );
            self.shutdown_prepared = true;
        } else {
            println!("DbAccessor is already prepared for a shutdown... msg = {:?} will be discarded", msg);
        }
    }
}

impl Handler<PrepareShutdownMsg> for PeerAccessor {
    type Result = ();

    fn handle(&mut self, msg: PrepareShutdownMsg, _ctx: &mut Self::Context) -> Self::Result {
        if !self.shutdown_prepared {
            println!("PeerAccessor is preparing for a shutdown... msg = {:?}", msg);
            println!("PeerAccessor has finished its all outstanding I/Os to Peer Replicator");
            self.issue_async::<BrokerType, _>(
                ReadyForShutdownMsg { who_am_i : "PeerAccessor".to_string() }
            );
            self.shutdown_prepared = true;
        } else {
            println!("PeerAccessor is already prepared for a shutdown... msg = {:?} will be discarded", msg);
        }
    }
}

fn main() {
    println!("Starting");
    let sys = System::new();

    sys.block_on(async {
        let shutdowner = GracefulShutdowner {
            num_of_workers_to_wait: 3,
            num_received_ready_for_shutdown: 0,
        };
        shutdowner.start();
        PosAccessor { shutdown_prepared: false }.start();
        PeerAccessor { shutdown_prepared: false }.start();
        DbAccessor { shutdown_prepared: false }.start();
        TestDriver.start();
    });
    sys.run().unwrap();
    println!("Done");
}

Running it produces the following output:

$ cargo run
Starting
GracefulShutdowner started
PosAccessor started
PeerAccessor started
DbAccessor started
TestDriver started. After 3 seconds, we inject shutdown message
TestDriver is injecting graceful shutdown request
GracefulShutdowner is starting to shut down... msg = GracefulShutdownMsg
PosAccessor is preparing for a shutdown... msg = PrepareShutdownMsg
PosAccessor has finished its all outstanding I/Os to POS
PeerAccessor is preparing for a shutdown... msg = PrepareShutdownMsg
PeerAccessor has finished its all outstanding I/Os to Peer Replicator
DbAccessor is preparing for a shutdown... msg = PrepareShutdownMsg
DbAccessor has finished its all outstanding I/Os to DB
GracefulShutdowner has received a ready message from PosAccessor
GracefulShutdowner is still waiting for more ready message... 1/3
GracefulShutdowner has received a ready message from PeerAccessor
GracefulShutdowner is still waiting for more ready message... 2/3
GracefulShutdowner has received a ready message from DbAccessor
GracefulShutdowner has received ready messages from all participants... 3/3. Stopping actor system...
Bye!
Done
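
The "1/3, 2/3, 3/3" lines in the output come from the counting logic in GracefulShutdowner's ReadyForShutdownMsg handler. Isolated from actix, that logic can be sketched roughly as follows. `ReadyCounter` and `on_ready` are hypothetical names introduced here for illustration; the field names are borrowed from the example.

```rust
// Hypothetical, actix-free mirror of the counters kept by GracefulShutdowner.
struct ReadyCounter {
    num_of_workers_to_wait: u32,
    num_received_ready_for_shutdown: u32,
}

impl ReadyCounter {
    fn new(num_of_workers_to_wait: u32) -> Self {
        ReadyCounter {
            num_of_workers_to_wait,
            num_received_ready_for_shutdown: 0,
        }
    }

    // Record one ready message. Returns true once every worker has reported,
    // which is the point where the example calls System::current().stop().
    fn on_ready(&mut self) -> bool {
        self.num_received_ready_for_shutdown += 1;
        self.num_received_ready_for_shutdown >= self.num_of_workers_to_wait
    }
}

fn main() {
    let mut counter = ReadyCounter::new(3);
    for who in ["PosAccessor", "PeerAccessor", "DbAccessor"] {
        let all_ready = counter.on_ready();
        println!(
            "{} is ready: {}/{} (stop = {})",
            who,
            counter.num_received_ready_for_shutdown,
            counter.num_of_workers_to_wait,
            all_ready
        );
    }
}
```

A plain counter like this assumes each worker reports exactly once. In the example that assumption is upheld by the `shutdown_prepared` flag, which makes every accessor discard a second PrepareShutdownMsg instead of replying again.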