aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

Rust+Quinnで大量のデータを送信する

f:id:aptpod_tech-writer:20201201162021j:plain

aptpod Advent Calendar 2020の4日目を担当します、研究開発グループの大久保です。

2日目では、RustとQuinnでechoサーバを作成しました。今回は応用として、Quinnを使って大量のデータを送信し、パフォーマンス評価をしてみたいと思います。弊社内のユースケースとして、エッジ側で溜まったデータをサーバに送信したい、という状況が考えられるため、それを想定した評価となります。

実装と検証

1プロセス内でサーバ、クライアント両方立てます。Cargo.tomlに追記する依存関係は以下のようになります。

[dependencies]
anyhow = "1"
futures = "0.3"
quinn = "0.6"
rand = "0.7"
rcgen = "0.8"
tokio = { version = "0.2", features = ["full"] }

評価用のコードは以下のようになります。

use anyhow::*;
use futures::StreamExt;
use quinn::{
    Certificate, CertificateChain, ClientConfigBuilder, Connecting, Endpoint, NewConnection,
    PrivateKey, ServerConfig, ServerConfigBuilder, TransportConfig,
};
use rand::{thread_rng, Rng};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Instant;

// 送信用のデータ(10KiB)
const MSG_SIZE: usize = 1024 * 10;

#[tokio::main]
async fn main() -> Result<()> {
    let cert = rcgen::generate_simple_self_signed(["localhost".to_owned()])?;
    let cert_der = cert.serialize_der()?;
    let priv_key = cert.serialize_private_key_der();

    let cert_der_clone = cert_der.clone();

    // 送信用の適当なデータを乱数で用意する
    let mut send_data = vec![0u8; MSG_SIZE];
    thread_rng().fill(&mut send_data[..]);

    // サーバを動かす
    tokio::spawn(async move {
        run_server(cert_der_clone, priv_key).await.unwrap();
    });

    // クライアントを動かし、所要時間(ミリ秒)を表示する
    let start = Instant::now();
    run_client(cert_der, &send_data).await?;
    let elapsed = start.elapsed();
    println!(
        "Elapsed time: {} ms",
        elapsed.as_secs() * 1000 + elapsed.subsec_millis() as u64
    );

    Ok(())
}

// サーバを動かす
async fn run_server(cert_der: Vec<u8>, priv_key: Vec<u8>) -> Result<()> {
    let mut transport_config = TransportConfig::default();
    transport_config.stream_window_uni(0xFF);
    let mut server_config = ServerConfig::default();
    server_config.transport = std::sync::Arc::new(transport_config);
    let mut server_config = ServerConfigBuilder::new(server_config);
    let cert = Certificate::from_der(&cert_der)?;
    server_config.certificate(
        CertificateChain::from_certs(vec![cert]),
        PrivateKey::from_der(&priv_key)?,
    )?;
    let mut endpoint = Endpoint::builder();
    endpoint.listen(server_config.build());
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 33333);
    let (endpoint, mut incoming) = endpoint.bind(&addr)?;
    println!("listeing on {}", endpoint.local_addr()?);

    while let Some(conn) = incoming.next().await {
        tokio::spawn(async {
            match handle_connection(conn).await {
                Ok(_) => (),
                Err(e) => {
                    eprintln!("{}", e);
                }
            }
        });
    }

    Ok(())
}

// サーバへの接続を扱う
async fn handle_connection(conn: Connecting) -> Result<(), Error> {
    let NewConnection {
        connection,
        mut uni_streams,
        ..
    } = conn.await?;

    println!("connected from {}", connection.remote_address());

    while let Some(uni_stream) = uni_streams.next().await {
        let uni_stream = uni_stream?;
        tokio::spawn(async {
            let _data = uni_stream.read_to_end(MSG_SIZE).await.unwrap();
        });
    }

    println!("connection closed from {}", connection.remote_address());

    Ok(())
}

// クライアントを動かす
async fn run_client(cert_der: Vec<u8>, send_data: &[u8]) -> Result<()> {
    let mut client_config = ClientConfigBuilder::default();
    client_config.add_certificate_authority(Certificate::from_der(&cert_der)?)?;
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 33333);
    let mut endpoint_builder = Endpoint::builder();
    endpoint_builder.default_client_config(client_config.build());
    let (endpoint, _incoming) = endpoint_builder.bind(&"0.0.0.0:0".parse().unwrap())?;

    let NewConnection { connection, .. } = endpoint.connect(&addr, "localhost")?.await?;
    println!("connected: addr={}", connection.remote_address());

    for _ in 0..1000 {
        let connection = connection.clone();
        let mut send_stream = connection.open_uni().await?;
        send_stream.write_all(send_data).await?;
        send_stream.finish().await?;
    }

    connection.close(0u8.into(), &[]);

    endpoint.wait_idle().await;

    Ok(())
}

echoサーバと基本的に同じように構築していきますが、いくつかの変更点があります。

    let cert = rcgen::generate_simple_self_signed(["localhost".to_owned()])?;
    let cert_der = cert.serialize_der()?;
    let priv_key = cert.serialize_private_key_der();

main関数のこちらの部分では、rcgenクレートを用いて、自己署名証明書を作成しています。テスト目的ならこれで十分でしょう。

    while let Some(uni_stream) = uni_streams.next().await {
        let uni_stream = uni_stream?;
        tokio::spawn(async {
            let _data = uni_stream.read_to_end(MSG_SIZE).await.unwrap();
        });
    }

handle_connection関数では、サーバに対して張られた単方向ストリームを読み込みます。読み込むだけで返信は行いません。

    for _ in 0..1000 {
        let connection = connection.clone();
        let mut send_stream = connection.open_uni().await?;
        send_stream.write_all(send_data).await?;
        send_stream.finish().await?;
    }

run_client関数では、用意したデータ(10KiB)を1000回ループを回して送信します。

こちらを実行すると、以下のような出力になりました。

listeing on 0.0.0.0:33333
connected: addr=127.0.0.1:33333
connected from 127.0.0.1:47158
closed by peer: 0
Elapsed time: 131 ms

1000回送信したので、1回あたり0.13ms程度で10KiBを送信していたことが分かります。

送信タスクを非同期にする

せっかくtokioで非同期にコードを書いているので、FuturesUnorderedを使って送信タスクを非同期にしてみます。その場合、送信を1000回行っているfor文を次のように書き換えます。

    let mut tasks = futures::stream::FuturesUnordered::new();

    for _ in 0..1000 {
        let connection = connection.clone();
        let send_data = send_data.to_vec();
        let task = async move {
            let mut send_stream = connection.open_uni().await.unwrap();
            send_stream.write_all(&send_data).await.unwrap();
            send_stream.finish().await.unwrap();
        };
        tasks.push(task);
    }

    while let Some(_) = tasks.next().await {}

この場合の実行結果は以下のようになりました。

listeing on 0.0.0.0:33333
connected: addr=127.0.0.1:33333
connected from 127.0.0.1:42291
closed by peer: 0
Elapsed time: 83 ms

1回あたり0.083msで、すこし短縮されます。

なお、送信するデータを10KiBから100KiBにした場合、1回送信ごとにブロッキングする実装で663ms、非同期で668msと、ほとんど差が無くなります。1ストリームあたりに流す量に応じて使い分けるのが良さそうです。

最後に

今回は、RustとQuinnを使って、大量のデータを送信した場合のパフォーマンスを調査しました。Quinnは、tokioをベースとして実装されているため、tokioやfuturesといったRustの非同期機能と組み合わせて使うことができ、コネクションやストリームを開くのも非常に直感的にできます。今後もこれらの機能を活用しながら、引き続きQUICの利用法を調査していく予定です。