intdashにリアルタイムでデータを登録したいみなさん、
こんにちは。ソリューションアーキテクトの伊勢です。
収集データをリアルタイムで可視化・確認したいニーズが増えています。
今回は収集データをリアルタイムに加工・登録する方法をご紹介します。
はじめに
今回の対象はリアルタイムAPIへのデータ送信です。
前提
今回のサンプルプログラムで以下のライブラリを使います。1
- YOLO
- OpenCV
- Gstreamer
- PyGObject
YOLOとは
YOLO(You Only Look Once)はディープラーニングモデルです。
画像に写っている物体がどこにあり、何なのかを高速に認識できるのが特徴です。
今回使うのは物体検出の精度と速度を向上させたモデルであるYOLOv4の軽量版YOLOv4-tinyです。
OpenCVとは
画像の読み込み・表示・編集など画像処理のためののオープンソースライブラリです。
今回はOpenCVからYOLOv4-tinyモデルを呼び出し、矩形を描画します。
Gstreamerとは
動画・音声の変換などを行えるオープンソースのメディアフレームワークです。
前回利用したFFmpegよりもリアルタイム処理に向いています。
今回はH.264データのデコード・エンコードに利用します。
PyGObjectとは
Pythonから、C言語ライブラリで使用されるオブジェクト指向プログラミングのフレームワークGObjectを利用するためのライブラリです。
今回は、GObjectを基盤として構築されたGstreamerを制御するのに利用します。
インストール
クライアントライブラリ
クライアントライブラリは前回インストールしたパッケージを利用します。
サンプルプログラム用ライブラリ
利用ライブラリをインストールします。
Gstreamerインストール
Gstreamerをインストールします。バージョン、カラーバー表示を確認します。2
brew install gstreamer gst-launch-1.0 --version gst-launch-1.0 videotestsrc ! autovideosink
Pythonパッケージインストール
Pythonパッケージをインストールします。
- opencv-python: OpenCVのPythonラッパー
- numpy: 数値計算や配列操作のためのPythonライブラリ、画像編集に利用
- PyGObject: GObjectのPythonラッパー
pip install opencv-python numpy PyGObject
やってみた
映像をリアルタイムで物体検出します。
全体構成
H.264データをダウンストリーム・加工してintdashにアップストリームします。
H.264データが可視化されるにはサーバーまでを2往復することになります。
違和感を感じさせないためには低遅延であることが重要です。
実行結果
サンプルプログラムを起動します。
python lesson4/src/detect_people.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID>
intdash Motion V2でデータ収集を開始します。
- Video
- Data Type:
h264_frame
- Data Name:
1/h264
- Data Type:
せっかくなので検出人数が多いところで撮影しました。
- 検出物体の人を緑、他を赤にして矩形描画したH.264データをアップストリーム
- 検出人数もアップストリーム
- 利用SIMではLIVE再生だと元の映像含めて通信が不安定
- 元データと経過時間が同じにしているため、過去再生では完全に時刻同期
サンプルプログラム
構成が少し複雑になったので図示しました。
フレームのデコード・物体検出・エンコードをスムーズに行うために非同期処理にしています。
- Gstreamerパイプラインとキューを使って非同期処理間でデータ授受
- 新規計測として検出結果(矩形描画映像、検出人数)をアップストリーム
計測作成
アップストリーム先の計測をREST APIで作成します。
作成後に計測UUIDをアップストリーム処理に渡します。
measurement = self.writer.create_measurement("Created by DetectService") await self.upstreamer.open(measurement.uuid)
リアルタイムAPI接続
アップストリームのためにノードID(エッジUUID)を指定します。
帯域が混み合っていたため、
ping_timeout
: Pingタイムアウト(秒)
を延ばしました(デフォルト1秒)。
api_url_parsed = urllib.parse.urlparse(api_url) conn = await iscp.Conn.connect( address=f"{api_url_parsed.hostname}:{api_port}", connector=iscp.WebSocketConnector(enable_tls=True), token_source=lambda: api_token, project_uuid=project_uuid, node_id=edge_uuid, ping_timeout=ping_timeout, )
データポイント受信
起動後はダウンストリームでデータポイントを待ち受けます。
一定時間データポイントが発生しない場合はTimeoutError
をraiseします。
async for msg in self.down.chunks(timeout=timeout):
H.264アップストリーム
Gstreamerから取得したフレームがIDRフレームかNon IDRフレームかを判定します。
H.264を規定するISO/IEC 14496-10のAnnex Bにおいて定義されているバイトストリームフォーマットのNAL Unit Typeが
- SPS (nal_type:7)
- Sequence Parameter Set:シーケンス全体に関するパラメータ情報
- PPS (nal_type:8)
- Picture Parameter Set:個々のフレームまたはグループオブフレーム(GOF)に関するパラメータ情報
- IDR (nal_type:5)
が順序通りに存在する場合にIDRフレームと判断します。
# IDRフレームの順序判定 if nalu_type == 7: sps_found = True elif nalu_type == 8 and sps_found: pps_found = True elif nalu_type == 5 and sps_found and pps_found: return True
なお、エンコードしたH.264のIDRフレームの頻度はGstreamerパイプラインの
""" key-int-max={key_int_max} """
で指定しています。
Convertor
Gstreamerパイプラインをラップするクラスです。
push
メソッドで1フレームを与えて、get
で取り出します。
Gstreamerがバッファするため、getでは非同期に読み出して返します。
while True: sample = await asyncio.to_thread(self.sink.emit, "pull-sample")
Detector
物体検出クラスです。
特定の物体検出結果(人物)の矩形色変更、検出人数カウントをします。
if self.class_names[class_ids[i]] == "person": color = (0, 255, 0) count = count + 1 else: color = (0, 0, 255)
タイムアウトで完了
検出結果は1つの計測に格納されますが、元計測が複数あっても構いません。
TimeoutError
が発生したら場合は計測を完了します。
except TimeoutError: self.writer.complete_measurement(measurement.uuid)
おわりに
今回はリアルタイムAPIのデータ送信をご紹介しました。
REST APIの計測作成も含めた集大成の内容でした。
これでひと通りの機能を網羅して入門シリーズもひと区切りです。
今後はより実用向けの内容も紹介していきたいと考えています。