Concurrent hashmap의 사용 동기는 익히 알려진 대로, map 전체의 lock을 잡는 대신 critical section 을 최소한으로 설계하여 lock의 사용 기간과 범위를 극단적으로 줄이도록 하여, 겹치지 않는 영역에 대한 자료 구조 연산들을 최대한 동시적으로 진행될 수 있도록 하겠다는 것이다. 물론, lock의 사용을 사용자로부터 감추어 deadlock/race-condition 등의 concurrency bug 걱정을 덜어 줄 수 있다는 것도 큰 덤이 될 것이다.

그런데, RUST 에서는 한가지 이유가 더 있다. 바로, mutable data structure의 쓰레드간 공유를 “컴파일러의 허락”하에 사용하고 싶다는 것. Concurrent hashmap의 대표 주자인 dashmap 문서 에 보면, 아래와 같은 문구가 눈에 띈다.

DashMap tries to be very simple to use and to be a direct replacement for RwLock<HashMap<K, V, S». To accomplish these all methods take &self instead modifying methods taking &mut self. This allows you to put a DashMap in an Arc and share it between threads while being able to modify it.

RUST에서는, struct 필드값을 변경시키는 fn 의 경우 “&mut self”를 첫번째 인자로 가져오게 되어 있는데, 이 경우 MutexRwLock으로 보호되어야 쓰레드간 공유가 가능하도록 되어있다. 이 때문에, HashMap을 여러 쓰레드에서 쓰고 싶으면, Arc<RwLock<HashMap<K, V, S>>>와 같은 형태로 사용되어야 컴파일이 된다는 이야기. 게다가, 컴파일러를 만족시킨다 하더라도 concurrent writes가 들어오면 RwLock, Mutex 어느걸 쓰더라도 single-threaded 성능 혹은 그 이하의 성능으로 떨어질 수 밖에 없을 것.

DashMap을 쓰면 문제가 간단해진다. 마법과도 같이, insert, remove, clear, …&self 로 인자로 받기 때문에 Arc<DashMap<K, V, S>>의 형태로 곧바로 사용할 수 있고, 별도의 lock을 획득하는 과정이 필요가 없어진다. 다만, 그 마법은 일종의 unsafe 를 이용한 흑마법의 일종(!)으로 컴파일러를 어느 정도 회피하는 것과 연관이 있는 듯하니 참고는 해둘 필요가 있겠다.

Concurrent hashmap의 사용처로 metrics library를 꼽을 수 있겠다. Multi-threaded application에서 다양한 thread에서 metric이 emit될 수 있을 텐데, metric library의 책임은 이들을 thread-safe 하게 잘 저장해두었다가 외부의 crawler (e.g., prometheus)에 정해진 포맷으로 전달해주는 것. Concurrent hashmap을 사용할 좋은 기회이고, Dashmap의 fn 중에 흥미로운 것이 있어 한가지 소개하면 alter() 라는 것이다. 가령, 특정 key에 해당하는 value를 1 증가시키거나 다른 변형을 가하고 싶은 경우, 아무리 concurrent hashmap을 쓴다고 해도 단순치 않은데, 왜냐면 사용자가 여러개의 insert/get 연산들을 atomic하게 처리하고 싶다는 의도를 알릴 방법이 없기 때문이다. 다행히, alter를 사용하면 사용자가 단지 값을 읽는 것이 아니라 읽은 다음 곧바로 변경시키고자 한다는 의도를 전달할 수 있어, metric 구현에 유용하게 사용할 수 있다.

아래의 예제는, 5개의 message sender와 2개의 message receiver를 만들어 한개의 channel로 연결한 뒤, channel에 message를 보낼 때마다 num_sends를 1씩 증가시키고 message를 받을 때마다 num_recvs를 1씩 감소시키고, 1초에 한번씩 metric 내용 전체를 출력하는 예제이다. Channel을 통해 보내지는 총 메시지 개수는 5 * 1000000 이 되기 때문에, Dashmap으로 concurrency control이 잘 되었다면, 적당한 시간이 흐른 뒤에는 num_sendsnum_recvs가 모두 5000000으로 출력이 되어야 한다.

use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crossbeam::channel::{Receiver, RecvError, Sender, SendError, unbounded};
use dashmap::DashMap;
use rand::Rng;

const NUM_SENDERS: i32 = 5;
const NUM_RECEIVERS: i32 = 2;

fn main() {
    let counters  = {
        let m = DashMap::new();
        m.insert("num_sends", 0);
        m.insert("num_recvs", 0);
        Arc::new(m)
    };

    let (tx, rx) = unbounded();
    for _ in 0..NUM_SENDERS {
        let tx = tx.clone();
        let counters = counters.clone();
        run_msg_sender(tx, counters);
    }

    for _ in 0..NUM_RECEIVERS {
        let rx = rx.clone();
        let counters = counters.clone();
        run_msg_receiver(rx, counters);
    }

    loop {
        println!("{:?}", counters);
        thread::sleep(Duration::from_secs(1));
    }
}

fn run_msg_sender(tx: Sender<i32>, counters: Arc<DashMap<&'static str, i32>>) {
    thread::spawn(move || {
        let mut rng = rand::thread_rng();
        for _ in 0..1000000 {
            let val = rng.gen_range(0..10000);
            match tx.send(val) {
                Ok(_) => {
                    counters.alter("num_sends", |_, v| v + 1);
                }
                Err(_) => {
                    break;
                }
            };
        }
    });
}

fn run_msg_receiver(rx: Receiver<i32>, counters: Arc<DashMap<&'static str, i32>>) {
    thread::spawn(move || {
        loop {
            match rx.recv() {
                Ok(_) => {
                    counters.alter("num_recvs", |_, v| v + 1);
                }
                Err(_) => {
                    break;
                }
            }
        }
    });
}

위를 실행하면, 다음과 같은 결과가 출력된다:

$ cargo run
{"num_sends": 40, "num_recvs": 44}
{"num_sends": 843597, "num_recvs": 843597}
{"num_sends": 1667305, "num_recvs": 1667294}
{"num_sends": 2489836, "num_recvs": 2489839}
{"num_sends": 3316906, "num_recvs": 3316905}
{"num_sends": 4145870, "num_recvs": 4145863}
{"num_sends": 4994430, "num_recvs": 4994021}
{"num_sends": 5000000, "num_recvs": 5000000}
{"num_sends": 5000000, "num_recvs": 5000000}
...