actix <2> - grpc tonic integration
actix runtime에서 grpc tonic을 사용할 수 있는 방법의 핵심은, Box::pin()
을 사용하는 것.
다음의 예제는, 10개의 grpc client를 사용해서 HelloRequest를 총 1만개 보내고, 최종적으로
각 actor로부터 report를 수집해서 출력하는 예제임.
예제 코드는 tonic 공식 repo
를 참조하였고, Box::pin() 을 사용한 async execution 해결책은 async-actix를
참조하였음. broker는 여기서 사용하지 않았음. topology는 다음과 같음.
main runtime ---- HelloworldClient 0 ---
|-- HelloworldClient 1 --|
|-- HelloworldClient 2 --|------ (grpc channel) ---> GreeterServer
|-- ... --|
--- HelloworldClient 9 ---
src/helloworld-client-actor.rs
use actix::prelude::*;
use hello_world::greeter_client::GreeterClient;
use hello_world::HelloRequest;
use rand::Rng;
pub mod hello_world {
tonic::include_proto!("helloworld");
}
/***
Actors
*/
struct HelloworldClient {
greeter_client: GreeterClient<tonic::transport::Channel>,
num_reqs_processed: u64,
}
/***
Actor Messages
*/
#[derive(Debug, Clone, Message)]
#[rtype(result = "()")]
struct ReqForSayHello;
#[derive(Debug, Clone, Message)]
#[rtype(result = "u64")]
struct ReqForReport;
/***
Customizing Actor Init logic
*/
impl Actor for HelloworldClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("HelloworldClient started");
}
}
/***
Actor Handlers
*/
impl Handler<ReqForSayHello> for HelloworldClient {
type Result = ();
fn handle(&mut self, msg: ReqForSayHello, ctx: &mut Self::Context) -> Self::Result {
let addr = ctx.address();
println!("{:?} has received ReqForSayHello. Issuing gRPC to server...", addr);
let mut grpc_client = self.greeter_client.clone();
// This enables "async execution" within the actix's actor runtime (backed by tokio)
let fut = Box::pin(
async move {
let request = tonic::Request::new(HelloRequest{name: "tonic".to_string()});
let response = grpc_client.say_hello(request).await;
match response {
Ok(_) => {},
Err(e) => {
println!("failed to invoke say_hello(), caused by {:?}", e);
}
}
}
);
let actor_future = fut.into_actor(self);
ctx.spawn(actor_future);
self.num_reqs_processed += 1; // ideally, this should be Arc::clone()'d and passed to the actor future
}
}
impl Handler<ReqForReport> for HelloworldClient {
type Result = u64;
fn handle(&mut self, msg: ReqForReport, ctx: &mut Self::Context) -> Self::Result {
self.num_reqs_processed
}
}
fn main() {
let sys = System::new();
sys.block_on(async {
let mut client = GreeterClient::connect("http://[::1]:50051").await.unwrap();
let mut actor_addrs = Vec::new();
// Spawn 10 grpc actors
for _ in 0..10 {
let grpc_cloned = client.clone();
let hello_client = HelloworldClient {
greeter_client: grpc_cloned,
num_reqs_processed: 0,
};
let addr = hello_client.start();
actor_addrs.push(addr);
}
// Deliver 10000 messages to those random actors
for _ in 0..10000 {
let rand_idx = rand::thread_rng().gen_range(0..actor_addrs.len());
let addr = actor_addrs.get(rand_idx);
match addr {
None => { panic!("failed to pick a random actor addr!") }
Some(addr) => {
addr.do_send(ReqForSayHello);
}
}
}
// Synchronously "ask" those actors for the reports
let mut report = Vec::new();
for (i, actor_addr) in actor_addrs.iter().enumerate() {
let res = actor_addr.send(ReqForReport).await;
report.push(format!("actor= {:?}, num_reqs_processed = {:?}", i, res.unwrap()));
}
let r = report.join("\n").to_string();
// Print out the report
println!("{}", r);
});
sys.run().unwrap();
}
src/helloworld-server.rs
use tonic::{transport::Server, Request, Response, Status};
use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
pub mod hello_world {
tonic::include_proto!("helloworld"); // The string specified here must match the proto package name
}
#[derive(Debug, Default)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>, // Accept request of type HelloRequest
) -> Result<Response<HelloReply>, Status> { // Return an instance of type HelloReply
println!("Got a request: {:?}", request);
let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.into_inner().name).into(), // We must use .into_inner() as the fields of gRPC requests and responses are private
};
Ok(Response::new(reply)) // Send back our formatted greeting
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
proto/helloworld.proto
syntax = "proto3";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
cargo.toml
[package]
name = "actix-broker-test"
version = "0.1.0"
edition = "2021"
[dependencies]
actix = "0.13"
actix-broker = "0.4.3"
tonic = "0.7"
prost = "0.10"
rand = "0.8.5"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
[build-dependencies]
tonic-build = "0.7"
Executions
# terminal 1
$ cargo run --bin helloworld-server
# terminal 2
$ cargo run --bin helloworld-client-actor
...
actor= 0, num_reqs_processed = 1024
actor= 1, num_reqs_processed = 983
actor= 2, num_reqs_processed = 999
actor= 3, num_reqs_processed = 1040
actor= 4, num_reqs_processed = 1013
actor= 5, num_reqs_processed = 951
actor= 6, num_reqs_processed = 1051
actor= 7, num_reqs_processed = 963
actor= 8, num_reqs_processed = 965
actor= 9, num_reqs_processed = 1011