Amazon Web Services ブログ

MQTTのLWTを利用してリアルタイムにAWS IoTに接続するデバイスの切断を検出する

この記事は Syed Rehan によって投稿された Monitor AWS IoT connections in near-real time using MQTT LWT を翻訳したものです。

コネクテッドデバイスでは、ほぼリアルタイムに機器を監視してエラーを検出し、アクションを軽減する必要がある場合がありますが MQTT の Last Will and Testament(LWT) 方式はこの課題に対応しています。 LWT は MQTT プロトコル仕様の標準的な方式で、デバイスの突然の切断を検出し、他のクライアントにそのことを通知することを可能にします。

IoTデバイスは、信頼性の低いネットワーク接続の環境で使用されることが多く、電源不足、バッテリー切れ、接続切れ、その他の理由によってデバイスが切断されることがあります。そのため、クライアントが強制的に切断したのか、本当に突然切断したのかがわからず、ブローカーから突然切断されてしまいます。そこで LWT では AWS IoT Core に接続する際にクライアントが認証情報と一緒にメッセージ (testament) を提供するようにしています。もしクライアントが後のある時点で突然切断した場合(停電など) AWS IoT Core に他のクライアントにメッセージ (testament) を配信させ、この突然の切断を知らせ るLWT メッセージを配信することができます。

MQTT Version 3.1.1 では MQTT メッセージの一部として LWT 機能を提供しており AWS IoT Core もサポートしていますので、突然切断するようなクライアントはブローカーへの接続時に MQTT トピックとともにその LWT メッセージを指定することができるようになっています。クライアントが突然切断するとブローカー (AWS IoT Core) は、そのクライアントが接続時に提供した LWT メッセージを、この LWT トピックをサブスクライブしているすべてのデバイスにパブリッシュすることになります。

MQTT LWT 機能により AWS IoT の接続をほぼリアルタイムで監視し、修復のためのアクションを講じることができるようになります。ステータスの確認、接続の復元、エッジベース(デバイス側)のアクションまたはクラウドベースのアクションのいずれかを実行して、このデバイスの突然の切断を調査および軽減することによって、突然の切断イベントに対応することができます。

このブログでは、以下の手順で説明します。

  1. ダミーデバイス lwtThing がKeep-alive時間を指定して AWS IoT Core に接続する
  2. lwtThing デバイスは AWS IoT Core への接続時に、以下を指定します
    1. LWT 用のトピック (例: /last/will/topic)
    2. LWT メッセージ
    3. QoS 0 または 1
  3. lwtThing デバイスが AWS IoT Core から突然切断されます
  4. AWS IoT Core はこれを検知し LWT メッセージをトピック (/last/will/topic) のすべてのサブスクライバにパブリッシュします
  5. AWS IoTのルールエンジンのルールがトピックへパブリッシュされたメッセージにより実行され Amazon Simple Notifications Service(SNS) を呼び出します
  6. Amazon SNSが通知メールを送信します

CloudFormation のテンプレートを使って仮想環境を設定し (AWS IoT ワークショップの設定方法を利用) 、仮想 IoT Thing lwtThing を実行して、物理デバイスの実機シミュレーションを行うことにします

アーキテクチャ

以下に提供するスクリプトを使用してエッジデバイスをシミュレートし LWT メッセージを送信して突然の切断を示し AWS IoTルールをトリガーし、その後 Amazon SNS を呼び出して E メールを送信します。

セットアップ

以下のワークショップのセットアップを使用して LWT を素早くブートストラップし、テストします。このリンクから AWS Cloud9 の環境を構築できます(最寄りのリージョンを選択してください)。

ワークショップの AWS CloudFormation テンプレートを使って環境をセットアップしたら AWS IoT Core(AWS MQTT broker on the cloud) を使って ungraceful disconnects をテストしてみましょう。

Cloud9 のターミナルを開き(こちら) Python SDK をセットアップします。

Cloud9 のターミナルウィンドウを使って IoT の接続に使用するフォルダを作成します。

mkdir -p /home/ubuntu/environment/lwt/certs
cd /home/ubuntu/environment/lwt/

Python IoT SDK のセットアップ方法については、こちらをご覧ください。

手順

git clone https://github.com/aws/aws-iot-device-sdk-python.git
cd aws-iot-device-sdk-python
python setup.py install

AWS IoT Thing をセットアップするには、こちらに記載されている手順に従います。

作成した証明書を Cloud9 インスタンスにアップロードし、そこから接続できるようにしましょう。

新しく作成した証明書と RootCA を以下のフォルダ(先ほど作成したフォルダ)にアップロードしてください。

/home/ubuntu/environment/lwt/certs

LWT メッセージ

Python のコードを Cloud9 にコピーして AWS IoT のシミュレーション対象として実行しましょう。

以下のコマンドをコピーしてください。

touch lwtTest.py

ファイルを開き、以下のコードをコピーして貼り付けてください。

'''
/*
 * # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * # SPDX-License-Identifier: MIT-0
 * 
 */


 '''
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json

AllowedActions = ['both', 'publish', 'subscribe']

# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")

# LWT JSON payload
payload ={
  "state": {
    "reported": {
      "last_will": "yes",
      "trigger_action": "on",
      "client_id": "lwtThing"
        }
    }
}
 
# conversion to JSON done by dumps() function
jsonPayload = json.dumps(payload)
 
# printing the output
#print(jsonPayload)


# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
                    help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
                    help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")
parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
                    help="Operation modes: %s"%str(AllowedActions))
parser.add_argument("-M", "--message", action="store", dest="message", default="AWS IoT Thing connected message to IoT Core",
                    help="Message to publish")

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic

if args.mode not in AllowedActions:
    parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions)))
    exit(2)

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)

# Port defaults
if args.useWebsocket and not args.port:  # When no port override for WebSocket, default to 443
    port = 443
if not args.useWebsocket and not args.port:  # When no port override for non-WebSocket, default to 8883
    port = 8883

# Configure logging - we will see messages on STDOUT
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

#########
# Will Topic
# Input parameters are: Topic, Last will message and finally QoS
myAWSIoTMQTTClient.configureLastWill('/last/will/topic', jsonPayload, 0)
#########


# Connect and subscribe to AWS IoT
# keep-alive connect parameter - setting 30s
myAWSIoTMQTTClient.connect(30) 
print("Connected!")
loopCount = 1
while loopCount < 2:
    if args.mode == 'both' or args.mode == 'publish':
        message = {}
        message['message'] = args.message
        messageJson = json.dumps(message)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if args.mode == 'publish':
            print('Published topic %s: %s\n' % (topic, messageJson))
            loopCount +=1
#lets put the device to sleep so it creates disconnect after 60s
print("--- Putting device to sleep now, so IoT core keep-alive time expires. ---")
print("--- We will abruptly disconnect the device after 60seconds. ---")
time.sleep(60) 

LWT トピック、 JSON ペイロード、および使用する QoS のレベルを設定するためのすべての作業を行っている次の行を見てみましょう。

myAWSIoTMQTTClient.configureLastWill('/last/will/topic', jsonPayload, 0)
  • トピックは /last/will/topic
  • QoS は 0
  • jsonPayload の変数には以下の内容が格納されています
{
  "state": {
    "reported": {
      "last_will": "yes",
      "trigger_action": "on",
      "client_id": "lwtThing"
        }
    }
}

上記の設定は LWT で利用するトピック、送信するメッセージを定義しており、デバイスが突然切断されると LWT メッセージが送信され AWS IoT ルールによって実行されます (“Last Will” メッセージはクライアントとの接続が不意に切れたときにサーバーから発行されます) 。AWS IoT ルールでは Amazon SNS アクションをトリガーし Amazon SNS よりメールを送信します。その他のオプションについては、SDK のドキュメントに詳しく記載されていますのでご覧ください。

AWS IoT Core への接続時に keep-alive を 30 秒に設定すると、指定された時間だけセッションを維持するようにします。タイムアウトが発生すると、セッションは失効します。

60秒が経過するとデバイスは切断されたと判断され、AWS IoT Core から Last Will Testament(LWT) トリガーが発生し、この LWT トピックをサブスクライブしているすべてのトピックサブスクライバにメッセージが公開されます。

Amazon SNSのセットアップ

Amazon SNS では通知を受け取るとそのメッセージをメールとして送信するように設定しましょう。Amazon SNS のコンソールから、次のようにします。

  • Topicsを選択します
    • Create Topicを選択
      • Standard を選択
      • Name を入力(例: lwtSNSTopic)
      • Display name を入力(例: lwtSNSTopic)
      • Create topic を選択
    • トピックが作成されたら
      • Create subscription を選択
      • プロトコルのドロップダウンでは Email を選択
      • Endpoint には受信したいメールアドレスを指定
      • Create subscription を選択

登録が終わると、メールが届くはずです。メールの本文に書かれている subscription の確認をお願いします。 subscription を確認されていない場合は、メールを受信することができません。

AWS IoT Core のルールをセットアップ

AWS IoT Core のコンソールから以下を設定します。

  • Message Routing を選択
  • Rules を選択
  • Create Rule を選択
  • Rule name に lastWillRule と入力し、 description に My first LWT rule を入力
  • SQL statement には以下の SQL を入力します
SELECT * FROM '/last/will/topic'
where state.reported.last_will = 'yes' and state.reported.trigger_action = 'on'
  • Next で次に進みます
  • Action で Simple Notification Service(SNS) を選択します
  • SNS topic では、先ほど作成した Simple Notification Service(SNS) のトピックを選択します
  • Message format では JSON を選択します
  • IAM role では Create new role を選択します
    • Role では lwtRuleRole のような名前を入力します

ここでもう一つアクションを追加して、受信した LWT メッセージを別のトピックに再パブリッシュして、その受信を確認することにしましょう。

  • Add rule action を選択します
  • Republish a message to an AWS IoT topic を選択します
  • Topic には /lwt/executed と入力します
  • QoS はデフォルトのままにします
  • IAM role では先程作成した lwtRuleRole を選択します
  • 最後に Next を選択し、確認画面で Create を選択して作成します

以上でルールの設定は終了です。次に LWT メッセージの送信設定と、設定の実行を行います。

LWT メッセージの送信

python コードを使用してシミュレートしたデバイスを実行する前に AWS IoT Core コンソールでトピックをサブスクライブしてみましょう。


これで準備が整ったので IoT Thing を実行してみましょう。サンプルの実行コマンドを使用しますが thingID や、証明書のパスが異なる場合がありますので、その場合は自分の環境に合わせて修正してください。
コマンド (xxx は自分の環境に合わせて修正してください)

python lwtTest.py -e xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com \
-r /home/ubuntu/environment/lwt/certs/AmazonRootCA1.pem \
-c /home/ubuntu/environment/lwt/certs/xxxxxxxxxxxxxxxxxxxxxxxxxxxx-certificate.pem.crt \
-k /home/ubuntu/environment/lwt/certs/xxxxxxxxxxxxxxxxxxxxxxxxxxxx-private.pem.key \
-id lwtThing \
-t /lwt/connected/topic \
-m publish

入力パラメータとしてコードに渡しているものは、以下の通りです。

  • -e は AWS IoT Core のエンドポイントを指定しています
  • -r は Amazon Root CA の証明書のパスを指定しています
  • -c は Thing の証明書のパスを指定しています
  • -k は Thing の秘密鍵のパスを指定しています
  • -id は AWS IoT Core と接続する際に利用するクライアント ID を指定しています (この値はAWS IoT Core で Thingを作った際の Thing 名と一致している必要があります)
  • -t AWS IoT Core のメッセージを送信するトピックを指定しています
  • -m は モードを設定しており、ここでは publish のみを行う等にしています (mode では publish, subscribe, both が指定できます)

コマンドの実行を見てみましょう。 LWT が設定され AWS IoT Core にどのようなメッセージを発行したかが確認できるはずです。また、60 秒後に abrupt disconnect が表示されていることがわかります。

AWS IoT Core のコンソールに切り替えて、受信メッセージを確認するには、以下のトピックをサブスクライブしてください。ルール実

  • 行時にメッセージの republish に使用するトピック(デバッグとして使用):/lwt/executed
  • クライアントの正常ではない切断時に LWT メッセージを発行する際に使用するトピック:/last/will/topic
  • トピック /lwt/connected/topic では、その Thing がパブリッシュしたメッセージを見ることができます。これは、クライアントが AWS IoT Core に接続し、ブローカーにメッセージを送信したときに発生します。

トピック /last/will/topic の下に、デバイスが正しく切断され無かった場合に AWS IoT Core が実行するメッセージが表示されます。

AWS IoT ルールが LWT に対して実行されると /lwt/executed にメッセージがパブリッシュされることがわかります。先ほど、デバイスの突然の切断時にAWS IoTのルールが実行され、このトピックへ再送信するように構成しています。

AWS IoT ルールの実行に成功すると Amazon SNS のメール通知もトリガーされ、先に正しく設定されていれば、受信トレイに同様のメールが表示されます。

まとめ

このブログでは AWS IoT Core を使って、デバイスのエラーや障害、突然の切断を検出し、突然の切断時に Amazon SNS メール通知をトリガーして、サポートチームが迅速に調査して障害を軽減し、問題を大きく解決する方法について見てきました。 AWS IoT Core は適切または推奨される方法で接続を終了すると接続時に設定した LWT を無視します。 LWT を利用することで、クライアントの接続が切断され、その接続チェーンに他のクライアントが依存している場合のエラー処理シナリオを多く実装することができます。例えば、工場内のセンサーデータを収集する産業用ゲートウェイが AWS IoT Core から突然切断された場合、それらの切断を監視し、下流への二次的な影響を減らすための修復のためのアクションを取ることができます。 MQTTSNS については、リンク先で詳しく解説しています。

著者

Syed Rehan は、Amazon Web Services のグローバル Sr. スペシャリストソリューションアーキテクトで、ロンドンを拠点としています。IoT ソリューションアーキテクトとして、世界中のお客様をサポートしています。IoT とクラウドに関する深い知識を持ち、スタートアップから企業まで幅広い顧客に対して、AWS エコシステムによる IoT ソリューションの構築を支援する役割を担っています。

 

 

この記事はプロトタイピング ソリューション アーキテクトの市川が翻訳しました。