aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

最小エッジでストリームしてみた

手軽にデータストリーミングを開発したいみなさん、

こんにちは。ソリューションアーキテクトの伊勢です。

これまでintdashにデータをストリームするには、クライアントライブラリを使用してコーディングが必要でした。

2023年12月バージョンからREST APIでアップストリームできるようになったのでご紹介します。

はじめに

ストリームとは

ストリームは、データをリアルタイム(または、ほぼリアルタイム)で継続的に送受信するプロセスです。

アップストリームは、デバイス側からプラットフォーム側にデータを送信する方向のストリームです。

www.aptpod.co.jp

intdashでは、独自プロトコルiSCP(intdash Stream Control Protocol)で高頻度・低遅延のストリームを実現しています。

tech.aptpod.co.jp

また、intdashを使ったアプリケーション開発をしやすくするため、intdashでは各種クライアントライブラリをご用意しています。

もっと手軽にデータ送信

今回追加されたのは、intdashにHTTPでデータ送信するためのAPIです。

クライアントライブラリとREST API

クライアントライブラリとの違い

クライアントライブラリとはトレードオフがあります。

特性 ライブラリ REST API
学習コスト ⭕️
実行環境制約 ⭕️
アプリケーションの柔軟性 ⭕️

使いどころ

  • 低頻度データ(〜数秒に1回)
  • ライブラリ追加が難しい動作環境
  • 開発時の簡単な検証

向かない使い方

  • 高頻度データ(1秒に数回〜)
  • 処理性能要求が高めのアプリケーションへの組み込み

追加API

具体的に追加されたAPIは以下の4つです。

  • Create Upstream:アップストリーム作成
  • Close Upstream:アップストリームを閉じる
  • Send Upstream Chunks:アップストリームチャンクを送信
  • Send Upstream Metadata:メタデータを送信

叩いてみた

実際に使ってみました。

Macbookからcurlコマンドを叩きながら、サーバーの計測の状態変化をMeas Hubで見ていきます。1

計測の作成

まず、データを格納するための計測を作成します。

HTTPボディで基準時刻とエッジを指定しています。

計測作成

サーバーに、データが入っていない空の計測が作成されます。

ステータスは計測準備中となっています。

空の計測

アップストリームを作成

エッジとサーバー間の送受信プロセスであるアップストリームを作成します。

HTTPボディでエッジsource_node_idと計測session_idを指定しています。

ストリーム作成

計測のステータスが計測中に変わります。

計測開始

アップストリームチャンクを送信

いよいよデータを送信します。

URLでストリームを指定し、HTTPボディでデータの定義と値を指定します。

データ送信

Meas Hubに送信データのデータ型とデータ名が表示されます。

回収済みデータポイント数が増加

データIDリストに送信データ

アップストリームを閉じる

最後にストリームを閉じます。

URLでストリームを指定し、HTTPボディで計測完了close_sessionを指定します。

ストリームを閉じる

計測のステータスは終了に変わり、計測の終了時刻が表示されます。

計測終了

最小エッジでストリームしてみた

OSがない環境でもHTTPリクエストは送信できます。

M5Stackシリーズのマイコン ATOM Lite にMicroPythonで実装してみました。2

サイズは24mm x 24mm x 10mm。ストリームするintdashエッジとしては過去最小ではないかと思います。

照度センサーの値を1Hzでアップストリームします。

www.youtube.com

マイコンからHTTPでストリーム

おわりに

手軽にストリームを実現できる方法としてREST APIの新機能をご紹介しました。

シェルでシステム情報を送信したり、intdashを使った開発のパターンが広がります。

データ収集プラットフォームの開発はぜひaptpodにご相談ください。

Appendix. サンプルcurlコマンド

# エッジのOAuthクライアントクレデンシャル払い出し
curl -X POST https://example.intdash.jp/api/auth/oauth2/token \
-d "grant_type=client_credentials&client_id={YOUR_ID}&client_secret={YOUR_SECRET}"

# アクセストークン
export ACCESS_TOKEN={YOUR_ACCESS_TOKEN}

# 計測作成
curl -X POST https://example.intdash.jp/api/v1/projects/{YOUR_PROJECT_ID}/measurements \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
  "basetime": "2024-01-01T00:00:00.000000Z",
  "basetime_type": "ntp",
  "edge_uuid": "{YOUR_EDGE_ID}",
  "protected": false
}'


# ストリーム作成
curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
  "session_id": "8efe703c-b617-4a6d-bea9-ed9ea11c2c66",
  "source_node_id": "{YOUR_EDGE_ID}",
  "persist": true
}'


# チャンク送信
curl -i -X POST https://example.intdash.jp/api/iscp/projects/{YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/chunks \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
  "data_point_groups": [
    {
      "data_id": {
        "type": "float64",
        "name": "1/test"
      },
      "data_points": [
        {
          "elapsed_time": 1,
          "float64": 0.123
        }
      ]
    }
  ]
}'

# ストリームを閉じる
curl -i -X PUT https://example.intdash.jp/api/iscp/projects/${YOUR_PROJECT_ID}/upstreams/{YOUR_STREAM_ID}/close \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
  "close_session": true
}'

Appendix. サンプルMicroPythonプログラム

import os
import sys
import io
import M5
from M5 import BtnA
from hardware import Pin, RGB, I2C
from unit import DLightUnit
import time
import requests2
import network
import ntptime
import json
import gc

# 定数定義
PROJECT_ID = 'YOUR_PROJECT_ID'
EDGE_ID = 'YOUR_EDGE_ID'
EDGE_SECRET = 'EDGE_SECRET'
WIFI_SSID = 'YOUR_SSID'
WIFI_PASSWORD = 'YOUR_PASS'
NTP_SERVER = 'YOUR_NTP_SERVER'
TIMEZONE = 'GMT-9'
API_BASE_URL = "https://example.intdash.jp/api"

# グローバル変数
access_token = None
stream_id = None
start_time_us = None
is_measuring = True
wlan = network.WLAN(network.STA_IF)
rgb = RGB()
dlight_0 = DLightUnit(I2C(0, scl=Pin(32), sda=Pin(26), freq=100000))


def send_request(method, url, headers=None, json=None, data=None):
    """
    HTTPリクエストを実行する共通関数

    :param method: HTTPメソッド (GET/POST/PUT/DELETE)
    :param url: リクエスト先のURL
    :param headers: リクエストのヘッダー情報 (オプション)
    :param json: JSON形式で送信するデータ (オプション)
    :param data: フォームエンコード形式で送信するデータ (オプション)
    :return: 200番台はHTTPレスポンス、それ以外はNone
    """
    default_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    headers = headers or default_headers
    request_func = {
        'POST': requests2.post,
        'PUT': requests2.put,
        'GET': requests2.get,
        'DELETE': requests2.delete
    }.get(method)

    if request_func is None:
        raise ValueError(f"Unsupported HTTP method: {method}")

    http_req = request_func(url, headers=headers, json=json, data=data)

    if 200 <= http_req.status_code < 300:
        response = http_req.json()
    else:
        print(f"Failed request. Status Code: {http_req.status_code}, Response: {http_req.text}")
        response = None

    http_req.close()
    gc.collect()
    return response


def get_current_time():
    """
    現在時刻取得(ISO 8601形式)

    :return: ISO 8601形式の現在時刻文字列
    """
    current_time = time.gmtime()
    return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.000000Z".format(
        *current_time[:6]
    )


def get_elapsed_time():
    """
    経過時間取得

    :return: 経過時間(ナノ秒)
    """
    return time.ticks_diff(time.ticks_us(), start_time_us) * 1000


def connect_wifi():
    """
    WiFi接続
    """
    if wlan.isconnected():
        print("WiFi is already connected.")
        return

    wlan.active(False)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASSWORD)
    while not wlan.isconnected():
        print("Connecting to WiFi...")
        time.sleep(1)

    print("WiFi connected. IP Address:", wlan.ifconfig())


def sync_ntp():
    """
    NTPサーバー同期
    """
    ntptime.host = NTP_SERVER
    max_retries = 3
    for attempt in range(max_retries):
        try:
            ntptime.settime()
            print("NTP time synchronized successfully.")
            print("Current Local Time (JST):", time.localtime())
            break
        except OSError as e:
            print(f"NTP synchronization failed on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(5)


def authenticate():
    """
    認証
    """
    global access_token
    url = f"{API_BASE_URL}/auth/oauth2/token"
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = f"grant_type=client_credentials&client_id={EDGE_ID}&client_secret={EDGE_SECRET}"

    response = send_request('POST', url, headers=headers, data=data)
    access_token = response.get("access_token")
    if access_token:
        print("Access token obtained.")
    else:
        print("Failed to obtain access token.")


def setup_measurement():
    """
    計測・ストリーム開始
    """
    global stream_id, start_time_us
    start_time_us = time.ticks_us()
    current_time = get_current_time()

    measurement_url = f"{API_BASE_URL}/v1/projects/{PROJECT_ID}/measurements"
    body = {
        "basetime": current_time,
        "basetime_type": "ntp",
        "edge_uuid": EDGE_ID,
        "protected": False
    }
    measurement_response = send_request('POST', measurement_url, json=body)
    measurement_id = measurement_response.get("uuid")

    stream_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams"
    body = {
        "session_id": measurement_id,
        "source_node_id": EDGE_ID,
        "persist": True
    }
    stream_response = send_request('POST', stream_url, json=body)
    stream_id = stream_response.get("stream_id")


def btnA_wasPressed_event(state):
    """
    ボタンイベント

    計測中はストリームクローズ
    計測停止中はストリーム開始

    :param state: ボタンの状態
    """
    global is_measuring
    if is_measuring:
        is_measuring = False
        rgb.fill_color(0x000000)
        close_url = f'{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/close'
        send_request('PUT', close_url, json={"close_session": True})
    else:
        setup_measurement()
        is_measuring = True


def setup():
    """
    初期設定
    """
    M5.begin()
    BtnA.setCallback(type=BtnA.CB_TYPE.WAS_PRESSED, cb=btnA_wasPressed_event)
    rgb.fill_color(0xFF0077)
    dlight_0.configure(dlight_0.CONTINUOUSLY, dlight_0.L_RESOLUTION_MODE)

    connect_wifi()
    sync_ntp()
    authenticate()
    setup_measurement()


def loop():
    """
    メインループ処理
    """
    M5.update()

    if BtnA.isPressed() or not is_measuring:
        time.sleep(1)
        return

    rgb.fill_color(0x0000FF)
    lux = dlight_0.get_lux()
    print("Current Lux:", lux)

    body = {
        "data_point_groups": [
            {
                "data_id": {"type": "float64", "name": "1/lux"},
                "data_points": [{"elapsed_time": get_elapsed_time(), "float64": lux}]
            }
        ]
    }
    chunks_url = f"{API_BASE_URL}/iscp/projects/{PROJECT_ID}/upstreams/{stream_id}/chunks"
    send_request('POST', chunks_url, json=body)
    rgb.fill_color(0x000077)
    time.sleep(1)


if __name__ == '__main__':
    try:
        setup()
        while True:
            loop()
    except (Exception, KeyboardInterrupt) as e:
        print("An error occurred:", e)

  1. サンプルコマンドを本記事の最後に掲載しています。
  2. サンプルプログラムを本記事の最後に掲載しています。