
計測データを自分でintdashに登録したいみなさん、
こんにちは。ソリューションアーキテクトの伊勢です。
エッジから収集したデータを加工して可視化・確認したいニーズが増えています。
今回はREST APIでintdashにデータを登録する方法を説明します。
はじめに
今回説明するのはREST APIへのデータ送信です。

典型的な利用シーンは以下の通りです。
- 保留データの遅延アップロード
- リアルタイム送信時に送りきれなかったデータをあとで送信
- 低頻度データ送信
- リアルタイムAPIを使わずにストリーム送信
データ送信のステータス管理
前回のREST APIからのデータ取得は非常にシンプルでした。
データ送信は少し複雑です。
同時並行で送信される計測データの未回収/回収済を管理するためです。

- 計測はデータ送信を受け付ける単位として複数の計測シーケンスを持ちます。
- 計測シーケンスが回収する予定の総データポイント数を管理します。
- データポイントはチャンクという単位でまとめて送信されます。
- チャンクには計測シーケンス内で連番が振られます。
- チャンクの送信が完了するとステータスが回収済になります。
- 回収済のデータポイント合計が計測の回収済みデータポイント数になります。
Protocol Buffersの利用
REST APIへのデータ送信には、Googleが開発したシリアライズフォーマットであるProtocol Buffersを利用できます。1
これによって大量データの送信が安定化します。
- バイナリ化によるトラフィック軽減
- シリアライズ・デシリアライズの高速化
- データフォーマット定義の明確化
インストール
REST APIでデータ送信するため、前回のインストール手順に加えてProtocol Buffersエンコーダーを導入します。
Protocol Buffersエンコーダーはintdash用のプロトコル定義ファイルから生成します。
Buf CLIインストール
プロトコル定義ファイルからProtocol Buffersエンコーダーを生成するツール Buf CLIをインストールします。
brew install bufbuild/buf/buf
buf --version

Protocol Buffersエンコーダーの生成
Buf CLIでプロトコル定義ファイルからエンコーダーを生成します。
プロトコル定義ファイルのダウンロード
intdash API specificationページからプロトコル定義ファイルページに遷移し、プロトコル定義ファイル protocol.proto をダウンロードします。2


エンコーダーの生成
ワークディレクトリを作成してプロトコル定義ファイルを配置します。
プロトコル定義ファイルのパッケージ名をintdashに変更します。
mkdir -p proto/intdash/v1/ cp path/to/protocol.proto proto/intdash/v1/ sed -i -e "s/package pb;/package intdash.v1;/g" proto/intdash/v1/protocol.proto
Buf CLIの定義ファイルを作成し、エンコーダーを生成します。
buf.yaml:Buf CLIのプロジェクト基本設定buf.gen.yaml:Buf CLIのコード生成設定
エンコーダーはgenディレクトリに出力されます。
cat << EOS > ./proto/buf.yaml version: v1 breaking: use: - FILE lint: use: - DEFAULT EOS cat << EOS > ./buf.gen.yaml version: v1 managed: enabled: true plugins: - plugin: buf.build/protocolbuffers/python:v23.4 out: gen EOS buf generate proto ls -l gen

protobufパッケージインストール
生成したエンコードの実行にはPythonのprotobufパッケージが必要です。
仮想環境を有効化してインストールします。
. ./venv/bin/activate
pip install protobuf
動作確認
Pythonを起動して関連クラスをimportします。
エラーが出なければ、正常に読み込めています。3
python >>> from gen.intdash.v1.protocol_pb2 import (StoreDataChunk, StoreDataChunks, StoreDataID, StoreDataPoint, StoreDataPointGroup)

やってみた
今回は2つのサンプルプログラムを説明します。4
- データ移行ツール
- GPS距離計算
データ移行ツール
計測データをJSONファイルにエクスポートし、別計測として再びインポートします。5
インポート時にProtocol Buffersエンコーダーを利用します。

実行結果
エクスポートとインポートで処理を分けています。
エクスポート
PYTHONPATHが指定されていないときは設定します。
echo $PYTHONPATH export PYTHONPATH=/path/to/your_workspace:
対象計測を指定して実行します。
python lesson2/migrate/src/meas_export.py--api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>

JSONを見てみるとデータの構造がよくわかります。

計測の主な項目は以下の通りです。
uuid: 計測UUIDbasetime: もっとも優先度の高い基準時刻status: 計測の状態expected_data_points: 総データポイント数received_data_points: 回収済みデータポイント数
基準時刻は計測中に複数送信できるため、リストになっています。
id: 基準時刻のIDbasetime: 基準時刻priority: 優先度

データポイントの主な項目は以下の通りです。
time: 絶対時刻(UNIXエポックからの経過時間、計測の基準時刻+経過時間)data_type: データ型data_id: データを識別する名前data.d: データペイロード(BASE64エンコード)
この計測はiPhoneアプリ intdash Motion V2で収集した位置情報です。
同じ経過時間で複数のデータ名が送信されているのがわかります。
インポート
別環境のエッジの計測としてデータを登録します。
python lesson2/migrate/src/meas_import.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID>


サンプルプログラム
エクスポート
こちらはデータ取得のみでREST APIの使い方は前回の内容とほぼ変わりません。
計測(マーカー含む)、基準時刻、データポイントを取得しています。
計測データポイント取得でtime_format=“ns”を指定しています。デフォルトではマイクロ秒のため、正確にデータ移行する際には指定が必要です。
api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client)
stream = api.list_project_data_points(
project_uuid=project_uuid, name=meas_uuid, time_format="ns"
)
取得した日付項目はdatetime型であるため、JSON出力時にナノ秒精度の時刻文字列にしています。
if isinstance(obj, datetime): iso_str = obj.isoformat() if obj.microsecond: nano_str = "{:09d}".format(obj.microsecond * 1000) iso_str = iso_str.replace(f".{obj.microsecond:06d}", f".{nano_str}") return iso_str
インポート
エクスポートされたJSONファイルを読み込んで、新しい計測を作成します。
計測、基準時刻、マーカー情報を各APIのcreate_xxxメソッドで作成しています。
計測作成時に基準時刻が必ず1つできてしまうため、登録後に削除して、改めて作成し直しています。
current_basetimes = api.list_project_measurement_base_times(
project_uuid=project_uuid,
measurement_uuid=measurement_uuid,
)
for bt_current in current_basetimes["items"]:
api.delete_project_measurement_base_time_by_id(
project_uuid=project_uuid,
measurement_uuid=measurement_uuid,
id=bt_current.id,
)
計測シーケンスは置換メソッド replace_project_measurement_sequence が提供されています。
総データポイント数、回収済みデータポイント数を指定して計測シーケンスを登録します。
sequence_group = MeasurementSequenceGroupReplace(
expected_data_points=measurement_src["sequences"]["expected_data_points"],
final_sequence_number=measurement_src["sequences"]["received_data_points"],
)
api = measurement_service_measurement_sequences_api.MeasurementServiceMeasurementSequencesApi(
client
)
sequence = api.replace_project_measurement_sequence(
project_uuid=project_uuid,
measurement_uuid=measurement_uuid,
sequences_uuid=sequence_uuid if sequence_uuid else str(uuid.uuid4()),
measurement_sequence_group_replace=sequence_group,
)
チャンクの送信でProtocol Buffersエンコーダーを使います。
- データID(
type,name)とデータポイント(経過時間elapsed_timeとpayloadをチャンクに格納
store_data_point_group = StoreDataPointGroup(
data_id=StoreDataID(
type=data_point["data_type"], name=data_point["data_name"]
),
data_points=[store_data_point],
)
store_data_chunk = StoreDataChunk(
sequence_number=sequence_number, data_point_groups=[store_data_point_group]
)
chunks.append(store_data_chunk)
sequence_number += 1
Content-typeにprotocolbufを指定してデータポイントを格納したチャンクを送信
results = api.create_project_measurement_sequence_chunks(
project_uuid=project_uuid,
body=io.BytesIO(chunk.SerializeToString()),
_content_type="application/vnd.iscp.v2.protobuf",
)
すべてのデータポイントを送信したら、計測のステータスを完了にします。
api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client)
api.complete_project_measurement(
measurement_uuid=measurement_uuid, project_uuid=project_uuid
)
GPS距離計算
GPSデータごとに緯度経度〜基準点からの距離を計算してREST APIで登録します。
全体構成図

元の計測は変更せず、同じ基準時刻の計測を新たに作ります。
なお、元の計測に新たなデータポイントを追加することもできます。6
実行結果
実行します。
python lesson2/distance/src/distance.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>

intdashに登録した距離を元のGPSデータと一緒に可視化しました。
distanceというValue Currentパーツにバインドしています。

サンプルプログラム
クラスを分割しました。
- MeasurementReader:元計測データを取得
- DistanceCalculator:原点からの距離を算出
- MeasurementWriter:新規計測データを送信
- DetectService:各クラスの統合
REST APIアクセス部はデータ移行ツールとほとんど同じですが、 一定件数ずつフェッチしつつ登録するように制御しています。
api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(
self.client
)
stream = api.list_project_data_points(
project_uuid=self.project_uuid,
name=self.meas_uuid,
data_id_filter=["#:1/gnss_coordinates"],
start=self.start,
limit=fetch_size,
time_format="ns",
)
startに前回データポイントの時刻+1からフェッチを開始しています。
おわりに
今回はREST APIでのデータ送信をご紹介しました。
計測シーケンスやProtocol Bufferは少しとっつきづらく、ソースも複雑に見えますが、パターンがわかると応用できると思います。
次回はリアルタイムAPIを説明していきます。
-
Protocol Buffersを使わずにJSONで送信もできます。送信データがProtocol BuffersでシリアライズされているかはHTTPヘッダの
Content-Typeでサーバーに通知します。↩ - intdash API specificationページの参照にはユーザー登録が必要です。↩
- 動作確認できたら、Buf CLI、ワークディレクトリ、プロトコル定義ファイルは削除しても構いません。↩
- サンプルプログラムをGitHubで公開しています。↩
- データポイントが数百件ほどの小さい計測を想定しています。↩
- 計測から任意の計測データを変更・削除することはできません。当該データのみを変更・削除したい場合は、別に計測を作成して元の計測を削除します。↩