手軽にデータストリーミングを開発したいみなさん、
こんにちは。ソリューションアーキテクトの伊勢です。
これまでintdashにデータをストリームするには、クライアントライブラリを使用してコーディングが必要でした。
2023年12月バージョンからREST APIでアップストリームできるようになったのでご紹介します。
はじめに
ストリームとは
ストリームは、データをリアルタイム(または、ほぼリアルタイム)で継続的に送受信するプロセスです。
アップストリームは、デバイス側からプラットフォーム側にデータを送信する方向のストリームです。
intdashでは、独自プロトコルiSCP(intdash Stream Control Protocol)で高頻度・低遅延のストリームを実現しています。
また、intdashを使ったアプリケーション開発をしやすくするため、intdashでは各種クライアントライブラリをご用意しています。
もっと手軽にデータ送信
今回追加されたのは、intdashにHTTPでデータ送信するためのAPIです。
クライアントライブラリとの違い
クライアントライブラリとはトレードオフがあります。
特性 | ライブラリ | REST API |
---|---|---|
学習コスト | ❌ | ⭕️ |
実行環境制約 | ❌ | ⭕️ |
アプリケーションの柔軟性 | ⭕️ | ❌ |
使いどころ
- 低頻度データ(〜数秒に1回)
- ライブラリ追加が難しい動作環境
- 開発時の簡単な検証
向かない使い方
- 高頻度データ(1秒に数回〜)
- 処理性能要求が高めのアプリケーションへの組み込み
追加API
具体的に追加されたAPIは以下の4つです。
- Create Upstream:アップストリーム作成
- Close Upstream:アップストリームを閉じる
- Send Upstream Chunks:アップストリームチャンクを送信
- Send Upstream Metadata:メタデータを送信
叩いてみた
実際に使ってみました。
Macbookからcurlコマンドを叩きながら、サーバーの計測の状態変化をMeas Hubで見ていきます。1
計測の作成
まず、データを格納するための計測を作成します。
HTTPボディで基準時刻とエッジを指定しています。
サーバーに、データが入っていない空の計測が作成されます。
ステータスは計測準備中
となっています。
アップストリームを作成
エッジとサーバー間の送受信プロセスであるアップストリームを作成します。
HTTPボディでエッジsource_node_id
と計測session_id
を指定しています。
計測のステータスが計測中
に変わります。
アップストリームチャンクを送信
いよいよデータを送信します。
URLでストリームを指定し、HTTPボディでデータの定義と値を指定します。
Meas Hubに送信データのデータ型とデータ名が表示されます。
アップストリームを閉じる
最後にストリームを閉じます。
URLでストリームを指定し、HTTPボディで計測完了close_session
を指定します。
計測のステータスは終了に変わり、計測の終了時刻が表示されます。
最小エッジでストリームしてみた
OSがない環境でもHTTPリクエストは送信できます。
M5Stackシリーズのマイコン ATOM Lite にMicroPythonで実装してみました。2
サイズは24mm x 24mm x 10mm。ストリームするintdashエッジとしては過去最小ではないかと思います。
照度センサーの値を1Hzでアップストリームします。
おわりに
手軽にストリームを実現できる方法としてREST APIの新機能をご紹介しました。
シェルでシステム情報を送信したり、intdashを使った開発のパターンが広がります。
データ収集プラットフォームの開発はぜひaptpodにご相談ください。
Appendix. サンプルcurlコマンド
# エッジのOAuthクライアントクレデンシャル払い出し curl -X POST https://example.intdash.jp/api/auth/oauth2/token \ -d "grant_type=client_credentials&client_id={YOUR_ID}&client_secret={YOUR_SECRET}" # アクセストークン export ACCESS_TOKEN={YOUR_ACCESS_TOKEN} # 計測作成 curl -X POST https://example.intdash.jp/api/v1/projects/{YOUR_PROJECT_ID}/measurements \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "basetime": "2024-01-01T00:00:00.000000Z", "basetime_type": "ntp", "edge_uuid": "{YOUR_EDGE_ID}", "protected": false }' # ストリーム作成 curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "session_id": "8efe703c-b617-4a6d-bea9-ed9ea11c2c66", "source_node_id": "{YOUR_EDGE_ID}", "persist": true }' # チャンク送信 curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/chunks \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "data_point_groups": [ { "data_id": { "type": "float64", "name": "1/test" }, "data_points": [ { "elapsed_time": 1, "float64": 0.123 } ] } ] }' # ストリームを閉じる curl -i -X PUT https://example.intdash.jp/api/iscp/projects/${YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/close \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ -H "Content-Type: application/json" \ -d '{ "close_session": true }'
Appendix. サンプルMicroPythonプログラム
import os import sys import io import M5 from M5 import BtnA from hardware import Pin, RGB, I2C from unit import DLightUnit import time import requests2 import network import ntptime import json import gc # 定数定義 PROJECT_ID = 'YOUR_PROJECT_ID' EDGE_ID = 'YOUR_EDGE_ID' EDGE_SECRET = 'EDGE_SECRET' WIFI_SSID = 'YOUR_SSID' WIFI_PASSWORD = 'YOUR_PASS' NTP_SERVER = 'YOUR_NTP_SERVER' TIMEZONE = 'GMT-9' API_BASE_URL = "https://example.intdash.jp/api" # グローバル変数 access_token = None stream_id = None start_time_us = None is_measuring = True wlan = network.WLAN(network.STA_IF) rgb = RGB() dlight_0 = DLightUnit(I2C(0, scl=Pin(32), sda=Pin(26), freq=100000)) def send_request(method, url, headers=None, json=None, data=None): """ HTTPリクエストを実行する共通関数 :param method: HTTPメソッド (GET/POST/PUT/DELETE) :param url: リクエスト先のURL :param headers: リクエストのヘッダー情報 (オプション) :param json: JSON形式で送信するデータ (オプション) :param data: フォームエンコード形式で送信するデータ (オプション) :return: 200番台はHTTPレスポンス、それ以外はNone """ default_headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}' } headers = headers or default_headers request_func = { 'POST': requests2.post, 'PUT': requests2.put, 'GET': requests2.get, 'DELETE': requests2.delete }.get(method) if request_func is None: raise ValueError(f"Unsupported HTTP method: {method}") http_req = request_func(url, headers=headers, json=json, data=data) if 200 <= http_req.status_code < 300: response = http_req.json() else: print(f"Failed request. Status Code: {http_req.status_code}, Response: {http_req.text}") response = None http_req.close() gc.collect() return response def get_current_time(): """ 現在時刻取得(ISO 8601形式) :return: ISO 8601形式の現在時刻文字列 """ current_time = time.gmtime() return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.000000Z".format( *current_time[:6] ) def get_elapsed_time(): """ 経過時間取得 :return: 経過時間(ナノ秒) """ return time.ticks_diff(time.ticks_us(), start_time_us) * 1000 def connect_wifi(): """ WiFi接続 """ if wlan.isconnected(): print("WiFi is already connected.") return wlan.active(False) wlan.active(True) wlan.connect(WIFI_SSID, WIFI_PASSWORD) while not wlan.isconnected(): print("Connecting to WiFi...") time.sleep(1) print("WiFi connected. IP Address:", wlan.ifconfig()) def sync_ntp(): """ NTPサーバー同期 """ ntptime.host = NTP_SERVER max_retries = 3 for attempt in range(max_retries): try: ntptime.settime() print("NTP time synchronized successfully.") print("Current Local Time (JST):", time.localtime()) break except OSError as e: print(f"NTP synchronization failed on attempt {attempt + 1}: {e}") if attempt < max_retries - 1: time.sleep(5) def authenticate(): """ 認証 """ global access_token url = f"{API_BASE_URL}/auth/oauth2/token" headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = f"grant_type=client_credentials&client_id={EDGE_ID}&client_secret={EDGE_SECRET}" response = send_request('POST', url, headers=headers, data=data) access_token = response.get("access_token") if access_token: print("Access token obtained.") else: print("Failed to obtain access token.") def setup_measurement(): """ 計測・ストリーム開始 """ global stream_id, start_time_us start_time_us = time.ticks_us() current_time = get_current_time() measurement_url = f"{API_BASE_URL}/v1/projects/{PROJECT_ID}/measurements" body = { "basetime": current_time, "basetime_type": "ntp", "edge_uuid": EDGE_ID, "protected": False } measurement_response = send_request('POST', measurement_url, json=body) measurement_id = measurement_response.get("uuid") stream_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams" body = { "session_id": measurement_id, "source_node_id": EDGE_ID, "persist": True } stream_response = send_request('POST', stream_url, json=body) stream_id = stream_response.get("stream_id") def btnA_wasPressed_event(state): """ ボタンイベント 計測中はストリームクローズ 計測停止中はストリーム開始 :param state: ボタンの状態 """ global is_measuring if is_measuring: is_measuring = False rgb.fill_color(0x000000) close_url = f'{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/close' send_request('PUT', close_url, json={"close_session": True}) else: setup_measurement() is_measuring = True def setup(): """ 初期設定 """ M5.begin() BtnA.setCallback(type=BtnA.CB_TYPE.WAS_PRESSED, cb=btnA_wasPressed_event) rgb.fill_color(0xFF0077) dlight_0.configure(dlight_0.CONTINUOUSLY, dlight_0.L_RESOLUTION_MODE) connect_wifi() sync_ntp() authenticate() setup_measurement() def loop(): """ メインループ処理 """ M5.update() if BtnA.isPressed() or not is_measuring: time.sleep(1) return rgb.fill_color(0x0000FF) lux = dlight_0.get_lux() print("Current Lux:", lux) body = { "data_point_groups": [ { "data_id": {"type": "float64", "name": "1/lux"}, "data_points": [{"elapsed_time": get_elapsed_time(), "float64": lux}] } ] } chunks_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/chunks" send_request('POST', chunks_url, json=body) rgb.fill_color(0x000077) time.sleep(1) if __name__ == '__main__': try: setup() while True: loop() except (Exception, KeyboardInterrupt) as e: print("An error occurred:", e)