ここは俺の備忘録だ

少なくとも日本語での言及が少ない話をするつもりです

Rustでmpsc::Receiversを1スレッドかつ並行に待つ方法

イベントハンドリング等である程度込み入ってくると単一の膨れ上がったenumを分割し,それぞれのモジュールに分けたりする.すると複数のenumが一堂に会する場が何処かに発生する筈だ.

ただ.std::sync::mpsc::Receiverのrecvやiterブロッキングであるため,enum毎に用意した複数のReceiversで逐一止まってしまうのは効率が悪い.

ではReceivers毎にスレッドを立てれば良いかというと,これも所有権が持っていかれる等面倒が多いし,そもそも1つのReceiversを待つ為だけにそれぞれカーネルスレッドを立てるのは無駄な気がする.

というわけで以下は個人的に分散処理のOSSを眺めていた際提案されていたそういうモチベの解決策,のアルゴリズムの再現である.

これはこのままでは動かない.流れと実装しなければならないものの備忘録という事にしてほしい.

#[derive(Debug, Clone)]
enum EvCat {
    Net,
    UI,
}

#[derive(Debug)]
enum NetEv {
    Success,
    Failure,
}

#[derive(Debug)]
enum UiEv {
    ScreenTransition,
    Terminate,
}

struct EvSender<Cat, Subset> {
    ev_tx: mpsc::Sender<Subset>,
    cat: Cat,
    cat_tx: mpsc::Sender<Cat>,
}

fn main() {

    let (cat_tx, cat_rx) = std::sync::mpsc::channel();
    let (net_ev_tx, net_ev_rx) = std::sync::mpsc::channel();
    let (ui_ev_tx, ui_ev_rx) = std::sync::mpsc::channel();

    let ui_ev_sender = EvSender::<EvCat, UiEv>::new(ui_ev_tx, EvCat::UI, cat_tx.clone());

    let nw_ev_sender = EvSender::<EvCat, NetEv>::new(net_ev_tx, EvCat::Net, cat_tx);

    let joinh = thread::spawn(move || {
        for it in cat_rx.iter() {
            match it {
                EvCat::Net => {
                    if let Ok(net_ev) = net_ev_rx.try_recv() {
                        match net_ev {
                            NetEv::Success    => { /* Do Something */ },
                            NetEv::Failure => { /* Do Something */ },
                        }
                    }
                },
                EvCat::UI => {
                    if let Ok(ui_ev) = ui_ev_rx.try_recv() {
                        match ui_ev {
                            UiEv::Terminate       => break,
                            UiEv::ScreenTransition => { /* Do Something */ },
                        }
                    }
                }
            }
        }
    });

    nw_ev_sender.send(NetEv::Connected).unwrap();
    ui_ev_sender.send(UiEv::CreateDirectory).unwrap();
    ui_ev_sender.send(UiEv::Terminate).unwrap();
}

Receiverの try_recv 関数だけは唯一ノンブロッキングメソッドであり, 値が届いて無ければResult::Errを返してくれる.cat_rxのiterブロッキングは仕方ないのでスレッドで打ち消す.

EvSenderの実装だが,これはnew関数の値を保持するだけの構造体である. 使われているのは最後のsendの部分で(このページでは実装されていないので想像して欲しい),先のEvSenderに対して以下の様なimplになる.

fn send(&self, event: Subset) -> Result<(), /*error types*/> {
    if let Err(err) = self.ev_tx.send(event) {
        /*error handling*/
    }
    if let Err(err) = self.cat_tx.send(self.cat.clone()) {
        /*error handling*/
    }
    Ok(())
}

ev_txへ分割されたenumのサブセットをsendして様子を見た後,続けてcatを流す. すると理想的にはmatchする値が流れ着くのに続いてforのcat_rx.iter()が作動し,enumをハンドリングするという流れになる.

この方法ならばenumのサブセットを各モジュールに分散させてそれぞれ使っている場合でも, 全体の集合を定義しておく事で,まとめてハンドリングできるようになる. 今回の題材はenumによるイベントのハンドリングであったが,それに限らず複数のReceiversの集約として有用なモデルだと言えるだろう. これまで代数的データ型を使う機会は頻繁に有ったが,所有権を意識させられるとまた新しい用途や工夫が現れ中々面白いものである.