intdashを活用したシステム開発

f:id:aptpod_tech-writer:20201223152430j:plain

こんにちは。ソリューションアーキテクトの尾澤です。

唐突ですが、いつも自分が呼吸している空気の二酸化炭素濃度を意識していますか?

温室効果ガス世界資料センターによると、2019年の世界の平均二酸化炭素濃度は410.5ppmだそうです(出典)。また、厚生労働省が定める建築物環境衛生管理基準では、室内の二酸化炭素濃度の基準を1000ppm以下としており、それを超えると倦怠感、頭痛、耳鳴り、息苦しさ等の症状がでてきて、視覚による疲労の度合いを測るフリッカー値も著しく低下すると言われています(出典)。

今年に入って多くの人がリモートワークや外出自粛などの影響を受けて室内で過ごす時間が増えています。気づかないうちにベストなパフォーマンスを出せない状態に陥っている可能性はないでしょうか?

aptpod Advent Calendar 2020 24日目の今回は、intdashを活用したシステム開発のイメージを掴んでいただくため、室内の二酸化炭素濃度に応じて換気を促す簡単な仕組みを作ってみようと思います。

開発ツール

intdashはもともと自動車の制御データの収集と可視化を目的として開発されたプロダクトであるため、CANデータの収集と可視化のための標準的な機能がAutomotive Proとしてパッケージ化されています。しかし、それ以外のユースケースでintdashを活用したい場合は、目的のデータを収集することも、収集したデータを利用することも、プロダクトの開発元である我々にお任せいただくしか方法がありませんでした。

アプトポッドでは近年、ユーザ自身がintdashをツールとして活用し、自由にデータを収集して分析処理を行なったりシステム化したりすることを可能にするため、intdashをとりまく開発ツールを整備しています。

今回はその中でも2020年9月30日にリリースした、データの収集を仲介する「intdash Edge Agent」と、収集したデータにアクセスする「intdash SDK for Python」を使用します。

www.aptpod.co.jp

tech.aptpod.co.jp

intdash Edge Agent

intdash Edge Agentは、データの自動再送や流量制御といったintdashクライアントの基本機能を提供するエッジデバイス用のソフトウェアです。ユーザは、データソースに応じたプラグインであるDevice Connectorを実装するだけで、様々なデバイスをintdashに接続することができます。

intdash SDK for Python

intdash SDK for Pythonは、intdashサーバに転送した時系列データや各種リソースにアクセスするためのPythonライブラリです。来年以降もJavaScriptやSwift、Golangなどの様々なプログラム言語に対応したライブラリを順次リリース予定です。ユーザはこれらのライブラリを使って、intdashを活用したシステムを簡単に構築することができます。

f:id:aptpod_tech-writer:20201223093049p:plain
今回のシステム構成

データ収集

ここから二酸化炭素濃度のデータを収集していきます。

intdash Edge Agentのインストール

intdash Edge Agentデベロッパーガイド

まずはapt-getコマンドでintdash Edge Agentをインストールします(2020年12月の現時点では、パッケージリソースは弊社のプライベートリポジトリに配置しており、誰でも取得できるわけではなくBASIC認証によってアクセス制限をかけています → BASIC認証によるアクセス制限はなくし、リポジトリを公開しています)。

$ curl -s --compressed -u USERNAME:PASSWORD "https://private-repository.aptpod.jp/intdash-edge/linux/raspbian/gpg" | sudo apt-key add -
$ echo "deb [arch=armhf] https://USERNAME:PASSWORD@private-repository.aptpod.jp/intdash-edge/linux/raspbian $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/intdash-edge.list
$ sudo apt-get update
$ sudo apt-get install intdash-edge

intdashサーバへの接続情報を環境変数として与えてintdash Edge Agentを起動します。

$ sudo \
    LD_LIBRARY_PATH=/opt/vm2m/lib \
    INTDASH_EDGE_UUID=XXXXXXXXXXXXXXXXXXXXXXXXXXX \
    INTDASH_EDGE_TOKEN=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
    INTDASH_EDGE_SERVER=dev.intdash.jp \
    ...
    /opt/vm2m/sbin/intdash-edge-manager -C /etc/opt/intdash/manager.conf

intdash Edge Agentが起動してintdashサーバに接続すると、Webコンソールで接続状況とストリーミングしているデータの概要を確認することができます。intdash Edge Agentはデフォルトでいくつかのシステムメトリクスを送信するように設定されています。

f:id:aptpod_tech-writer:20201223154919p:plain
intdash Edge Agentのデフォルト接続状態

Device Connectorの実装

今回のターゲットは二酸化炭素濃度です。さまざまな環境センサがありますが、ネット上に記事が多く、使いやすいpythonのライブラリも存在するCO2センサ「MH-Z19」を使用します。

f:id:aptpod_tech-writer:20201223000714p:plain
MH-Z19とRaspberry Pi

$ pip3 install mh-z19
$ python3
>>> import mh_z19
>>> mh_z19.read_all()
{'co2': 404, 'temperature': 14, 'TT': 54, 'SS': 0, 'UhUl': 35584}

pypi.org

このライブラリを使ってCO2センサ用Device Connectorを実装していきます。intdash Edge Agentにデータを渡す具体的な方法は、データペイロードを所定のフォーマットでFIFOに書き込むだけです。

#!/usr/bin/env -S python3 -B
# coding: utf-8

import signal
import time
import mh_z19
import LoggerFIFO

SIGNUM = 0

def signalHandler(signum, frame):
    global SIGNUM
    SIGNUM = signum

if __name__ == '__main__':

    signal.signal(signal.SIGINT, signalHandler)
    signal.signal(signal.SIGTERM, signalHandler)

    ch = 1
    logger_fifo = LoggerFIFO.LoggerFIFO('/var/run/intdash/', [ch])

    while SIGNUM == 0:
        data = mh_z19.read_all()

        now = logger_fifo.GetMonotonicTime_Nsec()
        logger_fifo.SendFloat64(ch, now, 'co2', float(data["co2"]))
        logger_fifo.SendFloat64(ch, now, 'temp', float(data["temperature"]))

        time.sleep(1)

Device Connectorの登録

用意したCO2センサ用Device Connectorを、intdash Edge Agentの設定ファイルに登録します。intdash Edge Agentは登録されたDevice Connectorを子プロセスとして実行してFIFO経由で受け取ったデータをintdashサーバに転送します。

...
  "loggers": [
    {
      "devicetype": "customized",
      "path": "/home/pi/intdash-edge/co2.py",
      "conf": "",
      "status": "/var/run/intdash/logger_001.stat",
      "connections": [
        {
          "channel": 1,
          "fifo_tx": "/var/run/intdash/logger_001.tx",
          "fifo_rx": "/var/run/intdash/logger_001.rx"
        }
      ],
      "details": {
        "plugin": "fifo",
        "plugin_with_process": true
      }
    },
...

CO2センサ用Decive Connectorを登録してからintdash Edge Agentを再起動して、先程のWebコンソールを確認すると、新しくデータが送信されていることを確認できます。

f:id:aptpod_tech-writer:20201223154943p:plain
CO2センサ用Device Connectorを登録したintdash Edge Agentの接続状態

データ可視化

intdashの標準ツールであるData Visualizerを使って、二酸化炭素濃度の推移を可視化してみました。

f:id:aptpod_tech-writer:20201223001534p:plain
閉め切ったときの二酸化炭素濃度の推移

上のスクリーンショットは、閉め切った6畳くらいの部屋でひとりでデスクに向かいながら計測した結果です。換気した直後の400ppmから1時間ちょっとで1000ppmを超えました。

f:id:aptpod_tech-writer:20201223003900p:plain
窓を開けたときの二酸化炭素濃度の推移

上のスクリーンショットは、1000ppmを超えた状態から窓をひとつだけ開けて換気しながら計測した結果です。二酸化炭素濃度が低下しはじめてから約10分で400ppmまで落ちました。

この結果から、狭い部屋では1時間に1回10分程度の換気をすることで、室内の二酸化炭素濃度を1000ppm以下に保つことができると言えるでしょう。

このように、Data Visualizerを使えばintdashに転送したすべてのデータをグラフやメーターなどの様々な視覚表現で手軽に可視化することができます。

www.aptpod.co.jp

データ処理

ここからはSDKを使ってデータをリアルタイムに処理してみましょう。

intdash SDK for Pythonのインストール

intdash SDKはPyPlからpipコマンドでインストールできます。

$ pip3 install intdash

pypi.org

SDKを使用したスクリプトの実装

intdash SDKを使って二酸化炭素濃度をリアルタイムで取得し、1000ppmを超えたら換気するようアラートし、450ppmを下回ったら換気できたことを知らせてくれる簡単なSlackのbotを作成します。

#!/usr/bin/env -S python3 -B
# coding: utf-8

import intdash
import json
import asyncio
import urllib.parse
import urllib.request

def post(text):
    try:
        req = urllib.request.Request(
            'https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX',
            json.dumps({'text': text}).encode('utf-8'),
            {'Content-Type': 'application/json'}
        )
        with urllib.request.urlopen(req) as res:
            return res.read()
    except Exception as e:
        print(e)

def callback(unit):
    try:
        global alart_flg

        # Skip basetime.
        if unit.data.data_type.value == intdash.DataType.basetime.value:
            return

        if unit.data.data_id != 'co2':
            return

        if (unit.data.value > 1000):
            if not alart_flg:
                post('CO2濃度が基準値を超えました。換気をしましょう!')
            alart_flg = True
        elif (unit.data.value < 450):
            if alart_flg:
                post('CO2濃度が基準値をクリアしました。')
            alart_flg = False

    except:
        pass

def main():

    client = intdash.Client(
        url = "https://dev.intdash.jp",
        username = "bot",
        edge_token = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
    )

    src_edge = client.edges.list(name='raspi')[0]
    wsconn = client.connect_websocket()
    wsconn.open_downstreams(
        specs = [
            intdash.DownstreamSpec(src_edge_uuid = src_edge.uuid, filters = [
                intdash.DataFilter(data_type=intdash.DataType.float.value, data_id='co2', channel=1),
               ]),
        ],
        callbacks = [callback],
    )

    loop = asyncio.get_event_loop()
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
    finally:
        print("the process cancelled...")
        wsconn.close()

if __name__ == '__main__':
    alart_flg = False
    main()

実際に、締め切ったままで作業を続けて、アラートにしたがって窓を開けて換気した際のSlack通知は以下のようになりました。

f:id:aptpod_tech-writer:20201223005601p:plain
Slackへの通知

このように、intdash SDKを使うことで、異常検知、他システムとの連携、リアルタイム加工など、収集したデータを使った様々なユースケースに対応することができます。

まとめ

intdashを活用したデータの収集と分析処理が、比較的簡単に少ない実装で実現できるようになっていることを実感いただけたでしょうか?

今回は、intdashを使った開発のイメージを掴むためだけに、分かりやすいテーマとして二酸化炭素濃度の測定というテーマを選びましたが、これはintdashの強みを存分に活かすユースケースとは言えません。1秒あたりの二酸化炭素濃度といった低頻度で小さいデータであれば、AWS IoTのような他社のプラットフォームでも十分に収集可能であり、intdashが持つ以下の特徴のどのメリットも享受できていないためです。

  • モバイル網などの不安定な通信環境でのデータ完全回収
  • 秒間数千発もの高頻度で大量なデータの低遅延データ転送
  • 複数のデータソースの時間軸をそろえたデータ収集

読者の皆さまが、上記のようなメリットを存分に活かしてご自身のDXを実現していただけるよう、共創パートナーとしてお手伝いできることを楽しみにしております。また、本記事がintdashの開発ツールを利用するための一助になれば幸いです。