既存の計測データをアップストリームしたいみなさん、
こんにちは。ソリューションアーキテクトの伊勢です。
システム開発の現場では、一度取得した計測データを再利用して送信したい場面があります。
今回はintdashの計測データを "リプレイ" する方法をご紹介します。
はじめに
開発中にエッジの載っているモビリティをいつでも自由に動かせるとは限りません。
- 他の作業と利用が重なる
- 計測には複数名体制の準備が必要
- 利用施設の申請が必要
- 偶然得られたデータが再現できない
また、本番データを検証環境でデバッグしたいケースもあります。
そんなときに、取り溜めた計測データを手軽にリアルタイム送信できれば、開発効率が高まります。
再び、intdash API/SDKとは
本シリーズではこれまで、intdash APIを REST API と リアルタイムAPI に分けて説明してきました。
このうち、REST APIは一般的なHTTPリクエスト・レスポンス形式の通信方式です。
REST APIで唯一、大容量データ取得に特化した List Data Points エンドポイントは、チャンク転送エンコーディングをサポートしています。
これは、大きなデータを効率よく転送し、クライアント側のメモリ消費を抑えるための設計です。
チャンク転送エンコーディング
サーバーがコンテンツの全体のサイズを事前に決定できない場合や、大容量データ転送の効率化に使用されるデータ転送方式です。
通常、HTTPレスポンスの Content-Length
ヘッダには、クライアントが受信すべきデータの総バイト数が記載されます。
しかし、動的に生成されるコンテンツや巨大データのようにデータサイズが事前に不明な場合、Content-Length
を指定するのが難しくなります。
そこで Transfer-Encoding: chunked
ヘッダを指定して、データをチャンク(塊)に分けて順次送信する方式が使われます。
これには以下の利点があります。
- サーバーは全データを用意する前にレスポンスを開始できる
- クライアントはメモリを節約しつつデータを逐次処理できる
この方式は、ストリーミングや大容量データのダウンロードに適しています。1
List Data Points のレスポンス確認
レスポンスヘッダを確認してみます。
curlコマンドで List Data Points エンドポイントを叩きます。
HTTP1.1を明示的に指定しています。2
export API_URL=<YOUR_API_URL> export API_TOKEN=<YOUR_API_TOKEN> export PROJECT_UUID=<YOUR_PROJECT_UUID> export MEAS_UUID=<YOUR_MEAS_UUID> curl -I -X GET "${API_URL}/api/v1/projects/${PROJECT_UUID}/data?name=${MEAS_UUID}" -H "X-Intdash-Token: ${API_TOKEN}" --http1.1
レスポンスヘッダに Transfer-Encoding: chunked
が含まれていることが確認できます。
一方、他のエンドポイントでは Content-Length
が使用されています。
curl -I -X GET "${API_URL}/api/v1/projects/${PROJECT_UUID}/measurements/${MEAS_UUID}" -H "X-Intdash-Token: ${API_TOKEN}" --http1.1
今回紹介するサンプルプログラム3では、このチャンク転送エンコーディングを活用して、大容量の計測データ取得時にメモリを節約します。
やってみた
インストール
SDK入門③まででインストールしたパッケージを利用します。
加えて、メモリ使用量を表示するため、システム監視ライブラリを導入します。
pip install psutil
全体構成
A環境に保存された計測データをB環境に再度アップストリームします。
対象計測
データ収集に労力のかかる例として長時間の計測を使います。
さんふらわあ しれとこ は2025年に引退が決まっているベテラン船で、客室はすべて年季が入ったベット4つの相部屋です。 www.sunflower.co.jp
当社製 EDGEPLANT T1 でGPSデータを収集しました。
リプレイ開始
20時間超の計測を等倍で再生するのは辛いので早送りします。
再生スピード調整機能を使って60倍速でアップストリームします。
起動すると、A環境からREST APIからの計測データ取得と、B環境へのアップストリームが始まります。
VM2M Data Visualizer のLIVE再生で位置情報が60倍速で更新されていきます。
再生が完了すると、ちょうど時間が1/60の計測が作成されています。
起動オプション
今回は、データ取得元を計測UUIDで指定して、別サーバー環境に、再生スピード指定でアップストリームしました。
他にも以下のオプションがあります。
データ取得元オプション
取得元の時間範囲として、計測、またはエッジ・開始時刻・終了時刻を指定します。
--api_url
requried
:サーバーURL--api_token
requried
: APIトークン--project_uuid
:プロジェクト、省略時はGlobal Project- いずれか
requried
- 計測指定:
--meas_uuid
:計測
- エッジ指定:
--edge_uuid
:エッジ--start
:開始時刻、RFC3339形式--end
:終了時刻、RFC3339形式
- 計測指定:
--data_id_filter
:データIDフィルター、カンマ区切りで複数指定<データ型名>:<データ名>, <データ型名>:<データ名>, ...
データ送信先オプション
プロジェクト以外は、省略時はデータ取得元と同じになります。
--dst_api_url
:サーバーURL--dst_api_token
: APIトークン--dst_project_uuid
:プロジェクト、省略時はGlobal Project--dst_edge_uuid
:エッジ
再生オプション
--speed
:再生スピード、省略時は等倍(1.0)
サンプルプログラム説明
APIアクセス部分を中心に説明します。
2つの処理を非同期で実行しています。
- Feed:REST APIから受け取ったデータポイントをキューに登録します。
- メモリ消費量を抑えるため、キューが一杯だったら、空くまで待機します。
- Fetch:キューからデータポイントを取り出して、アップストリームします。
- 元の計測と同じ頻度でデータ送信するため、経過時間分スリープします。
- 再生スピードを指定すると スリープ時間をその分調整します。
- 元の計測と同じ頻度でデータ送信するため、経過時間分スリープします。
データポイント取得
メモリ消費低減のため、_preload_content
をFalse
にして随時読み込みます。
api = measurement_service_data_points_api.MeasurementServiceDataPointsApi( self.client ) params = { "project_uuid": self.project_uuid, "name": self.meas_uuid if self.meas_uuid else self.edge_uuid, "time_format": "ns", "_preload_content": False, # 全データロードの抑止 } if self.start: params["start"] = self.start if self.end: params["end"] = self.end if self.data_id_filter: params["data_id_filter"] = self.data_id_filter stream = api.list_project_data_points(**params)
時刻変換
データ送信を待機するため、取得したデータポイントの絶対時刻から経過時間を算出します。
basetime_ns = int(basetime.timestamp() * 1_000_000) * 1_000 ... elapsed_time = tuple[0] - basetime_ns
再生スピード調整
算出した経過時間分スリープします。
elapsed_time_speed = int(elapsed_time / self.speed) sleep_time = ( basetime_ns + elapsed_time_speed - iscp.DateTime.utcnow().unix_nano() ) / 1_000_000_000 if sleep_time > 0: await asyncio.sleep(sleep_time)
送信するデータポイントの経過時間は、現在時刻と新たな計測の基準時間との差分です。
elapsed_time_replay = ( iscp.DateTime.utcnow().unix_nano() - start_time.unix_nano() ) await self.upstreamer.send(elapsed_time_replay, type, name, data)
メモリ使用量表示
チャンク転送エンコーディングによりメモリ消費が抑えられることを確認するため、使用量をログ表示しています。
process = psutil.Process()
mem_info = process.memory_info()
logging.info(f"Memory Usage: {mem_info.rss / 1024 / 1024:.2f} MB")
おおよそ55MBで推移しています。
試しに_preload_content
オプションを外して(デフォルトTrue
で)実行すると、ストリーム開始まで10秒ほどかかり、600〜700MBほど消費します。
サンプルプログラム修正
本シリーズで以前、REST APIで計測データをエクスポート/インポートするサンプルプログラムを紹介しました。
こちらは、計測データ量に依存してメモリ消費が増えてしまうため、長時間計測には不向きでした。
今回、同じように_preload_content
オプションを指定して、メモリ消費量を抑えるようした修正版を追加しました。4
エクスポート
List Data Pointのオプション指定を変更しました。
api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client) params = { "project_uuid": project_uuid, "name": meas_uuid, "time_format": "ns", "_preload_content": False, } stream = api.list_project_data_points(**params)
一定量ずつ取得した計測データをバッファに取っておいて、1行ずつ順次返すように変更しました。
# バッファ格納 buffer = b"" while True: chunk = stream.read(chunk_size) if not chunk: break buffer += chunk # JSON Line切り出し while b"\n" in buffer: line, buffer = buffer.split(b"\n", 1) line_json = json.loads(line.decode()) if "data" not in line_json: continue if "d" not in line_json["data"]: continue yield line_json
出力ファイルがJSONだとインデント処理が煩雑になるため、JSON Linesに変更しました。
修正前フォーマット:JSON
{ "measurement": <計測オブジェクト>, "basetimes": [<基準時刻>, <基準時刻>, ..], "datapoints": [<データポイント>, <データポイント>, ..] }
修正後フォーマット:JSON Lines
{"measurement": <計測オブジェクト>} {"basetime": <基準時刻>} {"basetime": <基準時刻>} ... {"datapoint": <データポイント>} {"datapoint": <データポイント>} ...
さきほどのGPS計測をエクスポートすると以下のようになります。
修正前は1.4GBほどメモリを消費しました。5
python lesson2/migrate/src/meas_export.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>
修正後は46MBほどになりました。
python lesson2/migrate/src/meas_export_mem.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>
インポート
あわせて、JSON Linesファイルを随時読み出すようにインポートを修正しました。
with open(file_path, "r", encoding="utf-8") as f: for line in f: try: yield json.loads(line, object_hook=measurement_decoder) except json.JSONDecodeError as e: logging.warning(f"JSON decode error: {e}")
CHUNK_SIZE
ずつ送信するよう制御しています。
buffer.append(entry["datapoint"]) if len(buffer) >= CHUNK_SIZE: sequence_number = send_chunks( client, project_uuid, measurement_obj.uuid, measurement["basetime"], sequence_uuid, buffer, sequence_number, ) buffer.clear()
エクスポートしたファイルのインポート結果です。
修正前は2.5GBほどメモリを消費して途中で止まりました。
python lesson2/migrate/src/meas_import.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID> --src_file <YOUR_JSON_FILE>
修正後は50MB未満に収まりました。
python lesson2/migrate/src/meas_import_mem.py --api_url <YOUR_API_URL> --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID> --src_file <YOUR_JSONL_FILE>
おわりに
今回は開発で利用できるツールの一例を紹介しました。
実データを元に繰り返し送信したり、別環境で再現できると検証の質が向上します。
このようにREST/リアルタイムAPIを組みあわせると自由度の高い制御も可能です。
リンク
本シリーズの過去記事はこちらからご覧ください。
- SDK入門①〜社用車で走ったとこ全部見せます〜 :REST APIでデータ取得
- SDK入門②〜データ移行ツールの作り方〜:REST APIでデータ送信
- SDK入門③〜RTSPで映像配信するぞ〜:リアルタイムAPIでデータ取得
- SDK入門④〜YOLOで物体検知しちゃう〜:リアルタイムAPIでデータ送信
- SDK入門⑤〜iPadでData Visualizerを見る会〜:リアルタイムAPIでキャプチャデータ送信
- SDK入門⑥〜最速最高度で計測する日〜: AWS LambdaでREST APIデータ送信