ROSの任意トピックをC++ノードでPublish/Subscribeする方法

f:id:aptpod_tech-writer:20200804192909j:plain

はじめに

製品開発グループの野本です。
組込ソフトウェアエンジニアとしてデータ収集用端末のソフトウェア開発を担当しています。 今回はROSの取り組みの一環として、C++で任意のトピックをPublish/Subscribeする方法についてご紹介します。

背景

ROSは複数のノードがトピックを介してノード間通信を行っています。
ROSノードとROS以外のアプリケーションが通信する選択肢として、rosbridgeがあります。

f:id:aptpod_tech-writer:20200731115809p:plain

rosbridgeはWebSocket、TCP、UDPに対応*1しており、TCPを使用する場合はrosbridge_tcpノードがTCPサーバーとして動作します。クライアントは指定したトピックのデータをJSONとしてPublish/Subscribeすることができますが、以下のような懸念点があります。

  • JSON化するとデータが肥大化しやすい、データをそのまま扱う場合は変換処理が冗長(→生データのまま扱いたい...)
  • pythonで動いておりパフォーマンスが不安(→C++で動かしたい...)

これらの懸念点を解決するため、以下のようにC++ノードで動作し、JSONではなくバイナリの生データで任意トピックのPublish/Subscribeをする方法について調査しました。

f:id:aptpod_tech-writer:20200731120627p:plain

調査結果

データ型にtopic_tools::ShapeShifterを利用することで、C++で任意トピックを生データのままPublish/Subscribeすることができました。具体的には以下のように、ShapeShifter型のPublisherとSubscriberを利用します。

Subscribe

// コールバック関数
void topicCallback(const ShapeShifter::ConstPtr& topic_msg) {

    // トピック情報の取得
    const std::string& md5sum = topic_msg->getMD5Sum();
    const std::string& datatype = topic_msg->getDataType();
    const std::string& definition = topic_msg->getMessageDefinition();
    const uint32_t topic_msg_size = topic_msg->size();

    // 生データ(バイナリ)の取得
    std::vector<uint8_t> data;
    data.resize(topic_msg_size);

    ros::serialization::OStream stream(data.data(), topic_msg_size);
    topic_msg->write(stream);

    // 取り出したデータをプロセス間通信等でROS空間外に送信する
    ....
}

int main(int argc, char** argv) {
    ...
    const std::string topic_name = "/topic_name";
    ros::Subscriber sub = nh.subscribe<const topic_tools::ShapeShifter::ConstPtr&>(topic_name, 10, topicCallback);
    ...
}

参考:How to create a generic Topic Subscriber

Advertise, Publish

int main(int argc, char** argv) {
    ...
    // ShapeShifter設定
    std::string topic_name;
    std::string datatype;
    std::string md5sum;
    std::string definition;

    // 上記stringをPublishしたい情報に合わせて設定する(記述省略)
    // Subscribe側で取得したデータを設定するとそのままPublishできる

    topic_tools::ShapeShifter shape_shifter;
    shape_shifter.morph(md5sum, datatype, definition, "");
    
    ros::Publisher pub = shape_shifter.advertise(nh, topic_name, 100);

    // データ設定
    uint8_t* data;
    uint32_t data_size;

    // data, data_sizeを設定する(記述省略)

    ros::serialization::OStream stream(data, data_size);
    shape_shifter.read(stream);
    pub.publish(shape_shifter);
    ...
}

参考:Generic ROS publisher using ShapeShifter

Subscribe側で取得したデータ(md5sum、datatype、definition、data)をプロセス間通信でROS空間から取り出し、ネットワークで遠隔のロボットに転送してPublishする、といったことがC++ノード&生バイナリデータで実現可能です。

性能測定

測定環境
PC: ThinkPad X1 Carbon (7th Gen)
ROS/OS: Melodic / Ubuntu 18.04

データサイズ
rosbridgeと今回の手法で、こちらの動画データ(640x360、1280x720)をPublishした際の1メッセージのデータサイズを比較しました。

640x360

手法 1メッセージのデータサイズ(Byte)
rosbridge 921857
今回の手法 693372

1280x720

手法 1メッセージのデータサイズ(Byte)
rosbridge 3686658
今回の手法 2766972

rosbridgeと比較して約25%のデータ量を削減できていることがわかりました。

今回の手法では、トピック名およびSubscribe側で取得したデータ全て(md5sum、datatype、definition、data)を転送しています。md5sum、datatype、definitionはAdvertiseする際に必要なデータで、一度Advertiseした後は不要です。これらを省くことでさらにデータ量を削減できそうです。

メッセージ到達時間

rosbridgeと今回の手法で上記と同じ動画データ(1280x720)を15秒程度Publishし、受信側アプリケーションでのメッセージ到達時間*2を比較しました。
データの読み込み方法は、rosbridgeではソケット、今回の手法ではパイプを利用しました。

手法 到達時間 [sec]
rosbridge 22353.21254657
今回の手法 22353.30257833

rosbridgeと比較して約100msメッセージの到達時間が早い結果となりました。

C++ノードではプロセス間でのメモリコピーを防ぎ高速化するnodeletが使用できます。今回はnodeletを使っていないため、nodeletを使うことでさらに速度が早くなりそうです。 なお、rosbridgeはJSONデータ受信後にJSONのパース処理が必要なため、実際はこの結果以上に処理時間が必要となります。

topic_tools::ShapeShifterとは?

topic_tools::ShapeShifterは、rosbag*3でトピックを記録(record)する際に使われているデータ型です。 rosbagが任意のトピックをSubscribeできるのはこのデータ型のおかげです。

(参考) rosbagはどうやって任意のトピックをSubscribeしているのか?

通常のROSプログラムと同じようにROS::Subscriberを使ってトピックをSubscribeしています。

rosbagのSubscribe設定について

ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);

ROSチュートリアルでは上記のように、引数でオプションを設定する使い方をしていますが、rosbagではこのような方法ではなく、ros::SubscribeOptionsにオプションを設定して引数で渡すこちらの方法を使っています。

具体的にはRecorder::subscribe()でSubscriberを初期化しており、helperの部分を見るとdoQueue()をコールバック関数として設定していることがわかります。

ros::SubscribeOptions ops;

ops.topic = topic;
ops.queue_size = 100;
ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
    const ros::MessageEvent<topic_tools::ShapeShifter const> &> >(
        boost::bind(&Recorder::doQueue, this, _1, topic, sub, count));      <-- doQueue()をコールバックに設定
ops.transport_hints = options_.transport_hints;
*sub = nh.subscribe(ops);

上記設定やdoQueue()の第一引数を見ると、topic_tools::ShapeShifterという型でやり取りしていることがわかります。

void Recorder::doQueue(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count)

JSON変換もしたい場合

ShapeShifterのコールバック関数内で渡されたデータをJSONに変換したい場合、ros_type_introspectionを使うことで変換することができます。
※ 現在、Noeticではros_type_introspectionがサポートされていないようで、後継のros_msg_parserを使う必要があります。

まとめ

今回は、C++で任意のトピックをPublish/Subscribeする方法についてご紹介しました。 ShapeShifterに関する日本語の情報はあまりありませんでしたので、ROS開発の一助となれば幸いです。

*1:rosbridge_serverのlaunchファイル

*2:システム起動時からの経過時間(MONOTONIC_RAW)を使って計測

*3:rosbag: 指定したトピックのデータをファイルに記録することや、ファイルに保存したトピックのデータを再生することができるツール。