intdashとタイムリーにシステム連携したいみなさん、
こんにちは。ソリューションアーキテクトの伊勢です。
2023年12月バージョンから待望のWebhook機能が利用できるようになったのでご紹介します。
はじめに
Webhookとは
システム内イベントをシステム外にHTTPで通知する仕組みです。
これでintdash内の変化をintdash外からリアルタイムに知ることができます。
イベントドリブンなシステム連携により、効率的なワークフローを実現できます。
以下のintdashリソースのイベントを通知することができます。
- 計測:作成・変更・削除・完了・終了1
- 接続:接続・切断・アップストリーム開始・ダウンストリーム開始
- エッジ:作成・変更・削除
- ユーザー:作成・変更・削除
- テナント:作成・変更・削除
- プロジェクトの所属エッジ:追加・変更・除外
- プロジェクトの所属メンバー:追加・変更・除外
やってみた
Webhookリクエストの確認
早速やってみます。
まずは、WebhookのHTTPリクエストの内容を見てみましょう。
アクセスするだけでWebhook用のURLを払い出せるWebhook.siteを利用します。
Webhook.siteで払い出されたURLと、通知するイベントをintdash APIに登録します。
計測リソースのイベントmeasurements_event
を通知するよう登録します。
スマートフォンアプリintdash Motionで計測すると、Webhook.siteに通知されます。
内容を見るとmeasurement
リソースのcreated
アクションであること、計測のUUIDと発生時刻がわかります。
リクエストボディはこのようなかたちです。
通知先システムはここから必要な情報取り出して処理を行います。
- 共通
delivery_uuid
: Webhook通知のIDhook_uuid
: Webhook設定のIDresource_type
": リソースの種類action
: リソースに対するイベントの種類occurred_at
: 発生時刻
- リソース別:計測の場合
project_uuid
: プロジェクトのIDmeasurement_uuid
: 計測のID
ヘッダーには以下の項目があります。
X-Intdash-Signature-256
: HMAC- リクエスト改竄防止の仕組み
- Webhook設定にシークレット(Key)を登録すると、シークレットとリクエストボディで生成したメッセージ認証コード(MAC)を付与
- 通知先システムで同じシークレットを保持し、受け取ったリクエストボディとあわせてMAC値を生成して比較検証
計測作成イベント:Slack
では、簡単なシステム連携を実装してみます。
AWS LambdaでSlackに計測開始を通知します。
Slackチャンネルの通知用URLを払い出します。2
Slackに通知するLambda関数を作成します。3
処理はこんな感じです。
- HMAC検証
- 計測作成以外のイベントは無視
- Slack通知
- intdash Webhookにレスポンスを返却
API Gatewayを作成して、Slack通知Lambdaを統合します。
先ほど登録したintdash Webhookの設定をAPI GatewayのURLに変更します。
計測を開始するとSlackに通知されます。
通知のData Visualizer
のリンクをクリックすると自動でLIVE再生が始まるようにしてみました。
Data VisualizerのURLに、準備しておいたスクリーンMotion
で自動LIVE再生するようscreenName=Motion&playMode=live
というクエリストリングを付与しています。
計測完了イベント:地図マッチング
少し時間のかかる分析処理を回してみます。
GPSデータを地図マッチングで補正します。
地図マッチングとは
GPSデータの緯度経度を道路に沿うように補正します。
今回使うFast Map Matching (FMM) はその新し目のアルゴリズムです。
全体構成
利用するライブラリがたくさんあるため、コンテナイメージでGPS補正Lambdaを構築します。
GPS補正Lambdaはコンテナをロードしたり、道路ネットワークのノードを事前計算します。
intdash Webhookは通知後に10秒以内にレスポンスを返す必要があるため4、先にレスポンスを返却する別のLambdaを設けます。
さらに時間かかかる場合はAWS Step FunctionsやAWS Batchを検討することになるでしょう。
GPS補正Lambda
コンテナで実行するPythonプログラムを作成します。
処理はこのようになります。5
- intdash SDKでエッジ名、計測時刻、GPSデータを取得
- GPSデータ範囲のOpenStreetMapから道路データを取得
- 道路データをもとに道路ネットワークモデルを構築
- 道路ネットワークモデルにGPSデータをマッチング
- マッチング前後のGPSデータを地図HTMLにプロット
- 地図HTMLをS3にアップロード
- マッチング前後の総走行距離を計算
- 総走行距離と地図HTMLのURLをSlack通知
コンテナ起動時に上記プログラムを実行するカスタムランタイムを作成します。6
コンテナイメージをビルドし、Amazon ECRにプッシュします。
Dockerfileはこのようになります。7
- 必要なライブラリをインストール
- FMMソースをビルド
- intdash SDKをOpenAPIで生成
- Pythonプログラム、カスタムランタイムをローカルからコピー
GPS補正Lambdaを作成します。
タイムアウトは10分、FMMが使うのでメモリを1024MBにしました。
レスポンス返却Lambda
もう1つのLambda関数を作成します。8
処理はこれだけです。
- HMAC検証
- 計測完了以外は無視
- GPS補正Lambdaを非同期起動
- intdash Webhookのレスポンスを返却
API Gateway
さきほどのAPI Gatewayの統合先をレスポンス返却Lambdaに変更します。
起動
intdash Motionで、計測時にあげきれていなかったデータを遅延アップロードします。
GPSがずれやすいトンネルや急カーブのあるルートを選択します。
サーバー上で計測データがすべて回収済みになり、計測が完了します。
Webhook設定により、計測完了イベントが通知されます。
2分ほど待つとSlackにGPS補正結果が通知されます。
OpenStreetMap
のリンクをクリックすると、S3に配置した地図HTMLが開くようにしてみました。
ブルーの線が補正前のGPSデータ、ピンクの線が補正後のGPSデータです。
ズームすると、信号が途絶するトンネルや位置がぶれやすい海岸沿いの急カーブのデータが補正されて、道路に沿っているのがわかります。
おわりに
Webhookによりリアルタイムなシステム連携を実現でき、運用ワークフローが大幅に改善します。
他にも、稼働スケジュールと突き合わせてエッジの予定外起動を検知したり、
処理を逐次起動できるので定期バッチに比べて負荷の平準化も期待できます。
さらに以前ご紹介した永続化拡張と組み合わせて、intdash APIにアクセスする処理を実装せずにデータ分析を組むことも可能です。
intdashを使ったシステム構築はぜひaptpodにご相談ください。
Appendix. サンプルソース
計測作成イベント:Slack
Slack通知Lambda Pythonプログラム
import logging import json import hashlib import base64 import hmac import requests logger = logging.getLogger() logger.setLevel(logging.INFO) # 定数定義 SECRET_KEY = 'YOUR_SECRET_KEY' SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL' def verify_hmac(secret, payload, received): """ HMAC検証 Args: secret (str): シークレットキー payload (str): ペイロード received (str): 受信したHMAC Returns: bool: 検証結果(True: 成功, False: 失敗) """ hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256) computed = base64.b64encode(hmac_obj.digest()).decode() return hmac.compare_digest(computed, received) def send_notification(project_uuid, meas_uuid): """ Slack通知 @param project_uuid: プロジェクトUUID @param meas_uuid: 計測UUID @return: なし """ message = { 'attachments': [ { 'color': '#de82a7', 'author_name': 'intdash Webhook', "author_icon": "YOUR_ICON_URL", 'title': "LIVE再生中です", "fields": [ { "title": "計測", "value": f"<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|Meas Hub>" }, { "title": "リアルタイム再生", "value": f"<https://example.intdash.jp/vm2m/?projectUuid={project_uuid}&screenName=Motion&playMode=live|Data Visualizer>" } ], "footer": "計測したら即分析!WebhookでGPS補正を起動してみた", "footer_icon": "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU", 'mrkdwn_in': ['text', 'fields'] } ] } try: response = requests.post( SLACK_WEBHOOK_URL, data=json.dumps(message), headers={'Content-Type': 'application/json'} ) if response.status_code != 200: logger.error(f"Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}") logger.info("Slack notification sent successfully.") except Exception as e: logger.error(f"Exception occurred while sending Slack notification: {e}") def lambda_handler(event, context): """ エントリポイント Args: event (dict): イベント context (LambdaContext): コンテキスト Returns: dict: APIレスポンス """ headers = event.get('headers', {}) body = event.get('body', '{}') logger.info(f"Received event: {event}") body_dict = json.loads(body) payload = json.dumps(body_dict, separators=(',', ':')) # HMAC検証 signature = headers.get('x-intdash-signature-256', '') if not verify_hmac(SECRET_KEY, payload, signature): logger.warning("HMAC Verification Failed.") return { 'statusCode': 401, 'body': json.dumps({'message': 'HMAC verification failed'}) } # リソースタイプとアクションの判定 resource_type = body_dict.get('resource_type') action = body_dict.get('action') if resource_type != 'measurement' or action != 'created': logger.info(f"Ignored: resource_type={resource_type}, action={action}") return { 'statusCode': 200, 'body': json.dumps({'message': 'Resource_type or action ignored'}) } # Slack通知 try: project_uuid = body_dict.get('project_uuid') meas_uuid = body_dict.get('measurement_uuid') send_notification(project_uuid, meas_uuid) except Exception as e: logger.error(f"Failed to Slack notification: {e}") return { 'statusCode': 500, 'body': json.dumps({'message': 'Failed to Slack notification'}) } return { 'statusCode': 200, 'body': json.dumps({'message': 'Webhook received!'}) }
計測完了イベント:地図マッチング
GPS補正Lambda Pythonプログラム
import sys import os import json import logging import time import datetime import pytz import requests from apiclient import Configuration, ApiClient from apiclient.api import authentication_service_edges_api, measurement_service_measurements_api, measurement_service_data_points_api import struct import base64 import numpy as np import pandas as pd import geopandas as gpd import fmm from math import radians, sin, cos, sqrt, atan2 import osmnx as ox from shapely.geometry import Polygon import folium import boto3 logger = logging.getLogger() if not logger.hasHandlers(): handler = logging.StreamHandler() logger.addHandler(handler) logger.setLevel(logging.INFO) # 環境変数 os.environ['MPLCONFIGDIR'] = '/tmp' # ローカルストレージ data_path = "/tmp/" # intdash API設定 BASE_URL = "http://example.intdash.jp/api" API_TOKEN = "YOUR_API_TOKEN" # S3バケット設定 BUCKET_NAME = 'YOUR_BUCKET_NAME' AWS_REGION = 'YOUR_AWS_REGION' ACCESS_KEY = 'YOUR_ACCESS_KEY' SECRET_KEY = 'YOUR_SECRET_KEY' # Slack Webhook URL SLACK_WEBHOOK_URL = 'YOUR_SLACK_WEBHOOK_URL' def get_edge(edge_uuid): """ エッジ情報取得 @param edge_id: エッジUUID @return: エッジ情報 """ configuration = Configuration(host=BASE_URL, api_key={"IntdashToken": API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = authentication_service_edges_api.AuthenticationServiceEdgesApi(api_client=api_client) edge = api_instance.get_edge(edge_uuid=edge_uuid) return edge def get_meas(project_uuid, meas_uuid): """ 計測情報取得 @param project_uuid: プロジェクトUUID @param meas_uuid: 計測UUID @return: 計測情報 """ configuration = Configuration(host=BASE_URL, api_key={"IntdashToken": API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = measurement_service_measurements_api.MeasurementServiceMeasurementsApi(api_client=api_client) meas = api_instance.get_project_measurement(project_uuid=project_uuid, measurement_uuid=meas_uuid) return meas def get_gnss(project_uuid, meas_uuid): """ GNSSデータ取得 計測データからGNSS (1/gnss_coordinates) を抽出 @param project_uuid: プロジェクトUUID @param meas_id: 計測UUID @return: [(x, y)] """ configuration = Configuration(host=BASE_URL, api_key={"IntdashToken": API_TOKEN}) with ApiClient(configuration) as api_client: api_instance = measurement_service_data_points_api.MeasurementServiceDataPointsApi(api_client=api_client) stream = api_instance.list_project_data_points(project_uuid=project_uuid, name=meas_uuid) coordinates = [] while True: line = stream.readline() if not line: break resp = json.loads(line.decode()) if 'data' in resp and 'd' in resp['data']: base64_encoded = resp['data']['d'] bin_data = base64.b64decode(base64_encoded) data_id = resp.get('data_id', 'N/A') if data_id == '1/gnss_coordinates': x, y = struct.unpack('>dd', bin_data) coordinates.append((x, y)) return coordinates def find_bounds(coordinates): """ 緯度経度境界探索 緯度経度の最小値-0.01, 最大値+0.01で境界を作成 @param coordinates: GPS座標のリスト ([(lat, lon)]) @return: [(min_lat-0.01, min_lon-0.01), (max_lat+0.01, max_lon+0.01)] のタプル """ lats = [lat for lat, lon in coordinates] lons = [lon for lat, lon in coordinates] min_lat, max_lat = min(lats), max(lats) min_lon, max_lon = min(lons), max(lons) bounds = [(min_lat - 0.01, min_lon - 0.01), (max_lat + 0.01, max_lon + 0.01)] return bounds def save_shape(G, filepath=None, encoding="utf-8"): """ シェープファイル保存 @param G: グラフオブジェクト @param filepath: 保存先のディレクトリ @param encoding: ファイルエンコーディング """ if filepath is None: filepath = os.path.join(ox.settings.data_folder, "graph_shapefile") if not os.path.exists(filepath): os.makedirs(filepath) filepath_nodes = os.path.join(filepath, "nodes.shp") filepath_edges = os.path.join(filepath, "edges.shp") gdf_nodes, gdf_edges = ox.convert.graph_to_gdfs(G) gdf_nodes = ox.io._stringify_nonnumeric_cols(gdf_nodes) gdf_edges = ox.io._stringify_nonnumeric_cols(gdf_edges) gdf_edges["fid"] = np.arange(0, gdf_edges.shape[0], dtype='int') gdf_nodes.to_file(filepath_nodes, encoding=encoding) gdf_edges.to_file(filepath_edges, encoding=encoding) def sort_coordinates(matched_edges, cpath_ids): """ 経路並び替え cpath_idsに基づいて経路を並び替える @param matched_edges: マッチングされた経路のGeoDataFrame @param cpath_ids: FMMのマッチ結果であるcpathのIDリスト @return: 並び替えられたGPS座標のリスト ([(lat, lon)]形式) """ ordered_coordinates = [] for fid in cpath_ids: edge = matched_edges[matched_edges['fid'] == fid] if not edge.empty: linestring = edge.iloc[0]['geometry'] coords = [(lat, lon) for lon, lat in list(linestring.coords)] ordered_coordinates.extend(coords) return ordered_coordinates def calculate_distance(coord1, coord2): """ GPS座標間距離計算 @param coord1: (lat, lon) 1つ目の座標 @param coord2: (lat, lon) 2つ目の座標 @return: 2点間の距離 (km) """ R = 6371 lat1, lon1 = coord1 lat2, lon2 = coord2 dlat = radians(lat2 - lat1) dlon = radians(lon2 - lon1) a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2 c = 2 * atan2(sqrt(a), sqrt(1 - a)) return R * c def total_distance(coordinates): """ 総走行距離計算 @param coordinates: GPS座標のリスト ([(lat, lon)]) @return: 総走行距離 (km) """ distance = 0.0 for i in range(1, len(coordinates)): distance += calculate_distance(coordinates[i - 1], coordinates[i]) return distance def upload_file(filepath, bucket_name, key): """ S3アップロード @param filepath: アップロードするファイルのパス @param bucket_name: S3バケット名 @param key: S3内でのファイルキー @return: アップロードされたファイルの公開URL """ s3_client = boto3.client('s3', region_name=AWS_REGION, aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY) try: s3_client.upload_file(filepath, bucket_name, key, ExtraArgs={'ContentType': 'text/html'}) url = f"https://{bucket_name}.s3.{AWS_REGION}.amazonaws.com/{key}" return url except Exception as e: logger.error(f"Error uploading to S3: {e}") return None def send_notification(project_uuid, edge, meas, distance_origin, distance_matched, map_url): """ Slack通知 @param project_uuid: プロジェクトUUID @param edge: エッジ情報 @param meas: 計測情報 @param distance_origin: 補正前の総走行距離 (km) @param distance_matched: 補正後の総走行距離 (km) @param map_url: 地図のURL (OpenStreetMap) @return: なし """ edge_uuid = edge['uuid'] edge_name = edge['name'] meas_uuid = meas['uuid'] jst = pytz.timezone('Asia/Tokyo') base_time = meas['basetime'].astimezone(jst) start_time = base_time.strftime('%Y/%m/%d %H:%M:%S') duration = meas['duration'] / 1_000_000 end_time = (base_time + datetime.timedelta(seconds=duration)).strftime('%Y/%m/%d %H:%M:%S') message = { 'attachments': [ { 'color': '#00bfff', 'author_name': 'Fast Map Matching', "author_icon": "YOUR_ICON", 'title': "GPSデータを補正しました", "fields": [ { "title": "エッジ", "value": f"<https://example.intdash.jp/console/admin/edges/{edge_uuid}|{edge_name}>" }, { "title": "計測", "value": f"<https://example.intdash.jp/console/measurements/{meas_uuid}/?projectUuid={project_uuid}|{start_time} - {end_time}>" }, { "title": "総走行距離 補正前", "value": f"{distance_origin:.2f} km" }, { "title": "総走行距離 補正後", "value": f"{distance_matched:.2f} km" }, { "title": "走行ルート", "value": f"<{map_url}|OpenStreetMap>" } ], "footer": "計測したら即分析!WebhookでGPS補正を起動してみた", "footer_icon": "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTkbv74KdmChO7FenKcskkqIZTIYEMjJSisLjZVk5_O7pe-QKEBb1Kntdx-grb7dvdsNDs&usqp=CAU", 'mrkdwn_in': ['text', 'fields'] } ] } try: response = requests.post( SLACK_WEBHOOK_URL, data=json.dumps(message), headers={'Content-Type': 'application/json'} ) if response.status_code != 200: logger.error(f"Failed to send Slack notification. Status Code: {response.status_code}, Response: {response.text}") logger.info("Slack notification sent successfully.") except Exception as e: logger.error(f"Exception occurred while sending Slack notification: {e}") def lambda_handler(event, context): logger.info(f"Received event: {event}") try: ## 入力 # GPSデータ読み込み project_uuid = event.get('project_uuid') meas_uuid = event.get('measurement_uuid') coordinates = get_gnss(project_uuid, meas_uuid) logger.debug(f"Coordinates: {coordinates}") if not coordinates: return { 'statusCode': 200, 'body': json.dumps({'message': 'No coordinates data found'}) } fmm_coordinates = [[lon, lat] for lat, lon in coordinates] # fmmは[[lon, lat]]形式 # OSMデータの取得とシェープファイルへの保存 bounds = find_bounds(coordinates) (min_lat, min_lon), (max_lat, max_lon) = bounds boundary_polygon = Polygon([(min_lon, min_lat), (max_lon, min_lat), (max_lon, max_lat), (min_lon, max_lat)]) G = ox.graph_from_polygon(boundary_polygon, network_type='drive') save_shape(G, filepath=f'{data_path}') # 道路ネットワークデータの読み込み edges = gpd.read_file(f"{data_path}/edges.shp") # 経路: 道路ネットワークの2つのノード(交差点など)を結ぶ線 network = fmm.Network(f"{data_path}/edges.shp", "fid", "u", "v") graph = fmm.NetworkGraph(network) logger.info(f"Loaded {network.get_edge_count()} edges and {network.get_node_count()} nodes from shapefiles.") ## 地図マッチング # UBODTの生成・読み込み ubodt_gen = fmm.UBODTGenAlgorithm(network, graph) ubodt_gen.generate_ubodt(f"{data_path}/ubodt.txt", 0.02, binary=False, use_omp=True) ubodt = fmm.UBODT.read_ubodt_csv(f"{data_path}/ubodt.txt") # FMMモデルマッチング model = fmm.FastMapMatch(network, graph, ubodt) coordinates_str_list = [f"{lon} {lat}" for lon, lat in fmm_coordinates] wkt = f"LINESTRING({','.join(coordinates_str_list)})" fmm_config = fmm.FastMapMatchConfig(5, 20, 15) # 候補数, 探索半径(m), GPS誤差(m) result = model.match_wkt(wkt, fmm_config) logger.info(f"Matched {len(result.cpath)} edges.") ## 出力 # 地図作成 m = folium.Map(location=[np.mean([bounds[0][0], bounds[1][0]]), np.mean([bounds[0][1], bounds[1][1]])], zoom_start=12) # 補正前GPSデータをプロット folium.PolyLine(coordinates, color='deepskyblue', weight=8, opacity=0.5, popup='Original Route').add_to(m) folium.Marker(location=coordinates[0], popup='Start', icon=folium.Icon(color='lightred', icon='play', prefix='fa')).add_to(m) folium.Marker(location=coordinates[-1], popup='Goal', icon=folium.Icon(color='lightred', icon='flag', prefix='fa')).add_to(m) # 補正後のルートをプロット matched_edges = edges[edges.fid.isin(result.cpath)] matched_coordinates = sort_coordinates(matched_edges, result.cpath) folium.PolyLine(matched_coordinates, color='deeppink', weight=7, opacity=0.6, popup='Matched Route').add_to(m) # マップ保存・S3アップロード map_file = f"map_{meas_uuid}.html" map_path = f"{data_path}/{map_file}" m.save(map_file) map_url = upload_file(map_file, BUCKET_NAME, map_file) if not map_url: logger.error("Map upload failed.") logger.info(f"Map uploaded to S3: {map_url}") # 総走行距離を計算 total_distance_origin = total_distance(coordinates) total_distance_matched = total_distance(matched_coordinates) logger.info(f"補正前の総走行距離: {total_distance_origin:.2f} km") logger.info(f"補正後の総走行距離: {total_distance_matched:.2f} km") # Slack通知 meas = get_meas(project_uuid, meas_uuid) edge = get_edge(meas['edge_uuid']) send_notification(project_uuid, edge, meas, total_distance_origin, total_distance_matched, map_url) return { 'statusCode': 200, 'body': json.dumps({'message': 'FMM completed and Slack notification sent'}) } except Exception as e: logger.error(f"Error processing event: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } if __name__ == "__main__": # ローカルテスト用 if len(sys.argv) > 1: with open(sys.argv[1], 'r') as f: event = json.load(f) lambda_handler(event, None) else: print("Usage: python3 app.py <event.json>")
GPS補正Lambda bootstrapシェル
#!/bin/bash # ------------------------------------------------------------------- # bootstrap # ------------------------------------------------------------------- # カスタムランタイム # # コンテナ起動時に実行 # LambdaランタイムAPIからイベントリクエスト取得 # リクエストイベントを引数にlambda_handler起動 # 結果をLambdaランタイムAPIに登録 # ------------------------------------------------------------------- set -euo pipefail # エラー発生時にスクリプトを終了し、未定義変数をエラーにする while true; do # LambdaランタイムAPIからイベントリクエスト取得 RESPONSE=$(curl -i -s "http://${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/next") HEADERS=$(echo "$RESPONSE" | sed -n '/^\r$/q;p') REQUEST_ID=$(echo "$HEADERS" | grep -Fi 'Lambda-Runtime-Aws-Request-Id' | awk '{print $2}' | tr -d '\r') if [[ -z "$REQUEST_ID" ]]; then echo "Error: REQUEST_ID is empty" exit 1 fi EVENT_DATA=$(echo "$RESPONSE" | sed '1,/^\r$/d') echo "REQUEST_ID: $REQUEST_ID EVENT_DATA: $EVENT_DATA" # lambda_handler起動 RESPONSE=$(python3 -c " import json from app import lambda_handler event = json.loads('''$EVENT_DATA''') response = lambda_handler(event, None) print(json.dumps(response)) ") echo "RESPONSE: $RESPONSE" if [[ -z "$RESPONSE" ]]; then echo "Error: RESPONSE is empty. Request ID: $REQUEST_ID" exit 1 fi # 結果をLambdaランタイムAPIに登録 curl -s -X POST "http://${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/${REQUEST_ID}/response" \ -d "$RESPONSE" done
GPS補正Lambda コンテナDockerfile
# ------------------------------------------------------------------- # ベースイメージ: Ubuntu 22.04 LTS # ------------------------------------------------------------------- FROM ubuntu:22.04 # ------------------------------------------------------------------- # 基本ツールのインストール # - ビルドツール、CMake、Git、Python、npm ... # ------------------------------------------------------------------- RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ cmake \ g++ \ git \ vim \ python3=3.10.6-1~22.04.1 \ python3-dev=3.10.6-1~22.04.1 \ python3-pip=22.0.2+dfsg-1ubuntu0.4 \ wget \ curl \ libgdal-dev=3.4.1+dfsg-1build4 \ libboost-all-dev=1.74.0.3ubuntu7 \ libosmium2-dev=2.18.0-1 \ swig \ expat \ bzip2 \ default-jre=2:1.11-72build2 \ npm \ jq \ && rm -rf /var/lib/apt/lists/* \ && apt-get clean # ------------------------------------------------------------------- # Python3 シンボリックリンク設定 # - Python3 と PIP3 をデフォルトに設定 # ------------------------------------------------------------------- RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 \ && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 # ------------------------------------------------------------------- # FMMに必要なPython パッケージインストール # - Numpy、Pandas、Osmnx、Shapely .. # ------------------------------------------------------------------- RUN pip install --upgrade pip \ && pip install --no-cache-dir \ numpy==1.26.4 \ pandas==2.2.2 \ osmnx==1.9.4 \ folium==0.17.0 \ shapely==2.0.6 \ polars==1.9.0 # ------------------------------------------------------------------- # FMM のクローンとビルド # - GitHub から FMM をクローンし、ビルドしてインストール # ------------------------------------------------------------------- RUN git clone https://github.com/cyang-kth/fmm.git /home/fmm WORKDIR /home/fmm/build RUN cmake .. \ && make -j"$(nproc)" \ && make install # ------------------------------------------------------------------- # intdash SDK のインストール準備 # - Node.js 設定と npm パッケージのインストール # - openapi-generator-cli のインストール # ------------------------------------------------------------------- WORKDIR /home RUN npm install -g n@10.0.0 \ && n 20.18.0 \ && hash -r \ && npm install --save @openapitools/openapi-generator-cli # ------------------------------------------------------------------- # intdash SDK インストール # - API クライアントコードを生成、Python パッケージとしてインストール # - コンテナで起動するため、dist-packagesにコピー # ------------------------------------------------------------------- ENV VERSION=v2.7.0 RUN ./node_modules/.bin/openapi-generator-cli version-manager set 6.1.0 \ && ./node_modules/.bin/openapi-generator-cli generate \ -g python -i https://docs.intdash.jp/api/intdash-api/${VERSION}/openapi_public.yaml \ --package-name=apiclient RUN cp -r /home/apiclient /usr/local/lib/python3.10/dist-packages/ # ------------------------------------------------------------------- # アプリに必要なPython パッケージをインストール # - 地図、AWS SDK for Python、HTTP # ------------------------------------------------------------------- ENV PACKAGE_PREFIX=/var/task RUN pip install --no-cache-dir \ folium==0.17.0 \ boto3==1.35.32 \ requests==2.32.3 # ------------------------------------------------------------------- # ローカルファイルのコピー # - アプリケーションファイル # - ローカルテスト用ファイル # ------------------------------------------------------------------- COPY bootstrap ${PACKAGE_PREFIX}/bootstrap RUN chmod +x /var/task/bootstrap COPY app.py ${PACKAGE_PREFIX}/app.py COPY event.json /home/event.json # ------------------------------------------------------------------- # 環境変数の設定 # - GDAL、GEOS、およびその他の必要なライブラリのパスを設定 # ------------------------------------------------------------------- ENV \ GDAL_DATA=${PACKAGE_PREFIX}/share/gdal \ PROJ_LIB=/usr/share/proj/ \ GDAL_CONFIG=${PACKAGE_PREFIX}/bin/gdal-config \ GEOS_CONFIG=${PACKAGE_PREFIX}/bin/geos-config \ PATH=${PACKAGE_PREFIX}/bin:$PATH \ PYTHONPATH=${PACKAGE_PREFIX} # ------------------------------------------------------------------- # ENTRYPOINT、CMD、WORKDIR の設定 # - FMM実行時にcacheにJSONファイル作成のため、/tmp を親ディレクトリに設定 # ------------------------------------------------------------------- WORKDIR /tmp ENTRYPOINT ["/var/task/bootstrap"] CMD ["python3", "/var/task/app.py"]
レスポンス返却Lambda Pythonプログラム
import json import hmac import hashlib import base64 import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.INFO) # 定数定義 SECRET_KEY = 'YOUR_SECRET_KEY' LAMBDA_CLIENT = boto3.client('lambda') def verify_hmac(secret, payload, received): """ HMAC検証 Args: secret (str): シークレットキー payload (str): ペイロード received (str): 受信したHMAC Returns: bool: 検証結果(True: 成功, False: 失敗) """ hmac_obj = hmac.new(secret.encode(), payload.encode(), hashlib.sha256) computed = base64.b64encode(hmac_obj.digest()).decode() return hmac.compare_digest(computed, received) def lambda_handler(event, context): """ エントリポイント Args: event (dict): イベント context (LambdaContext): コンテキスト Returns: dict: APIレスポンス """ headers = event.get('headers', {}) body = event.get('body', '{}') logger.info(f"Received event: {event}") body_dict = json.loads(body) payload = json.dumps(body_dict, separators=(',', ':')) # HMAC検証 signature = headers.get('x-intdash-signature-256', '') if not verify_hmac(SECRET_KEY, payload, signature): logger.warning("HMAC Verification Failed.") return { 'statusCode': 401, 'body': json.dumps({'message': 'HMAC verification failed'}) } # リソースタイプとアクションの判定 resource_type = body_dict.get('resource_type') action = body_dict.get('action') if resource_type != 'measurement' or action != 'completed': logger.info(f"Ignored: resource_type={resource_type}, action={action}") return { 'statusCode': 200, 'body': json.dumps({'message': 'Resource_type or action ignored'}) } # FMM Lambdaを非同期起動 try: response = LAMBDA_CLIENT.invoke( FunctionName='intdash-fmm', InvocationType='Event', Payload=json.dumps(body_dict) ) logger.info(f"FMM Lambda invoked successfully: {response}") except Exception as e: logger.error(f"Failed to invoke FMM Lambda: {e}") return { 'statusCode': 500, 'body': json.dumps({'message': 'Failed to invoke FMM Lambda'}) } return { 'statusCode': 200, 'body': json.dumps({'message': 'Webhook received and FMM Lambda invoked'}) }
- 完了はエッジ側からすべての計測データを受け取りきったときに自動で遷移します。終了はユーザー操作によりサーバー側で計測が閉じられたときの状態です。↩
- SlackのIncoming Webhookとはリクエストのフォーマットが異なるため、intdash Webhookは直接連携できません。↩
- サンプルプログラムを本記事の最後に掲載しています。↩
- レスポンスが返らない場合は通知が失敗したとみなされ、intdash次回起動時に再送されます。↩
- サンプルプログラムを本記事の最後に掲載しています。↩
- サンプルシェルを本記事の最後に掲載しています。↩
- サンプルファイルを本記事の最後に掲載しています。↩
- サンプルプログラムを本記事の最後に掲載しています。↩