Rustにおける非同期ストリームの関数呼び出しコストを検証する

f:id:aptpod_tech-writer:20210316130944p:plain

OTチームの大久保です。

エッジデバイス上でのデータ処理やネットワーク周りの実装に、速度と生産性の両面で優れるRust言語を利用できないかをここ最近は検討しています。特に、tokioのバージョン1.0がリリースされたように、最近はRustの非同期関連のエコシステムが充実してきたので、エッジデバイスでも応用できそうです。Rustのasync/awaitはゼロコストを謳っているので、安心して使うことができます。しかしながら、極めて高頻度に呼び出される関数がasyncであった場合、普通の同期的(asyncではない普通のfn)な関数に比べて、asyncであることによる関数の呼び出しコストの増加は無いのでしょうか。実用上は、asyncな関数は内部で非同期IOを行うはずなので、それに比べれば関数の呼び出しコストは微々たるもので気にする必要はありませんが、以下のような場合には問題に成りえます。

  • async関数だが、稀にしかコストのかかるIO操作を行わない。例えば、大抵はキューに保存されているデータを返すが、キューが空の場合になって初めてファイルを読み込むなど。
  • 内部でIO操作を行わず、async関数にする必要が無いが、他のIO操作をする関数とインターフェイスを合わせるため、asyncを指定している。

今回は、このうち2番目のような状況を想定し、何度も値を取り出すイテレータの非同期版、つまりストリームを複数の方法で作って、実行速度を検証してみたいと思います。

普通のイテレータ

1から1億までの数字を順番に返すイテレータSyncTestを作ります。

const N_LOOP: usize = 100000000;
const SUM: usize = N_LOOP * (N_LOOP + 1) / 2;

mod sync_test {
    use super::*;

    pub struct SyncTest(pub usize);

    impl Iterator for SyncTest {
        type Item = usize;

        fn next(&mut self) -> Option<usize> {
            self.0 += 1;
            if self.0 <= N_LOOP {
                Some(self.0)
            } else {
                None
            }
        }
    }
}

実行時間を調べるため、次のようなprint_time関数を用意します。このprint_timeは、与えられた関数の実行時間を測定し、1秒間に何回イテレータ(ストリーム)が処理されたかを表示します。ちなみに、nightlyのbenchによる測定は、async関数を何度も呼ぶ計測には向いてなさそうなので、実行時間を調べるのに自前のprint_time関数を用意しました。

fn print_time<F: FnOnce()>(f: F) {
    let before = Instant::now();
    f();
    let duration = Instant::now().duration_since(before);
    let secs = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1000000000.0;
    println!("{:?}:\t{:.0}/s", duration, N_LOOP as f64 / secs);
}

SyncTextイテレータを作成し、実行するコードを作成します。

println!("sync_test");
let mut iter = sync_test::SyncTest(0);
print_time(move || {
    let mut sum = 0;
    while let Some(result) = iter.next() {
        sum += result;
        black_box(result);
    }
    assert_eq!(sum, SUM);
});

このiter.next()は1億回呼ばれ、その結果の合計値を計算し最後にそれが正しいか確認します。また、ループ毎の結果はstd::hint::black_boxに渡します。black_boxは、渡した値がどのような用途にも使用され得ることを示すためのnightly限定の関数で、このようなパフォーマンス測定の時に過度な最適化を抑止します。

上記のコードの実行結果は以下のようになりました。ちなみに、動作させているCPUはCore i5-8259Uです。

sync_test
61.327952ms:    1630577848/s

このイテレータを1億回実行するのには60ms程度かかり、1秒間に16億回程度実行することができます。

asyncをつけてみる

SyncTestnextasyncをつけただけのものを実行してみます。Streamトレイトの定義通りではありませんが、このようにasync関数で値を取り出すオブジェクトもストリームの仲間として今回扱います。

実行はtokioのランタイムを用います。

mod async_test {
    use super::N_LOOP;

    pub struct AsyncTest(pub usize);

    impl AsyncTest {
        pub async fn next(&mut self) -> Option<usize> {
            self.0 += 1;
            if self.0 <= N_LOOP {
                Some(self.0)
            } else {
                None
            }
        }
    }
}

// ランタイムの用意
let rt = tokio::runtime::Builder::new_current_thread()
    .enable_all()
    .build()
    .unwrap();

// 実行
println!("async_test");
let mut iter = async_test::AsyncTest(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        while let Some(result) = iter.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

この実行結果は以下のようになります。

async_test
202.02216ms:   494995203/s

このasync版nextは、秒間5億回程度呼び出すことができます。普通のイテレータより3倍程度遅くなりました。もっとも、このnext関数はほとんど中身が無く、実行時間はほとんど関数呼び出しに占められていると考えると、async関数の呼び出しのコストは案外小さいと言えるのかもしれません。

futures::stream::iter

futuresは、ストリームを操作するための関数をいくつか提供しています。その中で、futures::stream::iterは、イテレータからストリームを作成するための関数です。先程作ったSyncTestイテレータから、ストリームを作成してみます。

また、StreamトレイトをBoxに入れたBoxStreamというものがfuturesには定義されています。中身の型を隠蔽することができるので、中身が異なる複数のストリームを扱う場合に便利です。このBoxStreamを使った場合の性能も計測します。

mod stream_from_iter_test {
    use super::sync_test::SyncTest;
    use futures::stream::{iter, BoxStream, Stream};

    pub fn get_stream(init: usize) -> impl Stream<Item = usize> {
        iter(SyncTest(init))
    }

    pub fn get_stream_boxed(init: usize) -> BoxStream<'static, usize> {
        Box::pin(get_stream(init))
    }
}

// 実行
println!("stream_from_iter_test");
let stream = stream_from_iter_test::get_stream(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        futures::pin_mut!(stream);
        while let Some(result) = stream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

// Box版を実行
println!("stream_from_iter_boxed_test");
let mut stream = stream_from_iter_test::get_stream_boxed(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        while let Some(result) = stream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

この実行結果は以下のようになります。

stream_from_iter_test
157.242894ms:   635958786/s
stream_from_iter_boxed_test
189.748031ms:   527014691/s

Boxを用いない場合、秒間6億回と先程のasync版nextより性能が高い結果となりました。BoxStreamにした場合、それよりやや性能が落ちます。トレイトオブジェクトからの仮想関数呼び出しと同様に、Boxに入れるとやや呼び出しコストが高くなることが伺えます。

futures::stream::unfold

イテレータからStreamを作る場合、当然イテレータのnextはasyncでは無いのでその中でawaitは使えません。awaitを使いたい場合、futures::stream::unfoldを使います。任意のオブジェクトをstateとして使い回すことでStreamを実装します。先程定義したAsyncTestを使うと以下のようになります。

mod stream_unfold_test {
    use super::async_test::AsyncTest;
    use futures::stream::{unfold, BoxStream, Stream};

    pub fn get_stream(init: usize) -> impl Stream<Item = usize> {
        unfold(AsyncTest(init), |mut state| async move {
            let result = state.next().await;
            result.map(|result| (result, state))
        })
    }

    pub fn get_stream_boxed(init: usize) -> BoxStream<'static, usize> {
        Box::pin(get_stream(init))
    }
}

// 実行
println!("unfold_test");
let stream = stream_unfold_test::get_stream(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        futures::pin_mut!(stream);
        while let Some(result) = stream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

// Box版を実行
println!("unfold_boxed_test");
let mut stream = stream_unfold_test::get_stream_boxed(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        while let Some(result) = stream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

この実行結果は以下のようになります。

unfold_test
421.561371ms:   237213385/s
unfold_boxed_test
428.991288ms:   233104967/s

秒間2億回と、AsyncTestnextを直接呼び出すのに比べて速度は半減しています。しかし、async関数からBoxStreamの形にしたい場合は有用でしょう。

async-stream

async-streamは、他の言語のyield文に相当するものをRustで使えるようにするためのマクロを提供します。Rustにもyield、そしてジェネレーターに相当するものはnightlyには存在するのですが、安定版では使えません。

async-streamを使うと、yieldで生成したい値を返すことでストリームを実装できます。

mod async_stream_test {
    use super::*;
    use futures::stream::{BoxStream, Stream};

    pub fn get_stream(init: usize) -> impl Stream<Item = usize> {
        async_stream::stream! {
            for i in init..=N_LOOP {
                // ループの中でyieldを使う
                yield i;
            }
        }
    }

    pub fn get_stream_boxed(init: usize) -> BoxStream<'static, usize> {
        Box::pin(get_stream(init))
    }
}

// 実行
println!("async_stream_test");
let stream = async_stream_test::get_stream(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        futures::pin_mut!(stream);
        while let Some(result) = stream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

// Box版を実行
println!("async_stream_boxed_test");
let mut stream = async_stream_test::get_stream_boxed(0);
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;

        while let Some(result) = stream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

この実行結果は以下のようになります。

async_stream_test
1.207506539s:   82815287/s
async_stream_boxed_test
1.213700546s:   82392646/s

秒間8000万回と、これまでの結果よりかなり遅くなります。async_stream内のyieldは、関数を中断する命令として働きますが、これはマクロによりawaitに変換されます。await自体は値を返せないので、async-streamはyieldによって返す値を一旦スレッドローカルストレージに格納し、awaitで抜けてから取り出して呼び出し元に返すという実装になっているようです。このあたりの処理はそれなりに複雑なので、時間がかかってしまったと思われます。

とはいえ、安定版でyieldが使えるのはなかなか魅力的です。

async-trait

Rustでは、asyncの付いた関数をトレイト定義に用いることは現状できません。async-traitは、マクロを使ってasync付き関数をトレイトに含めるようにしてくれます。そしてトレイトを実装した型をトレイトオブジェクトにすることも可能になります。ここでは、async-traitを使った場合の関数呼び出しコストを検証してみます。

async-traitを使って、これまで通り1から1億までの数字を取り出せるオブジェクトを定義します。

mod async_trait_test {
    use super::*;

    #[async_trait::async_trait]
    pub trait MyStream {
        async fn next(&mut self) -> Option<usize>;
    }

    pub struct AsyncTraitTest(pub usize);

    #[async_trait::async_trait]
    impl MyStream for AsyncTraitTest {
        async fn next(&mut self) -> Option<usize> {
            self.0 += 1;
            if self.0 <= N_LOOP {
                Some(self.0)
            } else {
                None
            }
        }
    }
}

// 実行
println!("async_trait_test");
// トレイトオブジェクトにする
let mut mystream: Box<dyn async_trait_test::MyStream> =
    Box::new(async_trait_test::AsyncTraitTest(0));
print_time(|| {
    rt.block_on(async move {
        let mut sum = 0;
        while let Some(result) = mystream.next().await {
            sum += result;
            black_box(result);
        }
        assert_eq!(sum, SUM);
    });
});

この実行結果は以下のようになります。

async_trait_test
1.652983415s:   60496675/s

秒間6000万回と、async-streamを使ったときより性能が低下しました。async-traitは、async関数をトレイトで扱うためにBoxを使っており、そのため関数呼び出し毎にヒープ割当が発生し、それなりの負荷になってしまったと考えられます。しかし、async-traitはトレイトに適用できるので柔軟性が高く、その利点に比べればよほど高頻度で呼び出される場面でない限り負荷は気にならないでしょう。

generator (nightly)

非同期ストリームではありませんが、nightlyのジェネレーターを使って、これまでと同等のコードを記述してみます。

// ジェネレーターを使うために必要なfeatureの指定
#![feature(generators, generator_trait)]

mod generator_test {
    use super::*;
    use std::ops::Generator;

    pub fn get_generator(init: usize) -> impl Generator<Yield = usize> {
        // yieldを含むクロージャはジェネレーターになる
        move || {
            for i in init..=N_LOOP {
                yield i;
            }
        }
    }
}

// 実行
println!("generator_test");
let mut gen = generator_test::get_generator(0);
print_time(|| {
    use std::ops::{Generator, GeneratorState};
    use std::pin::Pin;

    let mut sum = 0;

    while let GeneratorState::Yielded(result) = Pin::new(&mut gen).resume(()) {
        sum += result;
        black_box(result);
    }
    assert_eq!(sum, SUM);
});

この実行結果は以下のようになります。

generator_test
152.886807ms:   654078674/s

単純なイテレータには及ばないものの、秒間6億回とそれなりの速度です。yieldが使えると、イテレータのためにstructやenumを使って自分で状態機械を定義する手間が無くなるので、なるべく早く安定版にも導入されてほしいところです。当分先の話になりそうですが。

比較

以上の測定結果をまとめると以下の表のようになりました。

実装 実行回数 [ /s ]
イテレータ 1630577848
async付next 494995203
futures::stream::iter 635958786
futures::stream::iter (Box) 527014691
futures::stream::unfold 237213385
futures::stream::unfold (Box) 233104967
async-stream 82815287
async-stream (Box) 82392646
async-trait 60496675
generator (nightly) 654078674

並べて見てみると結構な差があることがわかります。

まとめ

今回は単純なイテレータ・ストリームを色々な方法で実装し、繰り返し呼び出した時の実行時間を計測・比較しました。結果として、実装方法によって実行時間には有意な差が見られました。現実には、関数呼び出しより中身の処理の方が時間がかかり、ましてや非同期IOを扱うことにくらべれば微々たる差になります。しかし、高頻度で呼び出される場合も考えられ、その場合は実装による性能の違いを検証してみる価値はあるでしょう。