Amazon Web Services ブログ

AWS IoT Core でProtocol Buffers を使用してスマートアプリケーションを構築する方法

Protocol Buffers の概要

Protocol Buffers (Protobuf)は、構造化データをシリアル化するためのプラットフォームに依存しないアプローチを提供します。Protobuf は JSON に似ていますが、軽量で処理が速く、お好みのプログラミング言語でバインディングを自動的に生成できる点が異なります。

AWS IoT Core は、何十億もの IoT デバイスを接続し、何兆ものメッセージを AWS サービスにルーティングできるマネージドサービスです。これにより、アプリケーションを何百万ものデバイスにシームレスにスケーリングできます。AWS IoT Core と Protobuf の統合により、Protobuf の無駄のないデータシリアル化プロトコルと自動コードバインディング生成のメリットも得られます。

Protobuf コード生成による IoT のアジリティとセキュリティ

Protobuf を利用する主な利点は、Protobuf によるコード生成を使用したソフトウェア開発の容易さと安全性です。アプリケーションのコンポーネント間で交換されるメッセージのスキーマの記述ができます。protoc などのコード生成コンパイラが記述されたスキーマを解釈し、選択したプログラミング言語でのエンコードとデコード機能を実装します。Protobuf によるコード生成はメンテナンスが行き届いていて広く使用されているため、堅牢で実地試験済みのコードになっています。

自動コード生成により、開発者はエンコード関数やデコード関数を記述する必要がなくなり、プログラミング言語間の互換性が保証されます。AWS IoT Core のルールエンジンによる Protocol Buffer メッセージング形式のサポートが新たに開始されたことに伴い、C 言語 で記述されたプロデューサーアプリケーションをデバイス上で実行し、AWS Lambda 関数コンシューマーを Python で作成できるようになりました。これらはすべて生成されたバインディングを使用します。

AWS IoT Core で JSON よりも Protocol Buffers を使用するその他の利点は次のとおりです

  • スキーマと検証: スキーマは送信者と受信者の両方によって適用され、適切な統合が確実に実現されます。メッセージは自動生成コードによってエンコードおよびデコードされるため、バグは排除されます。
  • 適応性: スキーマは変更可能で、後方互換性と上位互換性を維持しながらメッセージコンテンツを変更できます。
  • 帯域幅の最適化: 同じコンテンツでも、Protobuf を使用すると、ヘッダーではなくデータのみを送信するため、メッセージの長さが短くなります。時間が経つにつれて、デバイスの自律性が向上し、帯域幅の使用量が少なくなります。メッセージングプロトコルとシリアル化形式に関する最近の調査では、Protobuf 形式のメッセージは、同等の JSON 形式のメッセージよりも最大 10 倍小さいことが明らかになりました。つまり、同じコンテンツを伝送するためにネットワークを経由するバイト数が少なくなるということです。
  • 効率的なデコード: Protobuf メッセージのデコードは JSON のデコードよりも効率的です。つまり、受信関数の実行時間が短くなります。Auth0 が実施したベンチマークでは、Protobuf は同等のメッセージペイロードで JSON よりも最大 6 倍もパフォーマンスが高いことが明らかになりました。

このブログ記事では、Protobuf 形式を使用して AWS IoT Core にメッセージを公開するサンプルアプリケーションをデプロイする方法について説明します。その後、メッセージは AWS IoT Core ルールエンジンのルールによって選択的にフィルタリングされます。

Protobuf の基本をおさらいしてみましょう。

Protocol Buffers の概要

メッセージスキーマは Protobuf の重要な要素です。スキーマは次のようになります。

syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Telemetry
{
  enum MsgType
  {
    MSGTYPE_NORMAL = 0;
    MSGTYPE_ALERT = 1;
  }
  MsgType msgType = 1;
  string instrumentTag = 2;
  google.protobuf.Timestamp timestamp = 3;
  double value = 4;
}
C-like

スキーマの最初の行は、使用している Protocol Buffers のバージョンを定義します。この記事では proto3 バージョンの構文を使用しますが、proto2 もサポートされています。

次の行は、Telemetry という新しいメッセージ定義が記述されることを示しています。

特にこのメッセージには、次の 4 つの異なるフィールドがあります。

  • msgType フィールド: MsgType 型で、「MSGTYPE_NORMAL」または「MSGTYPE_ALERT」の列挙子のみを指定可能
  • instrumentTag フィールド: string 型で、テレメトリデータを送信する計測器を識別
  • timestamp フィールド: google.protobuf.Timestamp 型で測定の時刻を示す
  • value フィールド: double 型で計測値を示す

有効なすべてのデータ型と構文に関する追加情報については、こちらのドキュメントを参照してください。

JSON 形式で記載された Telemetry メッセージは次のようになります。

{
  "msgType": "MSGTYPE_ALERT",
  "instrumentTag": "Temperature-001",
  "timestamp": 1676059669,
  "value": 72.5
}
JSON

Protocol Buffers (表示用に base64 でエンコード) を使用した同じメッセージは、次のようになります。

0801120F54656D70657261747572652D3030311A060895C89A9F06210000000000205240

メッセージの JSON 表現は 115 バイトであるのに対し、Protobuf では 36 バイトしかないことに注意してください。

スキーマが定義されると、protoc を使用して次のことができます。

  1. お好みのプログラミング言語でのバインディングの作成
  2. AWS IoT Core が受信したメッセージをデコードするために使用する FileDescriptorSet の作成

AWS IoT Core での Protocol Buffers の利用

Protobuf は AWS IoT Core でさまざまな方法で使用できます。最も簡単な方法は、メッセージをバイナリペイロードとして公開し、受信側のアプリケーションにデコードさせることです。これは AWS IoT Core ルールエンジンですでにサポートされており、Protobuf だけでなく、あらゆるバイナリペイロードで機能します。

ただし、Protobuf メッセージをデコードしてフィルタリングや転送を行う場合は、最大のメリットが得られます。フィルターされたメッセージは Protobuf として転送することも、JSON 形式しか認識しないアプリケーションとの互換性のために JSON にデコードすることもできます。

AWS IoT Core ルールエンジンが Protocol Buffers メッセージング形式をサポートしたため、マネージドな機能を利用することでこれを実現できます。これより先のセクションでは、サンプルアプリケーションのデプロイと実行について説明します。

前提条件

こちらのサンプルアプリケーションを実行するには、次の条件を満たす必要があります。

  • protoc がインストールされたコンピュータ。公式インストール手順を参照してください。
  • AWS CLI のインストール。インストール手順はこちらを参照してください。
  • AWS アカウントと、Amazon S3、AWS IAM、AWS IoT Core、AWS CloudFormation に対するフルアクセス権限を持つ有効な認証情報
  • Python 3.7 以降のバージョン

サンプルアプリケーション:Protobuf メッセージを JSON としてフィルタリングして転送

サンプルアプリケーションをデプロイして実行するには、次の 7 つの簡単なステップを実行します。

  1. サンプルコードをダウンロードして Python の必要なパッケージをインストール
  2. IOT_ENDPOINTAWS_REGION の環境変数を設定
  3. protoc を使用して Python バインディングとメッセージ記述子を生成
  4. Python と Protobuf で生成されたコードバインディングを使用してシミュレートされたデバイスを実行
  5. AWS CloudFormation を使用して AWS リソースを作成し、Protobuf 記述子ファイルをアップロード
  6. Protobuf メッセージを照合、フィルタリング、JSON として再公開する AWS IoT ルールを確認
  7. 変換されたメッセージが再公開されていることを確認

Step 1:サンプルコードのダウンロードと Python の必要なパッケージのインストール

サンプルアプリケーションを実行するには、コードをダウンロードして、その依存関係をインストールする必要があります。

  • まず、AWS github リポジトリからサンプルアプリケーションをダウンロードして抽出します https://github.com/aws-samples/aws-iotcore-protobuf-sample
  • ZIP ファイルとしてダウンロードした場合は、解凍してください
  • 必要な Python の依存関係をインストールするには、抽出したサンプルアプリケーションのフォルダー内で以下のコマンドを実行します。
pip install -r requirements.txt
Bash

上記のコマンドを実行すると、boto3 (Python 用 AWS SDK) と protobuf という 2 つの必要な Python 依存関係がインストールされます。

Step 2:IOT_ENDPOINTAWS_REGION の環境変数の設定

シミュレートされた IoT デバイスは AWS IoT Core エンドポイントに接続して Protobuf 形式のメッセージを送信します。

Linux または Mac を実行している場合は、次のコマンドを実行します。 を必ず選択した AWS リージョンに置き換えてください。

export AWS_REGION=<AWS_REGION>
export IOT_ENDPOINT=$(aws iot describe-endpoint --endpoint-type iot:Data-ATS --query endpointAddress --region $AWS_REGION --output text)
Bash

Step 3:protoc を使用して Python バインディングとメッセージ記述子を生成

抽出されたサンプルアプリケーションには、前に示したスキーマの例に似た msg.proto という名前のファイルが含まれています。 以下のコマンドを実行して、シミュレートされたデバイスが記述子ファイルの生成に使用するコードバインディングを生成します。

protoc --python_out=. msg.proto
protoc --include_imports -o filedescriptor.desc msg.proto
Bash

これらのコマンドを実施すると、次の2つのファイルが確認できます。

filedescriptor.desc msg_pb2.py

Step 4:Python と Protobuf で生成されたコードバインディングを使用して、シミュレートされたデバイスを実行

抽出されたサンプルアプリケーションには、simulate_device.py という名前のファイルが含まれています。

シミュレートされたデバイスを起動するには、以下のコマンドを実行します。

python3 simulate_device.py
Bash

AWS コンソールの MQTT テストクライアントを使用して、メッセージが AWS IoT Core に送信されていることを確認します。

  1. AWS IoT Core サービスコンソール (https://console.aws.amazon.com/iot) にアクセスします。正しい AWS リージョンにいることを確認してください。
  2. [テスト] で、[MQTT テストクライアント] を選択します。
  3. トピックのフィルターに「test/telemetry_all」と入力します。
  4. [追加設定] セクションを展開し、[MQTT ペイロード表示] で [未加工のペイロードを表示 (バイナリデータを 16 進値として表示)] を選択します。
  5. [サブスクライブ] をクリックして、Protobuf 形式のメッセージが AWS IoT Core MQTT ブローカーに届くのを確認します。

 

Step 5:AWS CloudFormation を使用して AWS リソースを作成し、Protobuf 記述子ファイルをアップロード

抽出されたサンプルアプリケーションには、support-infrastructure-template.yaml という名前の AWS CloudFormation テンプレートが含まれています。

このテンプレートは、Amazon S3 バケット、AWS IAM ロール、および AWS IoT ルールを定義します。

次のコマンドを実行して、CloudFormation テンプレートを AWS アカウントにデプロイします。必ず、<YOUR_BUCKET_NAME><AWS_REGION> を S3 バケットと選択した AWS リージョンの名前に置き換えてください。

aws cloudformation create-stack --stack-name IotBlogPostSample \
--template-body file://support-infrastructure-template.yaml \
--capabilities CAPABILITY_NAMED_IAM \
--parameters ParameterKey=FileDescriptorBucketName,ParameterValue=<YOUR_BUCKET_NAME> \
--region=<AWS_REGION>
Bash

AWS IoT Core が Protobuf 形式のメッセージをサポートするには、protoc で生成した記述子ファイルが必要です。使用できるようにするために、作成した S3 バケットにアップロードします。以下のコマンドを実行して、記述子ファイルをアップロードします。 <YOUR_BUCKET_NAME> の値を CloudFormation テンプレートをデプロイするときに選択したのと同じ名前に置き換えてください。

aws s3 cp filedescriptor.desc s3://`<YOUR_BUCKET_NAME>`/msg/filedescriptor.desc

Step 6: Protobuf メッセージを照合、フィルタリング、JSON として再公開する AWS IoT ルールを確認

こちらのBlogでは、危険な操作条件がある可能性があることを示している MSGTYPE_ALERTmsgType を持つメッセージをフィルタリングすることを考えます。CloudFormation テンプレートは、仮想デバイスが AWS IoT Core に送信している Protobuf 形式のメッセージをデコードする AWS IoT ルールを作成します。次に、アラートとなるメッセージを選択し、別の MQTT トピックとしてサブスクライブできるように JSON 形式で再公開 (再度パブリッシュ) する AWS IoT ルールを作成します。AWS IoT ルールを確認するには、次の手順を実行します。

  1. AWS IoT Core サービスコンソールにアクセスする: https://console.aws.amazon.com/iot
  2. 左側のメニューの [メッセージのルーティング] で、[ルール] をクリックします。
  3. ルールのリストには ProtobufAlertRule という名前の AWS IoT ルールが含まれています。クリックすると詳細が表示されます。
  4. SQL ステートメントの下にある、下記 SQL ステートメントの記述を確認してください。各要素の意味については後ほど説明します。
  5. [アクション] に AWS IoT トピックに再公開するアクションが一つあることを確認してください。
SELECT
  VALUE decode(encode(*, 'base64'), "proto", "<YOUR_BUCKET_NAME>", "msg/filedescriptor.desc", "msg", "Telemetry")
FROM
  'test/telemetry_all'
WHERE
  decode(encode(*, 'base64'), "proto", "<YOUR_BUCKET_NAME>", "msg/filedescriptor.desc", "msg", "Telemetry").msgType = 'MSGTYPE_ALERT'
SQL
 

この SQL ステートメントは次の処理を行います。

SELECT VALUE decode(...) は、デコードされた Protobuf ペイロード全体が JSON ペイロードとして送信先の AWS IoT トピックに再公開されることを示します。メッセージを Protobuf 形式のまま転送したい場合は、これを単純な SELECT * に置き換えることができます WHERE decode(...).msgType = 'MSGTYPE_ALERT' は受信した Protobuf フォーマットのメッセージをデコードし、msgType の値が MSGTYPE_ALERT となっているメッセージのみを転送します

Step 7:変換されたメッセージが再公開されていることを確認

この AWS IoT ルールにあるアクションをクリックすると、メッセージが topic/telemetry_alerts トピックに再公開されることがわかります。 

再公開先のトピックである test/telemetry_alerts は、サンプルアプリケーションの AWS CloudFormation テンプレート宣言されている AWS IoT ルールアクションの定義の一部です。

トピックをサブスクライブして JSON 形式のメッセージが再発行されるかどうかを確認するには、次の手順に従います。

  1. AWS IoT Core サービスコンソールにアクセスする: https://console.aws.amazon.com/iot
  2. [テスト] で、[MQTT テストクライアント] を選択します。
  3. トピックフィルターに test/telemetry_alert と入力します
  4. [追加設定] セクションを展開し、[MQTT ペイロード表示] で [JSON ペイロードを自動フォーマット (読みやすさが向上)] オプションが選択されていることを確認します。
  5. [サブスクライブ] をクリックして、msgType MSGTYPE_ALERT という JSON 形式に変換されたメッセージが届くことを確認してください

仮想デバイスで稼働するコードを調べると、メッセージの約 20% が MSGTYPE_ALERT タイプで、メッセージは 5 秒ごとに送信されていることがわかります。警告メッセージが届くまで待つ場合があります。

クリーンアップ

このサンプルを実行した後にクリーンアップするには、以下のコマンドを実行します。

# delete the file descriptor object from the Amazon S3 Bucket
aws s3 rm s3://<YOUR_BUCKET_NAME>/msg/filedescriptor.desc

# detach all policies from the IoT service role
aws iam detach-role-policy --role-name IoTCoreServiceSampleRole \
  --policy-arn $(aws iam list-attached-role-policies --role-name IoTCoreServiceSampleRole --query 'AttachedPolicies[0].PolicyArn' --output text)

# delete the AWS CloudFormation Stack
aws cloudformation delete-stack --stack-name IotBlogPostSample
Bash

まとめ

今回の Blog でご紹介したように、AWS IoT Core で Protobuf を操作するのは、SQL ステートメントを書くのと同じくらい簡単です。Protobuf メッセージは、コスト削減 (帯域幅使用量の削減、デバイスの自律性の向上) と、protoc がサポートするプログラミング言語での開発のしやすさという点の両方において JSON よりも優れています。

AWS IoT Core ルールエンジンを使用して Protobuf 形式のメッセージをデコードする方法の詳細については、AWS IoT Core のドキュメントを参照してください。

サンプルコードは github リポジトリ https://github.com/aws-samples/aws-iotcore-protobuf-sample にあります。

decode 関数は、Amazon Kinesis Data Firehose にデータを転送する場合に特に便利です。デコードを実行するための AWS Lambda 関数を記述しなくても JSON 入力を受け付けることができるからです。

AWS IoT ルールアクションで利用できるサービス統合の詳細については、AWS IoT ルールアクションのドキュメントを参照してください。

著者紹介




José Gardiazabal José Gardiazabal は、AWS のプロトタイピングおよびクラウドエンジニアリングチームのプロトタイピングアーキテクトです。プロトタイピングにより AWS 上での可能性を示すことにより、お客様が構築したいシステムの実現ができるようサポートしています。彼は電子工学の学士号とコンピュータサイエンスの博士号を持っています。以前は医療機器とソフトウェアの開発に従事していました。




Donato Azevedo Donato Azevedo は、AWS のプロトタイピングおよびクラウドエンジニアリングチームのプロトタイピングアーキテクトです。AWS 上での可能性を示すことで、お客様の真のポテンシャルを引き出す支援をしています。制御工学の学士号を持ち、以前は石油・ガスおよび金属・鉱業企業の産業オートメーションに従事していました。

この記事はJosé GardiazabalとDonato Azevedoによって書かれた How to build smart applications using Protocol Buffers with AWS IoT Coreの日本語訳です。Solutions Architect の 西亀真之 が翻訳しました。