aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

SDK入門④〜YOLOで物体検知しちゃう〜

intdashにリアルタイムでデータを登録したいみなさん、

こんにちは。ソリューションアーキテクトの伊勢です。

収集データをリアルタイムで可視化・確認したいニーズが増えています。

今回は収集データをリアルタイムに加工・登録する方法をご紹介します。

はじめに

今回の対象はリアルタイムAPIへのデータ送信です。

intdash SDK

前提

今回のサンプルプログラムで以下のライブラリを使います。1

  • YOLO
  • OpenCV
  • Gstreamer
  • PyGObject
YOLOとは

YOLO(You Only Look Once)はディープラーニングモデルです。

画像に写っている物体がどこにあり、何なのかを高速に認識できるのが特徴です。

ai-market.jp

今回使うのは物体検出の精度と速度を向上させたモデルであるYOLOv4の軽量版YOLOv4-tinyです。

OpenCVとは

画像の読み込み・表示・編集など画像処理のためののオープンソースライブラリです。

今回はOpenCVからYOLOv4-tinyモデルを呼び出し、矩形を描画します。

Gstreamerとは

動画・音声の変換などを行えるオープンソースのメディアフレームワークです。

前回利用したFFmpegよりもリアルタイム処理に向いています。

今回はH.264データのデコード・エンコードに利用します。

PyGObjectとは

Pythonから、C言語ライブラリで使用されるオブジェクト指向プログラミングのフレームワークGObjectを利用するためのライブラリです。

今回は、GObjectを基盤として構築されたGstreamerを制御するのに利用します。

インストール

クライアントライブラリ

クライアントライブラリは前回インストールしたパッケージを利用します。

サンプルプログラム用ライブラリ

利用ライブラリをインストールします。

Gstreamerインストール

Gstreamerをインストールします。バージョン、カラーバー表示を確認します。2

brew install gstreamer
gst-launch-1.0 --version
gst-launch-1.0 videotestsrc ! autovideosink

Gstreamerインストール

Gstreamerカラーバー表示

Pythonパッケージインストール

Pythonパッケージをインストールします。

  • opencv-python: OpenCVのPythonラッパー
  • numpy: 数値計算や配列操作のためのPythonライブラリ、画像編集に利用
  • PyGObject: GObjectのPythonラッパー
pip install opencv-python numpy PyGObject

Pythonパッケージのインストール

やってみた

映像をリアルタイムで物体検出します。

全体構成

H.264データをダウンストリーム・加工してintdashにアップストリームします。

全体構成図

H.264データが可視化されるにはサーバーまでを2往復することになります。

違和感を感じさせないためには低遅延であることが重要です。

実行結果

サンプルプログラムを起動します。

python lesson4/src/detect_people.py  --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID>

intdash Motion V2でデータ収集を開始します。

  • Video
    • Data Type: h264_frame
    • Data Name: 1/h264

せっかくなので検出人数が多いところで撮影しました。

www.youtube.com

  • 検出物体の人を緑、他を赤にして矩形描画したH.264データをアップストリーム
  • 検出人数もアップストリーム
  • 利用SIMではLIVE再生だと元の映像含めて通信が不安定
  • 元データと経過時間が同じにしているため、過去再生では完全に時刻同期

サンプルプログラム

構成が少し複雑になったので図示しました。

クラスアーキテクチャ

フレームのデコード・物体検出・エンコードをスムーズに行うために非同期処理にしています。

  • Gstreamerパイプラインとキューを使って非同期処理間でデータ授受
  • 新規計測として検出結果(矩形描画映像、検出人数)をアップストリーム
計測作成

アップストリーム先の計測をREST APIで作成します。

作成後に計測UUIDをアップストリーム処理に渡します。

            measurement = self.writer.create_measurement("Created by DetectService")

            await self.upstreamer.open(measurement.uuid)
リアルタイムAPI接続

アップストリームのためにノードID(エッジUUID)を指定します。

帯域が混み合っていたため、

  • ping_timeout: Pingタイムアウト(秒)

を延ばしました(デフォルト1秒)。

    api_url_parsed = urllib.parse.urlparse(api_url)
    conn = await iscp.Conn.connect(
        address=f"{api_url_parsed.hostname}:{api_port}",
        connector=iscp.WebSocketConnector(enable_tls=True),
        token_source=lambda: api_token,
        project_uuid=project_uuid,
        node_id=edge_uuid,
        ping_timeout=ping_timeout,
    )
データポイント受信

起動後はダウンストリームでデータポイントを待ち受けます。

一定時間データポイントが発生しない場合はTimeoutErrorをraiseします。

        async for msg in self.down.chunks(timeout=timeout):
H.264アップストリーム

Gstreamerから取得したフレームがIDRフレームかNon IDRフレームかを判定します。

H.264を規定するISO/IEC 14496-10のAnnex Bにおいて定義されているバイトストリームフォーマットのNAL Unit Typeが

  • SPS (nal_type:7)
    • Sequence Parameter Set:シーケンス全体に関するパラメータ情報
  • PPS (nal_type:8)
    • Picture Parameter Set:個々のフレームまたはグループオブフレーム(GOF)に関するパラメータ情報
  • IDR (nal_type:5)

が順序通りに存在する場合にIDRフレームと判断します。

            # IDRフレームの順序判定
            if nalu_type == 7:
                sps_found = True
            elif nalu_type == 8 and sps_found:
                pps_found = True
            elif nalu_type == 5 and sps_found and pps_found:
                return True

なお、エンコードしたH.264のIDRフレームの頻度はGstreamerパイプラインの

"""
key-int-max={key_int_max}
"""

で指定しています。

Convertor

Gstreamerパイプラインをラップするクラスです。

pushメソッドで1フレームを与えて、getで取り出します。

Gstreamerがバッファするため、getでは非同期に読み出して返します。

        while True:
            sample = await asyncio.to_thread(self.sink.emit, "pull-sample")
Detector

物体検出クラスです。

特定の物体検出結果(人物)の矩形色変更、検出人数カウントをします。

            if self.class_names[class_ids[i]] == "person":
                color = (0, 255, 0)
                count = count + 1
            else:
                color = (0, 0, 255)
タイムアウトで完了

検出結果は1つの計測に格納されますが、元計測が複数あっても構いません。

TimeoutErrorが発生したら場合は計測を完了します。

        except TimeoutError:
            self.writer.complete_measurement(measurement.uuid)

おわりに

今回はリアルタイムAPIのデータ送信をご紹介しました。

REST APIの計測作成も含めた集大成の内容でした。

これでひと通りの機能を網羅して入門シリーズもひと区切りです。

今後はより実用向けの内容も紹介していきたいと考えています。


  1. サンプルプログラムはGitHubで公開しています。
  2. 初回の起動は遅いことがあるようです。