はじめに
製品開発グループの野本です。
組込ソフトウェアエンジニアとしてデータ収集用端末のソフトウェア開発を担当しています。
今回はROSの取り組みの一環として、C++で任意のトピックをPublish/Subscribeする方法についてご紹介します。
- はじめに
- 背景
- 調査結果
- 性能測定
- topic_tools::ShapeShifterとは?
- (参考) rosbagはどうやって任意のトピックをSubscribeしているのか?
- JSON変換もしたい場合
- まとめ
背景
ROSは複数のノードがトピックを介してノード間通信を行っています。
ROSノードとROS以外のアプリケーションが通信する選択肢として、rosbridgeがあります。
rosbridgeはWebSocket、TCP、UDPに対応*1しており、TCPを使用する場合はrosbridge_tcpノードがTCPサーバーとして動作します。クライアントは指定したトピックのデータをJSONとしてPublish/Subscribeすることができますが、以下のような懸念点があります。
- JSON化するとデータが肥大化しやすい、データをそのまま扱う場合は変換処理が冗長(→生データのまま扱いたい...)
- pythonで動いておりパフォーマンスが不安(→C++で動かしたい...)
これらの懸念点を解決するため、以下のようにC++ノードで動作し、JSONではなくバイナリの生データで任意トピックのPublish/Subscribeをする方法について調査しました。
調査結果
データ型にtopic_tools::ShapeShifter
を利用することで、C++で任意トピックを生データのままPublish/Subscribeすることができました。具体的には以下のように、ShapeShifter型のPublisherとSubscriberを利用します。
Subscribe
// コールバック関数 void topicCallback(const ShapeShifter::ConstPtr& topic_msg) { // トピック情報の取得 const std::string& md5sum = topic_msg->getMD5Sum(); const std::string& datatype = topic_msg->getDataType(); const std::string& definition = topic_msg->getMessageDefinition(); const uint32_t topic_msg_size = topic_msg->size(); // 生データ(バイナリ)の取得 std::vector<uint8_t> data; data.resize(topic_msg_size); ros::serialization::OStream stream(data.data(), topic_msg_size); topic_msg->write(stream); // 取り出したデータをプロセス間通信等でROS空間外に送信する .... } int main(int argc, char** argv) { ... const std::string topic_name = "/topic_name"; ros::Subscriber sub = nh.subscribe<const topic_tools::ShapeShifter::ConstPtr&>(topic_name, 10, topicCallback); ... }
参考:How to create a generic Topic Subscriber
Advertise, Publish
int main(int argc, char** argv) { ... // ShapeShifter設定 std::string topic_name; std::string datatype; std::string md5sum; std::string definition; // 上記stringをPublishしたい情報に合わせて設定する(記述省略) // Subscribe側で取得したデータを設定するとそのままPublishできる topic_tools::ShapeShifter shape_shifter; shape_shifter.morph(md5sum, datatype, definition, ""); ros::Publisher pub = shape_shifter.advertise(nh, topic_name, 100); // データ設定 uint8_t* data; uint32_t data_size; // data, data_sizeを設定する(記述省略) ros::serialization::OStream stream(data, data_size); shape_shifter.read(stream); pub.publish(shape_shifter); ... }
参考:Generic ROS publisher using ShapeShifter
Subscribe側で取得したデータ(md5sum、datatype、definition、data)をプロセス間通信でROS空間から取り出し、ネットワークで遠隔のロボットに転送してPublishする、といったことがC++ノード&生バイナリデータで実現可能です。
性能測定
測定環境
PC: ThinkPad X1 Carbon (7th Gen)
ROS/OS: Melodic / Ubuntu 18.04
データサイズ
rosbridgeと今回の手法で、こちらの動画データ(640x360、1280x720)をPublishした際の1メッセージのデータサイズを比較しました。
640x360
手法 | 1メッセージのデータサイズ(Byte) |
---|---|
rosbridge | 921857 |
今回の手法 | 693372 |
1280x720
手法 | 1メッセージのデータサイズ(Byte) |
---|---|
rosbridge | 3686658 |
今回の手法 | 2766972 |
rosbridgeと比較して約25%のデータ量を削減できていることがわかりました。
今回の手法では、トピック名およびSubscribe側で取得したデータ全て(md5sum、datatype、definition、data)を転送しています。md5sum、datatype、definitionはAdvertiseする際に必要なデータで、一度Advertiseした後は不要です。これらを省くことでさらにデータ量を削減できそうです。
メッセージ到達時間
rosbridgeと今回の手法で上記と同じ動画データ(1280x720)を15秒程度Publishし、受信側アプリケーションでのメッセージ到達時間*2を比較しました。
データの読み込み方法は、rosbridgeではソケット、今回の手法ではパイプを利用しました。
手法 | 到達時間 [sec] |
---|---|
rosbridge | 22353.21254657 |
今回の手法 | 22353.30257833 |
rosbridgeと比較して約100msメッセージの到達時間が早い結果となりました。
C++ノードではプロセス間でのメモリコピーを防ぎ高速化するnodeletが使用できます。今回はnodeletを使っていないため、nodeletを使うことでさらに速度が早くなりそうです。 なお、rosbridgeはJSONデータ受信後にJSONのパース処理が必要なため、実際はこの結果以上に処理時間が必要となります。
topic_tools::ShapeShifterとは?
topic_tools::ShapeShifterは、rosbag*3でトピックを記録(record)する際に使われているデータ型です。 rosbagが任意のトピックをSubscribeできるのはこのデータ型のおかげです。
(参考) rosbagはどうやって任意のトピックをSubscribeしているのか?
通常のROSプログラムと同じようにROS::Subscriber
を使ってトピックをSubscribeしています。
rosbagのSubscribe設定について
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);
ROSチュートリアルでは上記のように、引数でオプションを設定する使い方をしていますが、rosbagではこのような方法ではなく、ros::SubscribeOptions
にオプションを設定して引数で渡すこちらの方法を使っています。
具体的にはRecorder::subscribe()でSubscriberを初期化しており、helper
の部分を見るとdoQueue()をコールバック関数として設定していることがわかります。
ros::SubscribeOptions ops; ops.topic = topic; ops.queue_size = 100; ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>(); ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>(); ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT< const ros::MessageEvent<topic_tools::ShapeShifter const> &> >( boost::bind(&Recorder::doQueue, this, _1, topic, sub, count)); <-- doQueue()をコールバックに設定 ops.transport_hints = options_.transport_hints; *sub = nh.subscribe(ops);
上記設定やdoQueue()
の第一引数を見ると、topic_tools::ShapeShifter
という型でやり取りしていることがわかります。
void Recorder::doQueue(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event, string const& topic, shared_ptr<ros::Subscriber> subscriber, shared_ptr<int> count)
JSON変換もしたい場合
ShapeShifterのコールバック関数内で渡されたデータをJSONに変換したい場合、ros_type_introspectionを使うことで変換することができます。
※ 現在、Noeticではros_type_introspectionがサポートされていないようで、後継のros_msg_parserを使う必要があります。
まとめ
今回は、C++で任意のトピックをPublish/Subscribeする方法についてご紹介しました。 ShapeShifterに関する日本語の情報はあまりありませんでしたので、ROS開発の一助となれば幸いです。
*1:rosbridge_serverのlaunchファイル
*2:システム起動時からの経過時間(MONOTONIC_RAW)を使って計測
*3:rosbag: 指定したトピックのデータをファイルに記録することや、ファイルに保存したトピックのデータを再生することができるツール。