Rust+wasmでWebSocket上にバイナリデータ(CBOR)を流してみる

f:id:apt-k-ueno:20200107191523j:plain

この記事はAptpod Advent Calendar 2019の10日目の記事です。

先端技術調査グループの大久保です。

前回の記事では、WebSocketのechoサーバにアクセスするwasmをRustとGoで作成しました。今回は、echoだけでは物足りないので、意味のあるバイナリデータをサーバから流して、クライアント側、すなわちWebブラウザ上に表示するまでやってみます。あまり大きくないデータならJSONにして文字列を流せば良いのですが、JSONだとサイズが問題になるようなケースを想定して、JSONよりコンパクトなCBORを使ってみます。

ちなみに今回はRust作ったところで力尽きたので、対応するGoのコードは残念ながらありません。

実装

3つのクレートに実装を分けます。送るデータの定義、エンコード、デコードを担当するmydataクレート、サーバを担当するws-serverクレート、そしてクライアントを担当し、wasmにコンパイルされるws-clientクレートから成ります。前回同様、wasmにコンパイルされたRustからJavaScriptのAPIを使用するにはwasm-bindgenを用います。データが全体でどのように流れるかは少し複雑ですが、以下の図のようになります。

f:id:aptpod_tech-writer:20191209105706p:plain

mydataクレート

Rustの構造体に格納したデータをシリアライズ/デシリアライズするために今回はcbor_serdeを用います。CBORはバイナリ版JSONみたいなフォーマットで、ちょうどいいRustの実装がありましたので使わせていただきます。シリアライザ/デシリアライザが用意できるならCBOR以外のどんなフォーマットでも同じように書くことができます。

serdeとcbor_serdeに依存するので、以下の依存関係をCargo.tomlに追記します。

[dependencies]
serde = "1"
serde_derive = "1"
serde_cbor = "0.10"

このクレートは他クレートから参照されるため、lib.rsにコードを書いていきます。

use serde::Serialize;
use serde_cbor::de::from_slice;
use serde_cbor::ser::{IoWrite, Serializer};
use serde_derive::{Deserialize, Serialize};

// 送りたいデータを定義する構造体
#[derive(Serialize, Deserialize, Debug)]
pub struct MyData {
    pub time: u64,
    pub data: Vec<u64>,
}

impl MyData {
    // バイナリデータへエンコードし、Writeへ書き出す
    pub fn encode<W: std::io::Write>(&self, w: W) -> Result<(), serde_cbor::error::Error> {
        let mut serializer = Serializer::new(IoWrite::new(w)).packed_format();
        self.serialize(&mut serializer)?;
        Ok(())
    }

    // スライスからMyDataへデコード
    pub fn decode(slice: &[u8]) -> Result<MyData, serde_cbor::error::Error> {
        from_slice(slice)
    }
}

MyData構造体が実際に送りたいデータの本体で、UNIX時間とu64の配列とします。#[derive(Serialize, Deserialize)]と指定することで、serde_cborのシリアライザ/デシリアライザでこの型が使えるようになります。これを利用してencode/decode関数を記述します。serdeの力によりエンコーダ/デコーダはかなり短く書くことができます。

ws-serverクレート

先ほど定義したMyDataを送信するサーバを作成します。以下の依存関係をCargo.tomlに追記します。

[dependencies]
websocket = "0.23"
rand = "0.7"
mydata = { path="../mydata" }

WebSocketを使うためにwebsocketクレートを、適当なデータ生成のためにrandクレートを、そして先ほど作成したmydataクレートを追加します。

main.rsは以下のようになります。

use mydata::MyData;
use std::thread;
use websocket::sync::Server;
use websocket::{Message, OwnedMessage};

// 送るデータのサイズを指定
const DATA_SIZE: usize = 512;

fn main() {
    // サーバを立てる
    let server = Server::bind("localhost:50000").unwrap();

    for request in server.filter_map(Result::ok) {
        thread::spawn(|| {
            let mut client = request.accept().unwrap();

            let ip = client.peer_addr().unwrap();

            println!("Connection from {}", ip);

            let mut buf = Vec::new();

            for _ in 0..100000 {
                let data = gen_random_data(DATA_SIZE); // MyDataを生成
                data.encode(&mut buf).expect("encode error"); // バッファへエンコード
                let message = Message::binary(buf.as_slice());
                client.send_message(&message).unwrap(); // エンコードした結果を送る
                std::thread::sleep(std::time::Duration::from_millis(50)); // 少し待つ
                buf.clear();
            }

            let message = OwnedMessage::Close(None);
            client.send_message(&message).unwrap();
        });
    }
}

// 乱数を使ってMyDataを生成する
fn gen_random_data(size: usize) -> MyData {
    let mut data = Vec::with_capacity(size);
    let time = std::time::SystemTime::now() // timeにはUNIX時間を格納する
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    for _ in 0..size {
        data.push(rand::random());
    }
    MyData { time, data }
}

MyDataのtimeにはUNIX時間を、dataには適当な長さの乱数列を格納し、50msごとに送信します。

ws-clientクレート

ws-clientでは、JavaScriptのAPIを使ってWebSocketのメッセージを受け取り、MyDataへデコードします。そして画面に表示する文字列を作成し、それをJavaScript側に渡します。ブラウザ上への反映はJavaScript側が行うものとします。JavaScriptへ渡す文字列は、サーバから送られてきたMyDataに格納されるUNIX時間と、dataの先頭にある乱数を表示するためのものです。

lib.rsは以下のようになります。

extern crate alloc;

use mydata::MyData;
use std::cell::RefCell;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
extern "C" {
    #[wasm_bindgen(js_namespace = console)]
    pub fn log(s: &str);
}

macro_rules! console_log {
    ($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}

#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;

use wasm_bindgen::JsCast;
use web_sys::{ErrorEvent, MessageEvent, WebSocket};

#[wasm_bindgen(start)]
pub fn start_websocket() -> Result<(), JsValue> {
    // WebSocketサーバに接続
    let ws = WebSocket::new("ws://localhost:50000")?;

    // コールバックの登録
    let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
        on_message(e);
    }) as Box<dyn FnMut(MessageEvent)>);
    ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
    onmessage_callback.forget();

    let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| {
        console_log!("error event: {:?}", e);
    }) as Box<dyn FnMut(ErrorEvent)>);
    ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
    onerror_callback.forget();

    let onopen_callback = Closure::wrap(Box::new(move |_| {
        console_log!("socket opened");
    }) as Box<dyn FnMut(JsValue)>);
    ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
    onopen_callback.forget();

    Ok(())
}

// WebSocketのメッセージを受け取ったら呼ばれる関数
fn on_message(e: MessageEvent) {
    // WebSocketからBlobオブジェクトを受け取る
    let blob: web_sys::Blob = e.data().unchecked_into();
    // Blobから&[u8]を取り出すために、データのロード後に呼ばれるon_data_loadedを登録する
    let promise = web_sys::Response::new_with_opt_blob(Some(&blob))
        .unwrap()
        .array_buffer()
        .unwrap();
    let callback = Closure::wrap(Box::new(move |array: JsValue| {
        on_data_loaded(array);
    }) as Box<dyn FnMut(JsValue)>);
    promise.then(&callback);
    callback.forget(); // 現状メモリリークしているが、もっといい書き方があるはず
}

thread_local!(static CALLBACK: RefCell<Option<js_sys::Function>> = RefCell::new(None));

// コールバックを設定するための関数
// この関数はJavaScriptから呼ばれる
#[wasm_bindgen]
pub fn set_callback(f: js_sys::Function) {
    CALLBACK.with(|callback| {
        *callback.borrow_mut() = Some(f);
    });
}

// WebSocketから受け取ったデータを処理する関数
fn on_data_loaded(array: JsValue) {
    let array = js_sys::Uint8Array::new(&array);
    let len = array.byte_length() as usize;
    let mut buf: Vec<u8> = vec![0; len];
    array.copy_to(&mut buf);

    let data = MyData::decode(&buf).unwrap(); // CBORのデータをMyData型にする
    CALLBACK.with(|callback| {
        if let Some(ref callback) = *callback.borrow() {
            let data = format!("time : {}  value : {}", data.time, data.data[0]); // JavaScriptに渡す文字列
            let data = JsValue::from(data); // 文字列をJavaScriptに渡せるように変換する
            callback.call1(&JsValue::NULL, &data).unwrap(); // 登録されたJavaScriptの関数を呼び出す
        }
    });
}

次のようなindex.htmlを用意します。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <title>Wasm + WebSocket Test</title>
  </head>
  <body>
    <noscript>This page contains webassembly and javascript content, please enable javascript in your browser.</noscript>
    <script src="./bootstrap.js"></script>
    <p id="label"></p>
  </body>
</html>

おなじディレクトリにあるindex.jsの内容は次のようにします。

import * as ws_client from "ws_client";

// 文字列を受け取って、HTMLの内容を更新する
function on_data_loaded_callback(data) {
    var label = document.getElementById("label");
    label.textContent = data;
}

// set_callbackはRust側で定義した関数
// ここにon_data_loaded_callbackを渡す
ws_client.set_callback(on_data_loaded_callback);

実行結果

ws-server以下でサーバを立ち上げます。

cargo run

ws-clientでは、wasmにビルドした後にnpmでWebサーバを立ち上げます。

wasm-pack build
cd www
npm start

ブラウザでhttp://localhost:8080/にアクセスすると、次の画像のようにUNIX時間となにかの乱数が表示されるはずです。サーバで生成された値を受け取っていることを確認できました。

f:id:aptpod_tech-writer:20191129102315p:plain

ちなみに、今回のwasmファイルのサイズは187KB、最適化後は139KBでした。thread_localを使うため#![no_std]を指定していないとはいえ、それなりのサイズになりました。

考察

WebAssemblyを使用する利点としてよくあげられるのは速度面です。そして、今回のように検討では受け取ったバイナリのエンコード部分をwasmにしています。これで高速になったり、CPU負荷が小さくなるのでしょうか。それはJavaScriptで同等の実装を作成して比較しなければわかりませんが、あまり今回の構成が優位ではない可能性があります。RustとJavaScriptをどうやってリンクさせているかはwasm-bindgenのマニュアルに書いてありますが、RustとJavaScript間で関数を呼び出しあったりオブジェクトを変換するのはそれなりにコストがかかります。今回のバイナリをデコードする程度であれば、JavaScriptで完結させた方が良い可能性もあります。

ただ、ブラウザ以外のネイティブアプリに載せるクライアントやサーバ側もRustで書くのであれば、送受信データのデコード、エンコード、データ定義等を共通化できる利点もあります。実際今回のコードでは、MyData構造体をサーバ/クライアント側両方で扱っています。wasmで実行する場合でも、もっと重い処理を挟むようならRustで書けば最適化できます。さらに将来的にはWASIを使うことで、JavaScriptのAPIに依存することなくwasmでアプリケーションが書ける可能性もあります。

今後の展望

今回の検討でWebAssembly周りのエコシステムについて色々調べましたが、Rustのサポートは妙に充実しています。今回使用したwasm-bindgenにより、かなりシームレスにRustとJavaScriptのやりとりができるようになっています。また、WASIを実装するwasmtimeはRustで書かれているようです。WebAssemblyに力を入れているMozillaがRustの元祖開発元であることもあり、今後WebAssembly周りでRustの利用が増えていくのではないかと個人的には予想しています。

まとめ

  • Rustを使ってWebSocketによるサーバ→クライアント(Webブラウザ)の通信ができた
  • JSONよりコンパクトなCBORを送ることができた。エンコード、デコードも簡単
  • WebAssembly関連でRustの利用が増えてくはず