aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

SDK入門⑦〜計測リプレイツールの作り方〜

既存の計測データをアップストリームしたいみなさん、

こんにちは。ソリューションアーキテクトの伊勢です。

システム開発の現場では、一度取得した計測データを再利用して送信したい場面があります。

今回はintdashの計測データを "リプレイ" する方法をご紹介します。

はじめに

開発中にエッジの載っているモビリティをいつでも自由に動かせるとは限りません。

  • 他の作業と利用が重なる
  • 計測には複数名体制の準備が必要
  • 利用施設の申請が必要
  • 偶然得られたデータが再現できない

また、本番データを検証環境でデバッグしたいケースもあります。

そんなときに、取り溜めた計測データを手軽にリアルタイム送信できれば、開発効率が高まります。

再び、intdash API/SDKとは

本シリーズではこれまで、intdash APIを REST API と リアルタイムAPI に分けて説明してきました。

このうち、REST APIは一般的なHTTPリクエスト・レスポンス形式の通信方式です。

intdash SDK

REST APIで唯一、大容量データ取得に特化した List Data Points エンドポイントは、チャンク転送エンコーディングをサポートしています。

これは、大きなデータを効率よく転送し、クライアント側のメモリ消費を抑えるための設計です。

List Data Pontsエンドポイントのレスポンス定義

チャンク転送エンコーディング

サーバーがコンテンツの全体のサイズを事前に決定できない場合や、大容量データ転送の効率化に使用されるデータ転送方式です。

通常、HTTPレスポンスの Content-Length ヘッダには、クライアントが受信すべきデータの総バイト数が記載されます。

しかし、動的に生成されるコンテンツや巨大データのようにデータサイズが事前に不明な場合、Content-Length を指定するのが難しくなります。

そこで Transfer-Encoding: chunked ヘッダを指定して、データをチャンク(塊)に分けて順次送信する方式が使われます。

これには以下の利点があります。

  • サーバーは全データを用意する前にレスポンスを開始できる
  • クライアントはメモリを節約しつつデータを逐次処理できる

この方式は、ストリーミングや大容量データのダウンロードに適しています。1

List Data Points のレスポンス確認

レスポンスヘッダを確認してみます。

curlコマンドで List Data Points エンドポイントを叩きます。

HTTP1.1を明示的に指定しています。2

export API_URL=<YOUR_API_URL>
export API_TOKEN=<YOUR_API_TOKEN>
export PROJECT_UUID=<YOUR_PROJECT_UUID>
export MEAS_UUID=<YOUR_MEAS_UUID>
curl -I -X GET "${API_URL}/api/v1/projects/${PROJECT_UUID}/data?name=${MEAS_UUID}" -H "X-Intdash-Token: ${API_TOKEN}" --http1.1

レスポンスヘッダに Transfer-Encoding: chunked が含まれていることが確認できます。

List Data PointsのHTTPレスポンス

一方、他のエンドポイントでは Content-Length が使用されています。

curl -I -X GET "${API_URL}/api/v1/projects/${PROJECT_UUID}/measurements/${MEAS_UUID}" -H "X-Intdash-Token: ${API_TOKEN}" --http1.1

List Data Points以外のHTTPレスポンス

今回紹介するサンプルプログラム3では、このチャンク転送エンコーディングを活用して、大容量の計測データ取得時にメモリを節約します。

やってみた

インストール

SDK入門③まででインストールしたパッケージを利用します。

加えて、メモリ使用量を表示するため、システム監視ライブラリを導入します。

pip install psutil

全体構成

A環境に保存された計測データをB環境に再度アップストリームします。

全体構成図

対象計測

データ収集に労力のかかる例として長時間の計測を使います。

荷物搬送のためカーフェリー移動した北海道出張

さんふらわあ しれとこ は2025年に引退が決まっているベテラン船で、客室はすべて年季が入ったベット4つの相部屋です。 www.sunflower.co.jp

当社製 EDGEPLANT T1 でGPSデータを収集しました。

客室ベッドでT1と添い寝

茨城県大洗港〜北海道苫小牧港は約302海里

乗船〜下船の20時間26分32秒

リプレイ開始

20時間超の計測を等倍で再生するのは辛いので早送りします。

再生スピード調整機能を使って60倍速でアップストリームします。


起動すると、A環境からREST APIからの計測データ取得と、B環境へのアップストリームが始まります。

リプレイ開始

VM2M Data Visualizer のLIVE再生で位置情報が60倍速で更新されていきます。

津軽海峡冬景色

Edge Finderでリアルタイムデータを確認

再生が完了すると、ちょうど時間が1/60の計測が作成されています。

20分26秒に短縮

起動オプション

今回は、データ取得元を計測UUIDで指定して、別サーバー環境に、再生スピード指定でアップストリームしました。

他にも以下のオプションがあります。

データ取得元オプション

取得元の時間範囲として、計測、またはエッジ・開始時刻・終了時刻を指定します。

  • --api_url requried:サーバーURL
  • --api_token requried: APIトークン
  • --project_uuid:プロジェクト、省略時はGlobal Project
  • いずれか requried
    • 計測指定:
      • --meas_uuid:計測
    • エッジ指定:
      • --edge_uuid:エッジ
      • --start:開始時刻、RFC3339形式
      • --end:終了時刻、RFC3339形式
  • --data_id_filter:データIDフィルター、カンマ区切りで複数指定
    • <データ型名>:<データ名>, <データ型名>:<データ名>, ...
データ送信先オプション

プロジェクト以外は、省略時はデータ取得元と同じになります。

  • --dst_api_url:サーバーURL
  • --dst_api_token: APIトークン
  • --dst_project_uuid:プロジェクト、省略時はGlobal Project
  • --dst_edge_uuid:エッジ
再生オプション
  • --speed:再生スピード、省略時は等倍(1.0)

サンプルプログラム説明

APIアクセス部分を中心に説明します。

クラスアーキテクチャ

2つの処理を非同期で実行しています。

  • Feed:REST APIから受け取ったデータポイントをキューに登録します。
    • メモリ消費量を抑えるため、キューが一杯だったら、空くまで待機します。
  • Fetch:キューからデータポイントを取り出して、アップストリームします。
    • 元の計測と同じ頻度でデータ送信するため、経過時間分スリープします。
      • 再生スピードを指定すると スリープ時間をその分調整します。

データポイント取得

メモリ消費低減のため、_preload_contentFalseにして随時読み込みます。

        api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(
            self.client
        )
        params = {
            "project_uuid": self.project_uuid,
            "name": self.meas_uuid if self.meas_uuid else self.edge_uuid,
            "time_format": "ns",
            "_preload_content": False,  # 全データロードの抑止
        }
        if self.start:
            params["start"] = self.start
        if self.end:
            params["end"] = self.end
        if self.data_id_filter:
            params["data_id_filter"] = self.data_id_filter
        stream = api.list_project_data_points(**params)

時刻変換

データ送信を待機するため、取得したデータポイントの絶対時刻から経過時間を算出します。

        basetime_ns = int(basetime.timestamp() * 1_000_000) * 1_000
...
            elapsed_time = tuple[0] - basetime_ns

再生スピード調整

算出した経過時間分スリープします。

            elapsed_time_speed = int(elapsed_time / self.speed)
            sleep_time = (
                basetime_ns + elapsed_time_speed - iscp.DateTime.utcnow().unix_nano()
            ) / 1_000_000_000

            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

送信するデータポイントの経過時間は、現在時刻と新たな計測の基準時間との差分です。

            elapsed_time_replay = (
                iscp.DateTime.utcnow().unix_nano() - start_time.unix_nano()
            )
            await self.upstreamer.send(elapsed_time_replay, type, name, data)

メモリ使用量表示

チャンク転送エンコーディングによりメモリ消費が抑えられることを確認するため、使用量をログ表示しています。

        process = psutil.Process()
        mem_info = process.memory_info()
        logging.info(f"Memory Usage: {mem_info.rss / 1024 / 1024:.2f} MB")

メモリ使用量ログ

おおよそ55MBで推移しています。

試しに_preload_contentオプションを外して(デフォルトTrueで)実行すると、ストリーム開始まで10秒ほどかかり、600〜700MBほど消費します。

一括取得で実行

サンプルプログラム修正

本シリーズで以前、REST APIで計測データをエクスポート/インポートするサンプルプログラムを紹介しました。

tech.aptpod.co.jp

こちらは、計測データ量に依存してメモリ消費が増えてしまうため、長時間計測には不向きでした。

今回、同じように_preload_contentオプションを指定して、メモリ消費量を抑えるようした修正版を追加しました。4

エクスポート

List Data Pointのオプション指定を変更しました。

    api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client)
    params = {
        "project_uuid": project_uuid,
        "name": meas_uuid,
        "time_format": "ns",
        "_preload_content": False,
    }
    stream = api.list_project_data_points(**params)


一定量ずつ取得した計測データをバッファに取っておいて、1行ずつ順次返すように変更しました。

    # バッファ格納
    buffer = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buffer += chunk

        # JSON Line切り出し
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            line_json = json.loads(line.decode())
            if "data" not in line_json:
                continue
            if "d" not in line_json["data"]:
                continue
            yield line_json


出力ファイルがJSONだとインデント処理が煩雑になるため、JSON Linesに変更しました。

修正前フォーマット:JSON

{
   "measurement": <計測オブジェクト>,
   "basetimes": [<基準時刻>, <基準時刻>, ..],
   "datapoints": [<データポイント>, <データポイント>, ..]
}

修正後フォーマット:JSON Lines

{"measurement": <計測オブジェクト>}
{"basetime": <基準時刻>}
{"basetime": <基準時刻>}
...
{"datapoint": <データポイント>}
{"datapoint": <データポイント>}
...


さきほどのGPS計測をエクスポートすると以下のようになります。

修正前は1.4GBほどメモリを消費しました。5

python lesson2/migrate/src/meas_export.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>

エクスポート修正前:一括取得版

修正後は46MBほどになりました。

python lesson2/migrate/src/meas_export_mem.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>

エクスポート修正後:随時取得版

インポート

あわせて、JSON Linesファイルを随時読み出すようにインポートを修正しました。

    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            try:
                yield json.loads(line, object_hook=measurement_decoder)
            except json.JSONDecodeError as e:
                logging.warning(f"JSON decode error: {e}")


CHUNK_SIZEずつ送信するよう制御しています。

                buffer.append(entry["datapoint"])
                if len(buffer) >= CHUNK_SIZE:
                    sequence_number = send_chunks(
                        client,
                        project_uuid,
                        measurement_obj.uuid,
                        measurement["basetime"],
                        sequence_uuid,
                        buffer,
                        sequence_number,
                    )
                    buffer.clear()


エクスポートしたファイルのインポート結果です。

修正前は2.5GBほどメモリを消費して途中で止まりました。

python lesson2/migrate/src/meas_import.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID> --src_file <YOUR_JSON_FILE>

インポート修正前:一括読み出し版

修正後は50MB未満に収まりました。

python lesson2/migrate/src/meas_import_mem.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID> --src_file <YOUR_JSONL_FILE>

インポート修正後:随時み出し版

おわりに

今回は開発で利用できるツールの一例を紹介しました。

実データを元に繰り返し送信したり、別環境で再現できると検証の質が向上します。

このようにREST/リアルタイムAPIを組みあわせると自由度の高い制御も可能です。

リンク

本シリーズの過去記事はこちらからご覧ください。


  1. 同じ"ストリーミング"と呼んでいますが、チャンク転送エンコーディングとintdashのリアルタイムAPI(iSCP)は別のプロトコルです。
  2. HTTP2以降ではTransfer-Encodingは省略されうるためです。
  3. GitHubで公開しています。
  4. GitHubのlesson2に反映してあります。
  5. JSONフォーマットを一時保持するメモリ消費が大半です。