aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

計測したら即分析!WebhookでGPS補正を起動してみた

intdashとタイムリーにシステム連携したいみなさん、

こんにちは。ソリューションアーキテクトの伊勢です。

2023年12月バージョンから待望のWebhook機能が利用できるようになったのでご紹介します。

はじめに

Webhookとは

システム内イベントをシステム外にHTTPで通知する仕組みです。


これでintdash内の変化をintdash外からリアルタイムに知ることができます。

イベントドリブンなシステム連携により、効率的なワークフローを実現できます。

intdash Webhook

以下のintdashリソースのイベントを通知することができます。

  • 計測:作成・変更・削除・完了・終了1
  • 接続:接続・切断・アップストリーム開始・ダウンストリーム開始
  • エッジ:作成・変更・削除
  • ユーザー:作成・変更・削除
  • テナント:作成・変更・削除
  • プロジェクトの所属エッジ:追加・変更・除外
  • プロジェクトの所属メンバー:追加・変更・除外

やってみた

Webhookリクエストの確認

早速やってみます。

まずは、WebhookのHTTPリクエストの内容を見てみましょう。

試しにWebhook.siteに通知

アクセスするだけでWebhook用のURLを払い出せるWebhook.siteを利用します。

webhook.site

Webhook.siteで払い出されたURLと、通知するイベントをintdash APIに登録します。

計測リソースのイベントmeasurements_eventを通知するよう登録します。

通知先URLと通知イベント種類を登録

スマートフォンアプリintdash Motionで計測すると、Webhook.siteに通知されます。

内容を見るとmeasurementリソースのcreatedアクションであること、計測のUUIDと発生時刻がわかります。

右上がヘッダー、下がボディ

リクエストボディはこのようなかたちです。

通知先システムはここから必要な情報取り出して処理を行います。

  • 共通
    • delivery_uuid: Webhook通知のID
    • hook_uuid: Webhook設定のID
    • resource_type": リソースの種類
    • action: リソースに対するイベントの種類
    • occurred_at: 発生時刻
  • リソース別:計測の場合
    • project_uuid: プロジェクトのID
    • measurement_uuid: 計測のID

ヘッダーには以下の項目があります。

  • X-Intdash-Signature-256: HMAC
    • リクエスト改竄防止の仕組み
    • Webhook設定にシークレット(Key)を登録すると、シークレットとリクエストボディで生成したメッセージ認証コード(MAC)を付与
    • 通知先システムで同じシークレットを保持し、受け取ったリクエストボディとあわせてMAC値を生成して比較検証

https://upload.wikimedia.org/wikipedia/commons/thumb/0/08/MAC.svg/1322px-MAC.svg.png

メッセージ認証符号 - Wikipedia

計測作成イベント:Slack

では、簡単なシステム連携を実装してみます。

AWS LambdaでSlackに計測開始を通知します。

Slack通知の構成


Slackチャンネルの通知用URLを払い出します。2

Slackの カスタムインテグレーション


Slackに通知するLambda関数を作成します。3

Slack通知Lambda

処理はこんな感じです。

  • HMAC検証
  • 計測作成以外のイベントは無視
  • Slack通知
  • intdash Webhookにレスポンスを返却


API Gatewayを作成して、Slack通知Lambdaを統合します。

API Gatewayに統合

作成したAPI GatewayのURLを確認


先ほど登録したintdash Webhookの設定をAPI GatewayのURLに変更します。

通知先URLを変更


計測を開始するとSlackに通知されます。

計測開始をSlack通知

通知のData Visualizerのリンクをクリックすると自動でLIVE再生が始まるようにしてみました。

Data VisualizerのURLに、準備しておいたスクリーンMotionで自動LIVE再生するようscreenName=Motion&playMode=liveというクエリストリングを付与しています。

都会の空は曇天模様

計測完了イベント:地図マッチング

少し時間のかかる分析処理を回してみます。

GPSデータを地図マッチングで補正します。

地図マッチングとは

GPSデータの緯度経度を道路に沿うように補正します。

今回使うFast Map Matching (FMM) はその新し目のアルゴリズムです。

zenn.dev

全体構成

利用するライブラリがたくさんあるため、コンテナイメージでGPS補正Lambdaを構築します。

GPS補正Lambdaはコンテナをロードしたり、道路ネットワークのノードを事前計算します。

intdash Webhookは通知後に10秒以内にレスポンスを返す必要があるため4、先にレスポンスを返却する別のLambdaを設けます。

地図マッチングの構成

さらに時間かかかる場合はAWS Step FunctionsやAWS Batchを検討することになるでしょう。

GPS補正Lambda

コンテナで実行するPythonプログラムを作成します。

処理はこのようになります。5

  • intdash SDKでエッジ名、計測時刻、GPSデータを取得
  • GPSデータ範囲のOpenStreetMapから道路データを取得
  • 道路データをもとに道路ネットワークモデルを構築
  • 道路ネットワークモデルにGPSデータをマッチング
  • マッチング前後のGPSデータを地図HTMLにプロット
  • 地図HTMLをS3にアップロード
  • マッチング前後の総走行距離を計算
  • 総走行距離と地図HTMLのURLをSlack通知


コンテナ起動時に上記プログラムを実行するカスタムランタイムを作成します。6

コンテナイメージをビルドし、Amazon ECRにプッシュします。

Dockerfileはこのようになります。7

  • 必要なライブラリをインストール
  • FMMソースをビルド
  • intdash SDKをOpenAPIで生成
  • Pythonプログラム、カスタムランタイムをローカルからコピー


GPS補正Lambdaを作成します。

コンテナイメージから作成

タイムアウトは10分、FMMが使うのでメモリを1024MBにしました。

レスポンス返却Lambda

もう1つのLambda関数を作成します。8

処理はこれだけです。

  • HMAC検証
  • 計測完了以外は無視
  • GPS補正Lambdaを非同期起動
  • intdash Webhookのレスポンスを返却

レスポンス返却Lambda

API Gateway

さきほどのAPI Gatewayの統合先をレスポンス返却Lambdaに変更します。

起動Lambdaを変更

起動

intdash Motionで、計測時にあげきれていなかったデータを遅延アップロードします。

GPSがずれやすいトンネルや急カーブのあるルートを選択します。

Motionから遅延アップロード

サーバー上で計測データがすべて回収済みになり、計測が完了します。

計測が完了

Webhook設定により、計測完了イベントが通知されます。

2分ほど待つとSlackにGPS補正結果が通知されます。

補正結果の通知

OpenStreetMapのリンクをクリックすると、S3に配置した地図HTMLが開くようにしてみました。

来んかな福江島

ブルーの線が補正前のGPSデータ、ピンクの線が補正後のGPSデータです。

ズームすると、信号が途絶するトンネルや位置がぶれやすい海岸沿いの急カーブのデータが補正されて、道路に沿っているのがわかります。

ざぁま海と山

おわりに

Webhookによりリアルタイムなシステム連携を実現でき、運用ワークフローが大幅に改善します。

他にも、稼働スケジュールと突き合わせてエッジの予定外起動を検知したり、

処理を逐次起動できるので定期バッチに比べて負荷の平準化も期待できます。

さらに以前ご紹介した永続化拡張と組み合わせて、intdash APIにアクセスする処理を実装せずにデータ分析を組むことも可能です。

tech.aptpod.co.jp

intdashを使ったシステム構築はぜひaptpodにご相談ください。

Appendix. サンプルソース

計測作成イベント:Slack

Slack通知Lambda Pythonプログラム
import logging
import json
import hashlib
import base64
import hmac
import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 定数定義
SECRET_KEY = 'YOUR_SECRET_KEY'
SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL'

def verify_hmac(secret, payload, received):
    """
    HMAC検証

    Args:
        secret (str): シークレットキー
        payload (str): ペイロード
        received (str): 受信したHMAC

    Returns:
        bool: 検証結果(True: 成功, False: 失敗)
    """
    hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256)
    computed = base64.b64encode(hmac_obj.digest()).decode()
    return hmac.compare_digest(computed, received)

def send_notification(project_uuid, meas_uuid):
    """
    Slack通知

    @param project_uuid: プロジェクトUUID
    @param meas_uuid: 計測UUID
    @return: なし
    """

    message = {
        'attachments': [
            {
                'color': '#de82a7',
                'author_name': 'intdash Webhook',
                "author_icon": "YOUR_ICON_URL",
                'title': "LIVE再生中です",
                "fields": [
                    {
                        "title": "計測",
                        "value": f"<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|Meas Hub>"
                    },
                    {
                        "title": "リアルタイム再生",
                        "value": f"<https://example.intdash.jp/vm2m/?projectUuid={project_uuid}&screenName=Motion&playMode=live|Data Visualizer>"
                    }
                ],
                "footer": "計測したら即分析!WebhookでGPS補正を起動してみた",
                "footer_icon": "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU",
                'mrkdwn_in': ['text', 'fields']
            }
        ]
    }

    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            data=json.dumps(message),
            headers={'Content-Type': 'application/json'}
        )
        if response.status_code != 200:
            logger.error(f"Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}")

        logger.info("Slack notification sent successfully.")

    except Exception as e:
        logger.error(f"Exception occurred while sending Slack notification: {e}")

def lambda_handler(event, context):
    """
    エントリポイント

    Args:
        event (dict): イベント
        context (LambdaContext): コンテキスト

    Returns:
        dict: APIレスポンス
    """
    headers = event.get('headers', {})
    body = event.get('body', '{}')
    logger.info(f"Received event: {event}")

    body_dict = json.loads(body)
    payload = json.dumps(body_dict, separators=(',', ':'))

    # HMAC検証
    signature = headers.get('x-intdash-signature-256', '')
    if not verify_hmac(SECRET_KEY, payload, signature):
        logger.warning("HMAC Verification Failed.")
        return {
            'statusCode': 401,
            'body': json.dumps({'message': 'HMAC verification failed'})
        }

    # リソースタイプとアクションの判定
    resource_type = body_dict.get('resource_type')
    action = body_dict.get('action')
    if resource_type != 'measurement' or action != 'created':
        logger.info(f"Ignored: resource_type={resource_type}, action={action}")
        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Resource_type or action ignored'})
        }

    # Slack通知
    try:
        project_uuid = body_dict.get('project_uuid')
        meas_uuid = body_dict.get('measurement_uuid')
        send_notification(project_uuid, meas_uuid)
    except Exception as e:
        logger.error(f"Failed to Slack notification: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Failed to Slack notification'})
        }

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Webhook received!'})
    }

計測完了イベント:地図マッチング

GPS補正Lambda Pythonプログラム
import sys
import os
import json
import logging
import time
import datetime
import pytz
import requests
from apiclient import Configuration, ApiClient
from apiclient.api import authentication_service_edges_api, measurement_service_measurements_api, measurement_service_data_points_api
import struct
import base64
import numpy as np
import pandas as pd
import geopandas as gpd
import fmm
from math import radians, sin, cos, sqrt, atan2
import osmnx as ox
from shapely.geometry import Polygon
import folium
import boto3

logger = logging.getLogger()
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    logger.addHandler(handler)
logger.setLevel(logging.INFO)

# 環境変数
os.environ['MPLCONFIGDIR'] = '/tmp'

# ローカルストレージ
data_path = "/tmp/"

# intdash API設定
BASE_URL = "http://example.intdash.jp/api"
API_TOKEN = "YOUR_API_TOKEN"

# S3バケット設定
BUCKET_NAME = 'YOUR_BUCKET_NAME'
AWS_REGION = 'YOUR_AWS_REGION'
ACCESS_KEY = 'YOUR_ACCESS_KEY'
SECRET_KEY = 'YOUR_SECRET_KEY'

# Slack Webhook URL
SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL'


def get_edge(edge_uuid):
    """
    エッジ情報取得

    @param edge_id: エッジUUID
    @return: エッジ情報
    """
    configuration = Configuration(host=BASE_URL, api_key={"IntdashToken": API_TOKEN})

    with ApiClient(configuration) as api_client:
        api_instance = authentication_service_edges_api.AuthenticationServiceEdgesApi(api_client=api_client)
        edge = api_instance.get_edge(edge_uuid=edge_uuid)
        return edge


def get_meas(project_uuid, meas_uuid):
    """
    計測情報取得

    @param project_uuid: プロジェクトUUID
    @param meas_uuid: 計測UUID 
    @return: 計測情報
    """
    configuration = Configuration(host=BASE_URL, api_key={"IntdashToken": API_TOKEN})

    with ApiClient(configuration) as api_client:
        api_instance = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(api_client=api_client)
        meas = api_instance.get_project_measurement(project_uuid=project_uuid, measurement_uuid=meas_uuid)
        return meas


def get_gnss(project_uuid, meas_uuid):
    """
    GNSSデータ取得

    計測データからGNSS (1/gnss_coordinates) を抽出

    @param project_uuid: プロジェクトUUID
    @param meas_id: 計測UUID
    @return: [(x, y)]
    """
    configuration = Configuration(host=BASE_URL, api_key={"IntdashToken": API_TOKEN})

    with ApiClient(configuration) as api_client:
        api_instance = measurement_service_data_points_api.MeasurementServiceDataPointsApi(api_client=api_client)
        stream = api_instance.list_project_data_points(project_uuid=project_uuid, name=meas_uuid)

        coordinates = []

        while True:
            line = stream.readline()
            if not line:
                break

            resp = json.loads(line.decode())

            if 'data' in resp and 'd' in resp['data']:
                base64_encoded = resp['data']['d']
                bin_data = base64.b64decode(base64_encoded)

                data_id = resp.get('data_id', 'N/A')
                if data_id == '1/gnss_coordinates':
                    x, y = struct.unpack('>dd', bin_data)
                    coordinates.append((x, y))

    return coordinates


def find_bounds(coordinates):
    """
    緯度経度境界探索

    緯度経度の最小値-0.01, 最大値+0.01で境界を作成

    @param coordinates: GPS座標のリスト ([(lat, lon)])
    @return: [(min_lat-0.01, min_lon-0.01), (max_lat+0.01, max_lon+0.01)] のタプル
    """
    lats = [lat for lat, lon in coordinates]
    lons = [lon for lat, lon in coordinates]

    min_lat, max_lat = min(lats), max(lats)
    min_lon, max_lon = min(lons), max(lons)

    bounds = [(min_lat - 0.01, min_lon - 0.01), (max_lat + 0.01, max_lon + 0.01)]

    return bounds


def save_shape(G, filepath=None, encoding="utf-8"):
    """
    シェープファイル保存

    @param G: グラフオブジェクト
    @param filepath: 保存先のディレクトリ
    @param encoding: ファイルエンコーディング
    """
    if filepath is None:
        filepath = os.path.join(ox.settings.data_folder, "graph_shapefile")

    if not os.path.exists(filepath):
        os.makedirs(filepath)
    
    filepath_nodes = os.path.join(filepath, "nodes.shp")
    filepath_edges = os.path.join(filepath, "edges.shp")

    gdf_nodes, gdf_edges = ox.convert.graph_to_gdfs(G)
    gdf_nodes = ox.io._stringify_nonnumeric_cols(gdf_nodes)
    gdf_edges = ox.io._stringify_nonnumeric_cols(gdf_edges)
    
    gdf_edges["fid"] = np.arange(0, gdf_edges.shape[0], dtype='int')

    gdf_nodes.to_file(filepath_nodes, encoding=encoding)
    gdf_edges.to_file(filepath_edges, encoding=encoding)


def sort_coordinates(matched_edges, cpath_ids):
    """
    経路並び替え

    cpath_idsに基づいて経路を並び替える

    @param matched_edges: マッチングされた経路のGeoDataFrame
    @param cpath_ids: FMMのマッチ結果であるcpathのIDリスト
    @return: 並び替えられたGPS座標のリスト ([(lat, lon)]形式)
    """
    ordered_coordinates = []

    for fid in cpath_ids:
        edge = matched_edges[matched_edges['fid'] == fid]
        if not edge.empty:
            linestring = edge.iloc[0]['geometry']
            coords = [(lat, lon) for lon, lat in list(linestring.coords)]
            ordered_coordinates.extend(coords)

    return ordered_coordinates

def calculate_distance(coord1, coord2):
    """
    GPS座標間距離計算

    @param coord1: (lat, lon) 1つ目の座標
    @param coord2: (lat, lon) 2つ目の座標
    @return: 2点間の距離 (km)
    """
    R = 6371
    lat1, lon1 = coord1
    lat2, lon2 = coord2

    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)

    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return R * c

def total_distance(coordinates):
    """
    総走行距離計算

    @param coordinates: GPS座標のリスト ([(lat, lon)])
    @return: 総走行距離 (km)
    """
    distance = 0.0
    for i in range(1, len(coordinates)):
        distance += calculate_distance(coordinates[i - 1], coordinates[i])
    return distance


def upload_file(filepath, bucket_name, key):
    """
    S3アップロード

    @param filepath: アップロードするファイルのパス
    @param bucket_name: S3バケット名
    @param key: S3内でのファイルキー
    @return: アップロードされたファイルの公開URL
    """
    s3_client = boto3.client('s3', region_name=AWS_REGION,
                             aws_access_key_id=ACCESS_KEY,
                             aws_secret_access_key=SECRET_KEY)
    
    try:
        s3_client.upload_file(filepath, bucket_name, key, ExtraArgs={'ContentType': 'text/html'})
        url = f"https://{bucket_name}.s3.{AWS_REGION}.amazonaws.com/{key}"
        return url
    except Exception as e:
        logger.error(f"Error uploading to S3: {e}")
        return None


def send_notification(project_uuid, edge, meas, distance_origin, distance_matched, map_url):
    """
    Slack通知

    @param project_uuid: プロジェクトUUID
    @param edge: エッジ情報
    @param meas: 計測情報
    @param distance_origin: 補正前の総走行距離 (km)
    @param distance_matched: 補正後の総走行距離 (km)
    @param map_url: 地図のURL (OpenStreetMap)
    @return: なし
    """

    edge_uuid = edge['uuid']
    edge_name = edge['name']

    meas_uuid = meas['uuid']
    jst = pytz.timezone('Asia/Tokyo')
    base_time = meas['basetime'].astimezone(jst)
    start_time = base_time.strftime('%Y/%m/%d %H:%M:%S')
    duration = meas['duration'] / 1_000_000
    end_time = (base_time + datetime.timedelta(seconds=duration)).strftime('%Y/%m/%d %H:%M:%S')

    message = {
        'attachments': [
            {
                'color': '#00bfff',
                'author_name': 'Fast Map Matching',
                "author_icon": "YOUR_ICON",
                'title': "GPSデータを補正しました",
                "fields": [
                    {
                        "title": "エッジ",
                        "value": f"<https://example.intdash.jp/console/admin/edges/{edge_uuid}|{edge_name}>"
                    },
                    {
                        "title": "計測",
                        "value": f"<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|{start_time} - {end_time}>"
                    },
                    {
                        "title": "総走行距離 補正前",
                        "value": f"{distance_origin:.2f} km"
                    },
                    {
                        "title": "総走行距離 補正後",
                        "value": f"{distance_matched:.2f} km"
                    },
                    {
                        "title": "走行ルート",
                        "value": f"<{map_url}|OpenStreetMap>"
                    }
                ],
                "footer": "計測したら即分析!WebhookでGPS補正を起動してみた",
                "footer_icon": "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU",
                'mrkdwn_in': ['text', 'fields']
            }
        ]
    }

    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            data=json.dumps(message),
            headers={'Content-Type': 'application/json'}
        )
        if response.status_code != 200:
            logger.error(f"Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}")

        logger.info("Slack notification sent successfully.")

    except Exception as e:
        logger.error(f"Exception occurred while sending Slack notification: {e}")


def lambda_handler(event, context):
    logger.info(f"Received event: {event}")

    try:

        ## 入力

        # GPSデータ読み込み
        project_uuid = event.get('project_uuid')
        meas_uuid = event.get('measurement_uuid')
        coordinates = get_gnss(project_uuid, meas_uuid)
        logger.debug(f"Coordinates: {coordinates}")
        if not coordinates:
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'No coordinates data found'})
            }
        fmm_coordinates = [[lon, lat] for lat, lon in coordinates]  # fmmは[[lon, lat]]形式

        # OSMデータの取得とシェープファイルへの保存
        bounds = find_bounds(coordinates)
        (min_lat, min_lon), (max_lat, max_lon) = bounds
        boundary_polygon = Polygon([(min_lon, min_lat), (max_lon, min_lat), (max_lon, max_lat), (min_lon, max_lat)])
        G = ox.graph_from_polygon(boundary_polygon, network_type='drive')
        save_shape(G, filepath=f'{data_path}')

        # 道路ネットワークデータの読み込み
        edges = gpd.read_file(f"{data_path}/edges.shp") # 経路: 道路ネットワークの2つのノード(交差点など)を結ぶ線
        network = fmm.Network(f"{data_path}/edges.shp", "fid", "u", "v")
        graph = fmm.NetworkGraph(network)
    
        logger.info(f"Loaded {network.get_edge_count()} edges and {network.get_node_count()} nodes from shapefiles.")


        ## 地図マッチング

        # UBODTの生成・読み込み
        ubodt_gen = fmm.UBODTGenAlgorithm(network, graph)
        ubodt_gen.generate_ubodt(f"{data_path}/ubodt.txt", 0.02, binary=False, use_omp=True)

        ubodt = fmm.UBODT.read_ubodt_csv(f"{data_path}/ubodt.txt")

        # FMMモデルマッチング
        model = fmm.FastMapMatch(network, graph, ubodt)
        coordinates_str_list = [f"{lon} {lat}" for lon, lat in fmm_coordinates]
        wkt = f"LINESTRING({','.join(coordinates_str_list)})"
        fmm_config = fmm.FastMapMatchConfig(5, 20, 15) # 候補数, 探索半径(m), GPS誤差(m)
        result = model.match_wkt(wkt, fmm_config)
        logger.info(f"Matched {len(result.cpath)} edges.")


        ## 出力

        # 地図作成
        m = folium.Map(location=[np.mean([bounds[0][0], bounds[1][0]]), np.mean([bounds[0][1], bounds[1][1]])], zoom_start=12)

        # 補正前GPSデータをプロット
        folium.PolyLine(coordinates, color='deepskyblue', weight=8, opacity=0.5, popup='Original Route').add_to(m)
        folium.Marker(location=coordinates[0], popup='Start', icon=folium.Icon(color='lightred', icon='play', prefix='fa')).add_to(m)
        folium.Marker(location=coordinates[-1], popup='Goal', icon=folium.Icon(color='lightred', icon='flag', prefix='fa')).add_to(m)

        # 補正後のルートをプロット
        matched_edges = edges[edges.fid.isin(result.cpath)]
        matched_coordinates = sort_coordinates(matched_edges, result.cpath)
        folium.PolyLine(matched_coordinates, color='deeppink', weight=7, opacity=0.6, popup='Matched Route').add_to(m)

        # マップ保存・S3アップロード
        map_file = f"map_{meas_uuid}.html"
        map_path = f"{data_path}/{map_file}"
        m.save(map_file)
        map_url = upload_file(map_file, BUCKET_NAME, map_file)
        if not map_url:
            logger.error("Map upload failed.")
        logger.info(f"Map uploaded to S3: {map_url}")

        # 総走行距離を計算
        total_distance_origin = total_distance(coordinates)
        total_distance_matched = total_distance(matched_coordinates)
        logger.info(f"補正前の総走行距離: {total_distance_origin:.2f} km")
        logger.info(f"補正後の総走行距離: {total_distance_matched:.2f} km")

        # Slack通知
        meas = get_meas(project_uuid, meas_uuid)
        edge = get_edge(meas['edge_uuid'])
        send_notification(project_uuid, edge, meas, total_distance_origin, total_distance_matched, map_url)

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'FMM completed and Slack notification sent'})
        }

    except Exception as e:
        logger.error(f"Error processing event: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }


if __name__ == "__main__":
    # ローカルテスト用
    if len(sys.argv) > 1:
        with open(sys.argv[1], 'r') as f:
            event = json.load(f)
        lambda_handler(event, None)
    else:
        print("Usage: python3 app.py <event.json>")
GPS補正Lambda bootstrapシェル
#!/bin/bash

# -------------------------------------------------------------------
# bootstrap
# -------------------------------------------------------------------
# カスタムランタイム
# 
# コンテナ起動時に実行
# LambdaランタイムAPIからイベントリクエスト取得 
# リクエストイベントを引数にlambda_handler起動
# 結果をLambdaランタイムAPIに登録
# -------------------------------------------------------------------

set -euo pipefail  # エラー発生時にスクリプトを終了し、未定義変数をエラーにする

while true; do
  # LambdaランタイムAPIからイベントリクエスト取得
  RESPONSE=$(curl -i -s "http://${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/next")
  HEADERS=$(echo "$RESPONSE" | sed -n '/^\r$/q;p')
  REQUEST_ID=$(echo "$HEADERS" | grep -Fi 'Lambda-Runtime-Aws-Request-Id' | awk '{print $2}' | tr -d '\r')
  if [[ -z "$REQUEST_ID" ]]; then
    echo "Error: REQUEST_ID is empty"
    exit 1
  fi
  EVENT_DATA=$(echo "$RESPONSE" | sed '1,/^\r$/d')
  echo "REQUEST_ID: $REQUEST_ID EVENT_DATA: $EVENT_DATA"

  # lambda_handler起動
  RESPONSE=$(python3 -c "
import json
from app import lambda_handler
event = json.loads('''$EVENT_DATA''')
response = lambda_handler(event, None)
print(json.dumps(response))
")
  echo "RESPONSE: $RESPONSE"
  if [[ -z "$RESPONSE" ]]; then
    echo "Error: RESPONSE is empty. Request ID: $REQUEST_ID"
    exit 1
  fi

  # 結果をLambdaランタイムAPIに登録
  curl -s -X POST "http://${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/${REQUEST_ID}/response" \
  -d "$RESPONSE"
done
GPS補正Lambda コンテナDockerfile
# -------------------------------------------------------------------
# ベースイメージ: Ubuntu 22.04 LTS
# -------------------------------------------------------------------
FROM ubuntu:22.04

# -------------------------------------------------------------------
# 基本ツールのインストール
# - ビルドツール、CMake、Git、Python、npm ...
# -------------------------------------------------------------------
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    cmake \
    g++ \
    git \
    vim \
    python3=3.10.6-1~22.04.1 \
    python3-dev=3.10.6-1~22.04.1 \
    python3-pip=22.0.2+dfsg-1ubuntu0.4 \
    wget \
    curl \
    libgdal-dev=3.4.1+dfsg-1build4 \
    libboost-all-dev=1.74.0.3ubuntu7 \
    libosmium2-dev=2.18.0-1 \
    swig \
    expat \
    bzip2 \
    default-jre=2:1.11-72build2 \
    npm \
    jq \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

# -------------------------------------------------------------------
# Python3 シンボリックリンク設定
# - Python3 と PIP3 をデフォルトに設定
# -------------------------------------------------------------------
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 \
    && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1

# -------------------------------------------------------------------
# FMMに必要なPython パッケージインストール
# - Numpy、Pandas、Osmnx、Shapely ..
# -------------------------------------------------------------------
RUN pip install --upgrade pip \
    && pip install --no-cache-dir \
    numpy==1.26.4 \
    pandas==2.2.2 \
    osmnx==1.9.4 \
    folium==0.17.0 \
    shapely==2.0.6 \
    polars==1.9.0

# -------------------------------------------------------------------
# FMM のクローンとビルド
# - GitHub から FMM をクローンし、ビルドしてインストール
# -------------------------------------------------------------------
RUN git clone https://github.com/cyang-kth/fmm.git /home/fmm
WORKDIR /home/fmm/build
RUN cmake .. \
    && make -j"$(nproc)" \
    && make install

# -------------------------------------------------------------------
# intdash SDK のインストール準備
# - Node.js 設定と npm パッケージのインストール
# - openapi-generator-cli のインストール
# -------------------------------------------------------------------
WORKDIR /home
RUN npm install -g n@10.0.0 \
    && n 20.18.0 \
    && hash -r \
    && npm install --save @openapitools/openapi-generator-cli

# -------------------------------------------------------------------
# intdash SDK インストール
# - API クライアントコードを生成、Python パッケージとしてインストール
# - コンテナで起動するため、dist-packagesにコピー
# -------------------------------------------------------------------
ENV VERSION=v2.7.0
RUN ./node_modules/.bin/openapi-generator-cli version-manager set 6.1.0 \
    && ./node_modules/.bin/openapi-generator-cli generate \
    -g python -i https://docs.intdash.jp/api/intdash-api/${VERSION}/openapi_public.yaml \
    --package-name=apiclient
RUN cp -r /home/apiclient /usr/local/lib/python3.10/dist-packages/

# -------------------------------------------------------------------
# アプリに必要なPython パッケージをインストール
# - 地図、AWS SDK for Python、HTTP
# -------------------------------------------------------------------
ENV PACKAGE_PREFIX=/var/task
RUN pip install --no-cache-dir \
    folium==0.17.0 \
    boto3==1.35.32 \
    requests==2.32.3

# -------------------------------------------------------------------
# ローカルファイルのコピー
# - アプリケーションファイル
# - ローカルテスト用ファイル
# -------------------------------------------------------------------
COPY bootstrap ${PACKAGE_PREFIX}/bootstrap
RUN chmod +x /var/task/bootstrap
COPY app.py ${PACKAGE_PREFIX}/app.py
COPY event.json /home/event.json

# -------------------------------------------------------------------
# 環境変数の設定
# - GDAL、GEOS、およびその他の必要なライブラリのパスを設定
# -------------------------------------------------------------------
ENV \
  GDAL_DATA=${PACKAGE_PREFIX}/share/gdal \
  PROJ_LIB=/usr/share/proj/ \
  GDAL_CONFIG=${PACKAGE_PREFIX}/bin/gdal-config \
  GEOS_CONFIG=${PACKAGE_PREFIX}/bin/geos-config \
  PATH=${PACKAGE_PREFIX}/bin:$PATH \
  PYTHONPATH=${PACKAGE_PREFIX}

# -------------------------------------------------------------------
# ENTRYPOINT、CMD、WORKDIR の設定
# - FMM実行時にcacheにJSONファイル作成のため、/tmp を親ディレクトリに設定
# -------------------------------------------------------------------
WORKDIR /tmp
ENTRYPOINT ["/var/task/bootstrap"]
CMD ["python3", "/var/task/app.py"]
レスポンス返却Lambda Pythonプログラム
import json
import hmac
import hashlib
import base64
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 定数定義
SECRET_KEY = 'YOUR_SECRET_KEY'
LAMBDA_CLIENT = boto3.client('lambda')

def verify_hmac(secret, payload, received):
    """
    HMAC検証

    Args:
        secret (str): シークレットキー
        payload (str): ペイロード
        received (str): 受信したHMAC

    Returns:
        bool: 検証結果(True: 成功, False: 失敗)
    """
    hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256)
    computed = base64.b64encode(hmac_obj.digest()).decode()
    return hmac.compare_digest(computed, received)

def lambda_handler(event, context):
    """
    エントリポイント

    Args:
        event (dict): イベント
        context (LambdaContext): コンテキスト

    Returns:
        dict: APIレスポンス
    """
    headers = event.get('headers', {})
    body = event.get('body', '{}')
    logger.info(f"Received event: {event}")

    body_dict = json.loads(body)
    payload = json.dumps(body_dict, separators=(',', ':'))

    # HMAC検証
    signature = headers.get('x-intdash-signature-256', '')
    if not verify_hmac(SECRET_KEY, payload, signature):
        logger.warning("HMAC Verification Failed.")
        return {
            'statusCode': 401,
            'body': json.dumps({'message': 'HMAC verification failed'})
        }

    # リソースタイプとアクションの判定
    resource_type = body_dict.get('resource_type')
    action = body_dict.get('action')
    if resource_type != 'measurement' or action != 'completed':
        logger.info(f"Ignored: resource_type={resource_type}, action={action}")
        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Resource_type or action ignored'})
        }

    # FMM Lambdaを非同期起動
    try:
        response = LAMBDA_CLIENT.invoke(
            FunctionName='intdash-fmm',
            InvocationType='Event',
            Payload=json.dumps(body_dict)
        )
        logger.info(f"FMM Lambda invoked successfully: {response}")
    except Exception as e:
        logger.error(f"Failed to invoke FMM Lambda: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Failed to invoke FMM Lambda'})
        }

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Webhook received and FMM Lambda invoked'})
    }

  1. 完了はエッジ側からすべての計測データを受け取りきったときに自動で遷移します。終了はユーザー操作によりサーバー側で計測が閉じられたときの状態です。
  2. SlackのIncoming Webhookとはリクエストのフォーマットが異なるため、intdash Webhookは直接連携できません。
  3. サンプルプログラムを本記事の最後に掲載しています。
  4. レスポンスが返らない場合は通知が失敗したとみなされ、intdash次回起動時に再送されます。
  5. サンプルプログラムを本記事の最後に掲載しています。
  6. サンプルシェルを本記事の最後に掲載しています。
  7. サンプルファイルを本記事の最後に掲載しています。
  8. サンプルプログラムを本記事の最後に掲載しています。