aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

SDK入門②〜データ移行ツールの作り方〜

計測データを自分でintdashに登録したいみなさん、

こんにちは。ソリューションアーキテクトの伊勢です。

エッジから収集したデータを加工して可視化・確認したいニーズが増えています。

今回はREST APIでintdashにデータを登録する方法を説明します。

はじめに

今回説明するのはREST APIへのデータ送信です。

intdash SDK

典型的な利用シーンは以下の通りです。

  • 保留データの遅延アップロード
    • リアルタイム送信時に送りきれなかったデータをあとで送信
  • 低頻度データ送信
    • リアルタイムAPIを使わずにストリーム送信

データ送信のステータス管理

前回のREST APIからのデータ取得は非常にシンプルでした。

データ送信は少し複雑です。

同時並行で送信される計測データの未回収/回収済を管理するためです。

計測シーケンス

  • 計測はデータ送信を受け付ける単位として複数の計測シーケンスを持ちます。
  • 計測シーケンスが回収する予定の総データポイント数を管理します。
  • データポイントはチャンクという単位でまとめて送信されます。
  • チャンクには計測シーケンス内で連番が振られます。
  • チャンクの送信が完了するとステータスが回収済になります。
  • 回収済のデータポイント合計が計測の回収済みデータポイント数になります。

Protocol Buffersの利用

REST APIへのデータ送信には、Googleが開発したシリアライズフォーマットであるProtocol Buffersを利用できます。1

これによって大量データの送信が安定化します。

  • バイナリ化によるトラフィック軽減
  • シリアライズ・デシリアライズの高速化
  • データフォーマット定義の明確化

インストール

REST APIでデータ送信するため、前回のインストール手順に加えてProtocol Buffersエンコーダーを導入します。

Protocol Buffersエンコーダーはintdash用のプロトコル定義ファイルから生成します。

Buf CLIインストール

プロトコル定義ファイルからProtocol Buffersエンコーダーを生成するツール Buf CLIをインストールします。

brew install bufbuild/buf/buf
buf --version

Buf CLIのインストール

Protocol Buffersエンコーダーの生成

Buf CLIでプロトコル定義ファイルからエンコーダーを生成します。

プロトコル定義ファイルのダウンロード

intdash API specificationページからプロトコル定義ファイルページに遷移し、プロトコル定義ファイル protocol.proto をダウンロードします。2

プロトコル定義ファイルページへのリンク

プロトコル定義ファイルページ

エンコーダーの生成

ワークディレクトリを作成してプロトコル定義ファイルを配置します。

プロトコル定義ファイルのパッケージ名をintdashに変更します。

mkdir -p proto/intdash/v1/ 
cp path/to/protocol.proto proto/intdash/v1/  
sed -i -e "s/package pb;/package intdash.v1;/g" proto/intdash/v1/protocol.proto

Buf CLIの定義ファイルを作成し、エンコーダーを生成します。

  • buf.yaml:Buf CLIのプロジェクト基本設定
  • buf.gen.yaml:Buf CLIのコード生成設定

エンコーダーはgenディレクトリに出力されます。

cat << EOS > ./proto/buf.yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
EOS

cat << EOS > ./buf.gen.yaml
version: v1
managed:
  enabled: true
plugins:
  - plugin: buf.build/protocolbuffers/python:v23.4
    out: gen
EOS

buf generate proto
ls -l gen

Protocol Buffersエンコーダーの生成

protobufパッケージインストール

生成したエンコードの実行にはPythonのprotobufパッケージが必要です。

仮想環境を有効化してインストールします。

. ./venv/bin/activate
pip install protobuf

動作確認

Pythonを起動して関連クラスをimportします。

エラーが出なければ、正常に読み込めています。3

python
>>> from gen.intdash.v1.protocol_pb2 import (StoreDataChunk, StoreDataChunks,
                                         StoreDataID, StoreDataPoint,
                                         StoreDataPointGroup)

import確認

やってみた

今回は2つのサンプルプログラムを説明します。4

  • データ移行ツール
  • GPS距離計算

データ移行ツール

計測データをJSONファイルにエクスポートし、別計測として再びインポートします。5

インポート時にProtocol Buffersエンコーダーを利用します。

データ移行ツール

実行結果

エクスポートとインポートで処理を分けています。

エクスポート

PYTHONPATHが指定されていないときは設定します。

echo $PYTHONPATH
export PYTHONPATH=/path/to/your_workspace:

対象計測を指定して実行します。

python lesson2/migrate/src/meas_export.py--api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>

エクスポート実行

JSONを見てみるとデータの構造がよくわかります。

計測、基準時刻

計測の主な項目は以下の通りです。

  • uuid: 計測UUID
  • basetime: もっとも優先度の高い基準時刻
  • status: 計測の状態
  • expected_data_points: 総データポイント数
  • received_data_points: 回収済みデータポイント数

基準時刻は計測中に複数送信できるため、リストになっています。

  • id: 基準時刻のID
  • basetime: 基準時刻
  • priority: 優先度

データポイント

データポイントの主な項目は以下の通りです。

  • time: 経過時間(基準時刻からの相対時間)
  • data_type: データ型
  • data_id: データを識別する名前
  • data.d: データペイロード(BASE64エンコード)

この計測はiPhoneアプリ intdash Motion V2で収集した位置情報です。

同じ経過時間で複数のデータ名が送信されているのがわかります。

インポート

別環境のエッジの計測としてデータを登録します。

python lesson2/migrate/src/meas_import.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --edge_uuid <YOUR_EDGE_UUID>

インポート実行

別環境に登録された計測

サンプルプログラム
エクスポート

こちらはデータ取得のみでREST APIの使い方は前回の内容とほぼ変わりません。

計測(マーカー含む)、基準時刻、データポイントを取得しています。

計測データポイント取得でtime_format=“ns”を指定しています。デフォルトではマイクロ秒のため、正確にデータ移行する際には指定が必要です。

    api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(client)
    stream = api.list_project_data_points(
        project_uuid=project_uuid, name=meas_uuid, time_format="ns"
    )

取得した日付項目はdatetime型であるため、JSON出力時にナノ秒精度の時刻文字列にしています。

        if isinstance(obj, datetime):
            iso_str = obj.isoformat()
            if obj.microsecond:
                nano_str = "{:09d}".format(obj.microsecond * 1000)
                iso_str = iso_str.replace(f".{obj.microsecond:06d}", f".{nano_str}")
            return iso_str
インポート

エクスポートされたJSONファイルを読み込んで、新しい計測を作成します。

計測、基準時刻、マーカー情報を各APIのcreate_xxxメソッドで作成しています。

計測作成時に基準時刻が必ず1つできてしまうため、登録後に削除して、改めて作成し直しています。

    current_basetimes = api.list_project_measurement_base_times(
        project_uuid=project_uuid,
        measurement_uuid=measurement_uuid,
    )
    for bt_current in current_basetimes["items"]:
        api.delete_project_measurement_base_time_by_id(
            project_uuid=project_uuid,
            measurement_uuid=measurement_uuid,
            id=bt_current.id,
        )

計測シーケンスは置換メソッド replace_project_measurement_sequence が提供されています。

総データポイント数、回収済みデータポイント数を指定して計測シーケンスを登録します。

    sequence_group = MeasurementSequenceGroupReplace(
        expected_data_points=measurement_src["sequences"]["expected_data_points"],
        final_sequence_number=measurement_src["sequences"]["received_data_points"],
    )

    api = measurement_service_measurement_sequences_api.MeasurementServiceMeasurementSequencesApi(
        client
    )
    sequence = api.replace_project_measurement_sequence(
        project_uuid=project_uuid,
        measurement_uuid=measurement_uuid,
        sequences_uuid=sequence_uuid if sequence_uuid else str(uuid.uuid4()),
        measurement_sequence_group_replace=sequence_group,
    )

チャンクの送信でProtocol Buffersエンコーダーを使います。

  • データID(type, name)とデータポイント(経過時間elapsed_timepayloadをチャンクに格納
        store_data_point_group = StoreDataPointGroup(
            data_id=StoreDataID(
                type=data_point["data_type"], name=data_point["data_name"]
            ),
            data_points=[store_data_point],
        )

        store_data_chunk = StoreDataChunk(
            sequence_number=sequence_number, data_point_groups=[store_data_point_group]
        )

        chunks.append(store_data_chunk)
        sequence_number += 1
  • Content-typeprotocolbufを指定してデータポイントを格納したチャンクを送信
    results = api.create_project_measurement_sequence_chunks(
        project_uuid=project_uuid,
        body=io.BytesIO(chunk.SerializeToString()),
        _content_type="application/vnd.iscp.v2.protobuf",
    )

すべてのデータポイントを送信したら、計測のステータスを完了にします。

    api = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(client)
    api.complete_project_measurement(
        measurement_uuid=measurement_uuid, project_uuid=project_uuid
    )

GPS距離計算

GPSデータごとに緯度経度〜基準点からの距離を計算してREST APIで登録します。

全体構成図

GPS距離計算

元の計測は変更せず、同じ基準時刻の計測を新たに作ります。

なお、元の計測に新たなデータポイントを追加することもできます。6

実行結果

実行します。

python lesson2/distance/src/distance.py --api_url https://example.intdash.jp --api_token <YOUR_API_TOKEN> --project_uuid <YOUR_PROJECT_UUID> --meas_uuid <YOUR_MEAS_UUID>

距離算出実行

intdashに登録した距離を元のGPSデータと一緒に可視化しました。

distanceというValue Currentパーツにバインドしています。

基準点は品川駅

サンプルプログラム

クラスを分割しました。

  • MeasurementReader:元計測データを取得
  • DistanceCalculator:原点からの距離を算出
  • MeasurementWriter:新規計測データを送信
  • DetectService:各クラスの統合

REST APIアクセス部はデータ移行ツールとほとんど同じですが、 一定件数ずつフェッチしつつ登録するように制御しています。

        api = measurement_service_data_points_api.MeasurementServiceDataPointsApi(
            self.client
        )
        stream = api.list_project_data_points(
            project_uuid=self.project_uuid,
            name=self.meas_uuid,
            data_id_filter=["#:1/gnss_coordinates"],
            start=self.start,
            limit=fetch_size,
            time_format="ns",
        )

startに前回データポイントの時刻+1からフェッチを開始しています。

おわりに

今回はREST APIでのデータ送信をご紹介しました。

計測シーケンスやProtocol Bufferは少しとっつきづらく、ソースも複雑に見えますが、パターンがわかると応用できると思います。

次回はリアルタイムAPIを説明していきます。


  1. Protocol Buffersを使わずにJSONで送信もできます。送信データがProtocol BuffersでシリアライズされているかはHTTPヘッダのContent-Typeでサーバーに通知します。
  2. intdash API specificationページの参照にはユーザー登録が必要です。
  3. 動作確認できたら、Buf CLI、ワークディレクトリ、プロトコル定義ファイルは削除しても構いません。
  4. サンプルプログラムをGitHubで公開しています。
  5. データポイントが数百件ほどの小さい計測を想定しています。
  6. 計測から任意の計測データを変更・削除することはできません。当該データのみを変更・削除したい場合は、別に計測を作成して元の計測を削除します。