計測データを自分でintdashに登録したいみなさん、
こんにちは。ソリューションアーキテクトの伊勢です。
エッジから収集したデータを加工して可視化・確認したいニーズが増えています。
今回はREST APIでintdashにデータを登録する方法を説明します。
はじめに
今回説明するのはREST APIへのデータ送信です。
典型的な利用シーンは以下の通りです。
- 保留データの遅延アップロード
- リアルタイム送信時に送りきれなかったデータをあとで送信
- 低頻度データ送信
- リアルタイムAPIを使わずにストリーム送信
データ送信のステータス管理
前回のREST APIからのデータ取得は非常にシンプルでした。
データ送信は少し複雑です。
同時並行で送信される計測データの未回収/回収済を管理するためです。
- 計測はデータ送信を受け付ける単位として複数の計測シーケンスを持ちます。
- 計測シーケンスが回収する予定の総データポイント数を管理します。
- データポイントはチャンクという単位でまとめて送信されます。
- チャンクには計測シーケンス内で連番が振られます。
- チャンクの送信が完了するとステータスが回収済になります。
- 回収済のデータポイント合計が計測の回収済みデータポイント数になります。
Protocol Buffersの利用
REST APIへのデータ送信には、Googleが開発したシリアライズフォーマットであるProtocol Buffersを利用できます。1
これによって大量データの送信が安定化します。
- バイナリ化によるトラフィック軽減
- シリアライズ・デシリアライズの高速化
- データフォーマット定義の明確化
インストール
REST APIでデータ送信するため、前回のインストール手順に加えてProtocol Buffersエンコーダーを導入します。
Protocol Buffersエンコーダーはintdash用のプロトコル定義ファイルから生成します。
Buf CLIインストール
プロトコル定義ファイルからProtocol Buffersエンコーダーを生成するツール Buf CLIをインストールします。
brew install bufbuild/buf/buf
buf --version
Protocol Buffersエンコーダーの生成
Buf CLIでプロトコル定義ファイルからエンコーダーを生成します。
プロトコル定義ファイルのダウンロード
intdash API specificationページからプロトコル定義ファイルページに遷移し、プロトコル定義ファイル protocol.proto
をダウンロードします。2
エンコーダーの生成
ワークディレクトリを作成してプロトコル定義ファイルを配置します。
プロトコル定義ファイルのパッケージ名をintdashに変更します。
mkdir -p proto/intdash/v1/ cp path/to/protocol.proto proto/intdash/v1/ sed -i -e "s/package pb;/package intdash.v1;/g" proto/intdash/v1/protocol.proto
Buf CLIの定義ファイルを作成し、エンコーダーを生成します。
buf.yaml
:Buf CLIのプロジェクト基本設定buf.gen.yaml
:Buf CLIのコード生成設定
エンコーダーはgen
ディレクトリに出力されます。
cat << EOS > ./proto/buf.yaml version: v1 breaking: use: - FILE lint: use: - DEFAULT EOS cat << EOS > ./buf.gen.yaml version: v1 managed: enabled: true plugins: - plugin: buf.build/protocolbuffers/python:v23.4 out: gen EOS buf generate proto ls -l gen
protobufパッケージインストール
生成したエンコードの実行にはPythonのprotobuf
パッケージが必要です。
仮想環境を有効化してインストールします。
. ./venv/bin/activate
pip install protobuf
動作確認
Pythonを起動して関連クラスをimport
します。
エラーが出なければ、正常に読み込めています。3
python >>> from gen.intdash.v1.protocol_pb2 import (StoreDataChunk, StoreDataChunks, StoreDataID, StoreDataPoint, StoreDataPointGroup)
やってみた
今回は2つのサンプルプログラムを説明します。4
- データ移行ツール
- GPS距離計算
データ移行ツール
計測データをJSONファイルにエクスポートし、別計測として再びインポートします。5
インポート時にProtocol Buffersエンコーダーを利用します。
実行結果
エクスポートとインポートで処理を分けています。
エクスポート
PYTHONPATH
が指定されていないときは設定します。
echo $PYTHONPATH export PYTHONPATH=/path/to/your_workspace:
対象計測を指定して実行します。
python lesson2/migrate/src/meas_export.py--api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>
JSONを見てみるとデータの構造がよくわかります。
計測の主な項目は以下の通りです。
uuid
: 計測UUIDbasetime
: もっとも優先度の高い基準時刻status
: 計測の状態expected_data_points
: 総データポイント数received_data_points
: 回収済みデータポイント数
基準時刻は計測中に複数送信できるため、リストになっています。
id
: 基準時刻のIDbasetime
: 基準時刻priority
: 優先度
データポイントの主な項目は以下の通りです。
time
: 経過時間(基準時刻からの相対時間)data_type
: データ型data_id
: データを識別する名前data.d
: データペイロード(BASE64エンコード)
この計測はiPhoneアプリ intdash Motion V2で収集した位置情報です。
同じ経過時間で複数のデータ名が送信されているのがわかります。
インポート
別環境のエッジの計測としてデータを登録します。
python lesson2/migrate/src/meas_import.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID>
サンプルプログラム
エクスポート
こちらはデータ取得のみでREST APIの使い方は前回の内容とほぼ変わりません。
計測(マーカー含む)、基準時刻、データポイントを取得しています。
計測データポイント取得でtime_format=“ns”
を指定しています。デフォルトではマイクロ秒のため、正確にデータ移行する際には指定が必要です。
api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client)
stream = api.list_project_data_points(
project_uuid=project_uuid, name=meas_uuid, time_format="ns"
)
取得した日付項目はdatetime型であるため、JSON出力時にナノ秒精度の時刻文字列にしています。
if isinstance(obj, datetime): iso_str = obj.isoformat() if obj.microsecond: nano_str = "{:09d}".format(obj.microsecond * 1000) iso_str = iso_str.replace(f".{obj.microsecond:06d}", f".{nano_str}") return iso_str
インポート
エクスポートされたJSONファイルを読み込んで、新しい計測を作成します。
計測、基準時刻、マーカー情報を各APIのcreate_xxxメソッドで作成しています。
計測作成時に基準時刻が必ず1つできてしまうため、登録後に削除して、改めて作成し直しています。
current_basetimes = api.list_project_measurement_base_times( project_uuid=project_uuid, measurement_uuid=measurement_uuid, ) for bt_current in current_basetimes["items"]: api.delete_project_measurement_base_time_by_id( project_uuid=project_uuid, measurement_uuid=measurement_uuid, id=bt_current.id, )
計測シーケンスは置換メソッド replace_project_measurement_sequence
が提供されています。
総データポイント数、回収済みデータポイント数を指定して計測シーケンスを登録します。
sequence_group = MeasurementSequenceGroupReplace( expected_data_points=measurement_src["sequences"]["expected_data_points"], final_sequence_number=measurement_src["sequences"]["received_data_points"], ) api = measurement_service_measurement_sequences_api.MeasurementServiceMeasurementSequencesApi( client ) sequence = api.replace_project_measurement_sequence( project_uuid=project_uuid, measurement_uuid=measurement_uuid, sequences_uuid=sequence_uuid if sequence_uuid else str(uuid.uuid4()), measurement_sequence_group_replace=sequence_group, )
チャンクの送信でProtocol Buffersエンコーダーを使います。
- データID(
type
,name
)とデータポイント(経過時間elapsed_time
とpayload
をチャンクに格納
store_data_point_group = StoreDataPointGroup( data_id=StoreDataID( type=data_point["data_type"], name=data_point["data_name"] ), data_points=[store_data_point], ) store_data_chunk = StoreDataChunk( sequence_number=sequence_number, data_point_groups=[store_data_point_group] ) chunks.append(store_data_chunk) sequence_number += 1
Content-type
にprotocolbuf
を指定してデータポイントを格納したチャンクを送信
results = api.create_project_measurement_sequence_chunks(
project_uuid=project_uuid,
body=io.BytesIO(chunk.SerializeToString()),
_content_type="application/vnd.iscp.v2.protobuf",
)
すべてのデータポイントを送信したら、計測のステータスを完了にします。
api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client) api.complete_project_measurement( measurement_uuid=measurement_uuid, project_uuid=project_uuid )
GPS距離計算
GPSデータごとに緯度経度〜基準点からの距離を計算してREST APIで登録します。
全体構成図
元の計測は変更せず、同じ基準時刻の計測を新たに作ります。
なお、元の計測に新たなデータポイントを追加することもできます。6
実行結果
実行します。
python lesson2/distance/src/distance.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>
intdashに登録した距離を元のGPSデータと一緒に可視化しました。
distance
というValue Currentパーツにバインドしています。
サンプルプログラム
クラスを分割しました。
- MeasurementReader:元計測データを取得
- DistanceCalculator:原点からの距離を算出
- MeasurementWriter:新規計測データを送信
- DetectService:各クラスの統合
REST APIアクセス部はデータ移行ツールとほとんど同じですが、 一定件数ずつフェッチしつつ登録するように制御しています。
api = measurement_service_data_points_api.MeasurementServiceDataPointsApi( self.client ) stream = api.list_project_data_points( project_uuid=self.project_uuid, name=self.meas_uuid, data_id_filter=["#:1/gnss_coordinates"], start=self.start, limit=fetch_size, time_format="ns", )
start
に前回データポイントの時刻+1からフェッチを開始しています。
おわりに
今回はREST APIでのデータ送信をご紹介しました。
計測シーケンスやProtocol Bufferは少しとっつきづらく、ソースも複雑に見えますが、パターンがわかると応用できると思います。
次回はリアルタイムAPIを説明していきます。
-
Protocol Buffersを使わずにJSONで送信もできます。送信データがProtocol BuffersでシリアライズされているかはHTTPヘッダの
Content-Type
でサーバーに通知します。↩ - intdash API specificationページの参照にはユーザー登録が必要です。↩
- 動作確認できたら、Buf CLI、ワークディレクトリ、プロトコル定義ファイルは削除しても構いません。↩
- サンプルプログラムをGitHubで公開しています。↩
- データポイントが数百件ほどの小さい計測を想定しています。↩
- 計測から任意の計測データを変更・削除することはできません。当該データのみを変更・削除したい場合は、別に計測を作成して元の計測を削除します。↩