aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

intdashからAmazon S3にデータ伝送してみた

収集データを手軽に分析に利用したいみなさん、

こんにちは。ソリューションアーキテクトの伊勢です。

intdashは収集データをリアルタイムに可視化できるのが強みですが、

定時/逐次処理によるデータ分析やレポーティングのご要望もいただいています。

そこで今回は、2023年12月から新たに追加された永続化拡張機能をご紹介します。1

はじめに

永続化拡張とは

収集データをintdash内の時系列データベースに永続化すると同時に、分析基盤などの他プラットフォームにも連携します。2

他プラットフォームへデータを流し込む

連携先には以下を選択できます。

これにより、データ連携をカスタム開発せず、他プラットフォームでの収集データ利用が可能になります。

今回やること

スマートフォンアプリintdash Motion V2で計測・収集したGPSデータを、intdashからAmazon S3にCSVファイルとしてほぼリアルタイムに書き出します。

計測完了後、CSVファイルをまとめてPCで取得します。

全体構成

手順

S3バケットの準備

連携先となるS3のバケットを作成します。

S3バケット作成

AWSアクセスキーの準備

intdashからのアクセスに使用する、IAMでバケット所有者のAWSアクセスキーIDとシークレットアクセスキーを払い出します。

アクセスキー払い出し

永続化拡張設定

intdashに永続化拡張を設定します。

誤って大量データ書き出しが始まらないよう、設定を作成したあとに有効化します。3

設定の作成

設定項目は以下の通りです。

  • 設定の名前と説明
  • 永続化タイプ:S3
  • S3設定
    • リージョン、バケット名
    • パーティション単位(年、月、日)
    • AWSアクセスキーID、シークレットアクセスキー
  • ファイルフォーマット:CSV、JSON、RAW(Bytes)、Parquet
  • 対象/除外データリスト
  • 対象/除外ノード(エッジのUUID)リスト
  • 有効/無効

作成した設定では、 対象データをGPSの2項目に制限しています。

登録された設定を確認して有効化します。

設定の有効化

データ連携確認

intdash Motionで計測してみます。

GPSデータの各項目がEdge Finderに表示されます。

intdashサーバーにてGPSデータを受信

計測を開始すると、すぐにS3バケットにCSVファイルが現れ始めます。

data-YYYY-MM-DDThh:mm:ss.sssssssssZ.csvとナノ秒単位のファイル名が付与されます。

S3バケットにCSVファイルが出現

ファイルの中身はこのようになっています。

GPSデータのうち、先ほど設定した2つの対象データのみが出力されています。

  • 緯度経度1/gnss_coordinates:x, y列
  • 高度1/gnss_altitude:float列

一部項目は割愛

  • time:絶対時刻
  • source_node_id:送信元エッジID
  • session_id:計測ID
  • data_type:データタイプ
  • data_name:データ名称
  • string, float, int, bool, x, y, z, w:値、データタイプに応じた各列に格納
  • bytes:データ生値、Base64エンコード

1ファイル中には複数のエッジ・計測・データの行が混在します。

計測

移動しながら1時間ほど計測しました。

分析・レポート作成

Jupyter NotebookでCSVファイルを取得して地図に表示します。4

緯度経度をつないだ線を、高度のレンジごとで色分けしています。

新宿駅は武蔵野台地、品川駅は埋立地

拡大すると1Hzで収集した緯度経度を綺麗に描けているのがわかります。

五反田〜目黒は激登り

遅延アップロード

永続化拡張は、リアルタイムで送りきれなかったデータを回収する遅延アップロードにも働きます。

電波が悪い状況で試しました。

京都出張者の朝は早い

Motionに送りきれなかったデータが残っています。

手動でアップロードします。

xx.x % Uplodedの計測を選択してUpload

S3に大きめのCSVファイルが出現

なお、エッジデバイス用のエージェントソフトウェア intdash Edge Agent なら遅延アップロードを自動で行えます。

www.youtube.com

さて、再度プロットしてみました。

回線速度が落ちる静岡〜浜松間

遅延アップロードで補完されたデータにより、高度のグラデーションがより精緻に表現されています。

おわりに

永続化拡張機能によって、分析のためのデータ連携の開発が不要になり、システムインテグレーションの期間・コスト低減が期待できます。

他にも、画像分析などのAI活用、他プラットフォームデータとの複合データ分析など、様々な応用が考えられます。

モビリティデータを分析基盤で活用したい方は、ぜひintdashをご検討ください。

Appendix. 分析プログラム

import boto3
import pandas as pd
import folium
from io import StringIO
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import time

# AWSアクセスキーを設定
session = boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY_ID',
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
    region_name='ap-northeast-1'
)

s3 = session.client('s3')
bucket_name = 'YOUR_BUCKET'
prefix = 'YYYY/MM/DD/'  # S3オブジェクトキーパス

# ページネーションでファイルリストを取得
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
file_list = []
for page in page_iterator:
    for content in page.get('Contents', []):
        file_list.append(content['Key'])

# バッチサイズ設定
batch_size = 100  # バッチサイズ(ファイル数)
all_coordinates_data = []  # 全ての座標データを格納するリスト
all_altitude_data = []  # 全ての高度データを格納するリスト

# カラーマップ設定
cmap = plt.get_cmap('jet')  # 'jet'カラーマップを使用
norm = mcolors.Normalize(vmin=0, vmax=30) 

# 地図中心設定
folium_map = folium.Map(location=[35.679889, 139.73875], zoom_start=12)

# ファイルをバッチサイズごとに処理
for batch_start in range(0, len(file_list), batch_size):
    batch_files = file_list[batch_start:batch_start + batch_size]
    
    # バッチ処理
    for file_key in sorted(batch_files):
        try:
            csv_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
            body = csv_obj['Body'].read().decode('utf-8')
            df = pd.read_csv(StringIO(body), usecols=['time', 'data_name', 'x', 'y', 'float'])

            # 緯度経度データ抽出
            coords_df = df[df['data_name'] == '1/gnss_coordinates'][['time', 'x', 'y']]
            # 高度データ抽出
            alt_df = df[df['data_name'] == '1/gnss_altitude'][['time', 'float']]

            all_coordinates_data.extend(coords_df.values.tolist())
            all_altitude_data.extend(alt_df.values.tolist())

        except Exception as e:
            print(f"Error reading {file_key}: {e}")
    
    # S3 API制限回避
    time.sleep(2)  # スリープ

# DataFrameに変換してソート
coordinates_df = pd.DataFrame(all_coordinates_data, columns=['time', 'x', 'y'])
altitude_df = pd.DataFrame(all_altitude_data, columns=['time', 'altitude'])

# timeでソート
coordinates_df.sort_values('time', inplace=True)
altitude_df.sort_values('time', inplace=True)

# 緯度(x)と経度(y)を地図にプロット
for i in range(len(coordinates_df) - 1):
    # 現在のポイントと次のポイントのデータ
    row_current = coordinates_df.iloc[i]
    row_next = coordinates_df.iloc[i + 1]
    
    # 高度の行をtimeで検索
    altitude_row_current = altitude_df[altitude_df['time'] == row_current['time']]
    altitude_current = altitude_row_current['altitude'].values[0] if not altitude_row_current.empty else None

    # 色の設定
    if altitude_current is not None:
        color = mcolors.to_hex(cmap(norm(altitude_current)))
    else:
        color = 'gray'

    # ポリラインを描画
    folium.PolyLine(
        locations=[(row_current['x'], row_current['y']), (row_next['x'], row_next['y'])],
        weight=5,
        opacity=0.8,
        color=color
    ).add_to(folium_map)

# 地図表示
folium_map.save('map.html')
folium_map

  1. 本機能はβ版であり、次バージョンよりインターフェース仕様が変更になる可能性があります。
  2. 時系列データベースへの永続化をフックし、およそ1秒〜数秒で送信します。送信性能はサーバースペックやネットワークに依存します。
  3. 現バージョンでは設定が反映されるまで時間がかかる場合があります。
  4. Pythonプログラムを本記事の最後に掲載しています。