aptpod Advent Calendar 2020の4日目を担当します、研究開発グループの大久保です。
2日目では、RustとQuinnでechoサーバを作成しました。今回は応用として、Quinnを使って大量のデータを送信し、パフォーマンス評価をしてみたいと思います。弊社内のユースケースとして、エッジ側で溜まったデータをサーバに送信したい、という状況が考えられるため、それを想定した評価となります。
実装と検証
1プロセス内でサーバ、クライアント両方立てます。Cargo.toml
に追記する依存関係は以下のようになります。
[dependencies] anyhow = "1" futures = "0.3" quinn = "0.6" rand = "0.7" rcgen = "0.8" tokio = { version = "0.2", features = ["full"] }
評価用のコードは以下のようになります。
use anyhow::*; use futures::StreamExt; use quinn::{ Certificate, CertificateChain, ClientConfigBuilder, Connecting, Endpoint, NewConnection, PrivateKey, ServerConfig, ServerConfigBuilder, TransportConfig, }; use rand::{thread_rng, Rng}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::time::Instant; // 送信用のデータ(10KiB) const MSG_SIZE: usize = 1024 * 10; #[tokio::main] async fn main() -> Result<()> { let cert = rcgen::generate_simple_self_signed(["localhost".to_owned()])?; let cert_der = cert.serialize_der()?; let priv_key = cert.serialize_private_key_der(); let cert_der_clone = cert_der.clone(); // 送信用の適当なデータを乱数で用意する let mut send_data = vec![0u8; MSG_SIZE]; thread_rng().fill(&mut send_data[..]); // サーバを動かす tokio::spawn(async move { run_server(cert_der_clone, priv_key).await.unwrap(); }); // クライアントを動かし、所要時間(ミリ秒)を表示する let start = Instant::now(); run_client(cert_der, &send_data).await?; let elapsed = start.elapsed(); println!( "Elapsed time: {} ms", elapsed.as_secs() * 1000 + elapsed.subsec_millis() as u64 ); Ok(()) } // サーバを動かす async fn run_server(cert_der: Vec<u8>, priv_key: Vec<u8>) -> Result<()> { let mut transport_config = TransportConfig::default(); transport_config.stream_window_uni(0xFF); let mut server_config = ServerConfig::default(); server_config.transport = std::sync::Arc::new(transport_config); let mut server_config = ServerConfigBuilder::new(server_config); let cert = Certificate::from_der(&cert_der)?; server_config.certificate( CertificateChain::from_certs(vec![cert]), PrivateKey::from_der(&priv_key)?, )?; let mut endpoint = Endpoint::builder(); endpoint.listen(server_config.build()); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 33333); let (endpoint, mut incoming) = endpoint.bind(&addr)?; println!("listeing on {}", endpoint.local_addr()?); while let Some(conn) = incoming.next().await { tokio::spawn(async { match handle_connection(conn).await { Ok(_) => (), Err(e) => { eprintln!("{}", e); } } }); } Ok(()) } // サーバへの接続を扱う async fn handle_connection(conn: Connecting) -> Result<(), Error> { let NewConnection { connection, mut uni_streams, .. } = conn.await?; println!("connected from {}", connection.remote_address()); while let Some(uni_stream) = uni_streams.next().await { let uni_stream = uni_stream?; tokio::spawn(async { let _data = uni_stream.read_to_end(MSG_SIZE).await.unwrap(); }); } println!("connection closed from {}", connection.remote_address()); Ok(()) } // クライアントを動かす async fn run_client(cert_der: Vec<u8>, send_data: &[u8]) -> Result<()> { let mut client_config = ClientConfigBuilder::default(); client_config.add_certificate_authority(Certificate::from_der(&cert_der)?)?; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 33333); let mut endpoint_builder = Endpoint::builder(); endpoint_builder.default_client_config(client_config.build()); let (endpoint, _incoming) = endpoint_builder.bind(&"0.0.0.0:0".parse().unwrap())?; let NewConnection { connection, .. } = endpoint.connect(&addr, "localhost")?.await?; println!("connected: addr={}", connection.remote_address()); for _ in 0..1000 { let connection = connection.clone(); let mut send_stream = connection.open_uni().await?; send_stream.write_all(send_data).await?; send_stream.finish().await?; } connection.close(0u8.into(), &[]); endpoint.wait_idle().await; Ok(()) }
echoサーバと基本的に同じように構築していきますが、いくつかの変更点があります。
let cert = rcgen::generate_simple_self_signed(["localhost".to_owned()])?; let cert_der = cert.serialize_der()?; let priv_key = cert.serialize_private_key_der();
main
関数のこちらの部分では、rcgenクレートを用いて、自己署名証明書を作成しています。テスト目的ならこれで十分でしょう。
while let Some(uni_stream) = uni_streams.next().await { let uni_stream = uni_stream?; tokio::spawn(async { let _data = uni_stream.read_to_end(MSG_SIZE).await.unwrap(); }); }
handle_connection
関数では、サーバに対して張られた単方向ストリームを読み込みます。読み込むだけで返信は行いません。
for _ in 0..1000 { let connection = connection.clone(); let mut send_stream = connection.open_uni().await?; send_stream.write_all(send_data).await?; send_stream.finish().await?; }
run_client
関数では、用意したデータ(10KiB)を1000回ループを回して送信します。
こちらを実行すると、以下のような出力になりました。
listeing on 0.0.0.0:33333 connected: addr=127.0.0.1:33333 connected from 127.0.0.1:47158 closed by peer: 0 Elapsed time: 131 ms
1000回送信したので、1回あたり0.13ms程度で10KiBを送信していたことが分かります。
送信タスクを非同期にする
せっかくtokio
で非同期にコードを書いているので、FuturesUnordered
を使って送信タスクを非同期にしてみます。その場合、送信を1000回行っているfor文を次のように書き換えます。
let mut tasks = futures::stream::FuturesUnordered::new(); for _ in 0..1000 { let connection = connection.clone(); let send_data = send_data.to_vec(); let task = async move { let mut send_stream = connection.open_uni().await.unwrap(); send_stream.write_all(&send_data).await.unwrap(); send_stream.finish().await.unwrap(); }; tasks.push(task); } while let Some(_) = tasks.next().await {}
この場合の実行結果は以下のようになりました。
listeing on 0.0.0.0:33333 connected: addr=127.0.0.1:33333 connected from 127.0.0.1:42291 closed by peer: 0 Elapsed time: 83 ms
1回あたり0.083msで、すこし短縮されます。
なお、送信するデータを10KiBから100KiBにした場合、1回送信ごとにブロッキングする実装で663ms、非同期で668msと、ほとんど差が無くなります。1ストリームあたりに流す量に応じて使い分けるのが良さそうです。
最後に
今回は、RustとQuinnを使って、大量のデータを送信した場合のパフォーマンスを調査しました。Quinnは、tokioをベースとして実装されているため、tokioやfuturesといったRustの非同期機能と組み合わせて使うことができ、コネクションやストリームを開くのも非常に直感的にできます。今後もこれらの機能を活用しながら、引き続きQUICの利用法を調査していく予定です。