Amazon Web Services ブログ

Amazon MQ for ActiveMQ での JMS 2.0 を使ったトランザクションの実装

このブログでは、Java Messaging System (JMS) 2.0 API を使って書かれたプロデューサクライアントアプリケーションを使用して、Amazon MQ の ActiveMQ ブローカーのトランザクション機能について説明します。JMS 2.0 API は使いやすく、前のバージョンに比べてインターフェースが少なくなっています。ActiveMQ の JMS 2.0 サポートの詳細に関しては、ActiveMQ の JMS2.0 に関するドキュメントを参照してください。また、JMS 2.0 の新機能も合わせてご確認ください。

Amazon MQActiveMQ 5.18 をサポートするようになりました。また、Amazon MQ では新しいセマンティックバージョニングシステムが導入され、マイナーバージョン (例: バージョン5.18) を表示し、同一のマイナーバージョン内の新しいパッチ (例: バージョン5.18.4) でブローカーを最新の状態に保つことができます。ActiveMQ 5.18 は、JMS 2.0、Spring 5.3.x のサポートと、いくつかの依存関係の更新とバグ修正が加えられています。詳細については、Active MQ 5.18.x リリースシリーズの リリースノート を参照してください。

概要

分散システムにおけるメッセージングパターン

メッセージブローカーベースの分散メッセージングにメッセージングサービスを実装する多くの場合、発信と受信を切り離すファイア・アンド・フォーゲットメカニズムが伴います。このとき、プロデューサはメッセージをブローカーに送信し、ブローカーがコンシューマーへのメッセージ配信を保証する責任があります。非トランザクションの利用事例では、各メッセージは互いに独立しています。ただし、時にはメッセージのグループを 1 つのトランザクションの一部としてコンシューマーに配信する必要があります。つまり、グループ内のすべてのメッセージがコンシューマーに配信されるか、またはそれらのメッセージが全く配信されないかのいずれかを意味します。

ActiveMQ 5.18 では、JMS トランザクションと XA トランザクションの 2 つのレベルのトランザクションサポートを提供しています。

JMS トランザクションは、複数のメッセージをアトミックな単位として ActiveMQ ブローカーに送信する必要がある場合に使用されます。このトランザクション機能は、Session(JMS 1.xの場合)またはJMSContext(JMS 2.0の場合)オブジェクトでcommit()およびrollback()メソッドを呼び出すことで有効になります。すべてのメッセージが正常に送信された場合、commit() メソッドによってトランザクションをコミットし、メッセージがユニットとして処理されることを保証できます。また、送信プロセス中に問題が発生した場合は、rollback() メソッドによってトランザクションをロールバックして、メッセージの部分的な配信を防ぐことができます。このようなトランザクション機能は、データの整合性を維持し、複雑なメッセージング操作が確実に実行されるために重要です。トランザクションの動作の詳細については、ActiveMQ FAQ – トランザクションの動作方法を参照してください。

XA トランザクションは、2 つ以上のメッセージを ActiveMQ ブローカーと その他の分散リソースに対してトランザクション方式で送信する必要がある場合に使用されます。これは XA リソースとしてはたらく XA セッションを使って実現されます。XA トランザクションの詳細については、ActiveMQ FAQ の Should I use XA transactions FAQ を参照してください。

注文管理システムにおけるトランザクション利用事例

このブログ記事の例では、メッセージの受け渡しを管理するソフトウェアである「メッセージブローカー」として ActiveMQ を使用した Order Management System (OMS) アプリケーションのトランザクション機能について説明します。受注時に、OMS アプリケーションはメッセージ 1 をウェアハウスキューに送信してパッキングプロセス(商品の梱包作業)を開始します。次に、アプリケーションは内部の業務処理を実行します。このプロセスが成功した場合、アプリケーションはメッセージ 2 を出荷キューに送信して配送プロセス(商品の出荷作業)を開始します。内部の業務処理が失敗した場合、メッセージ 2 が出荷キューに配信されるのを防ぎ、メッセージ 1 をウェアハウスキューからロールバック(取り消し)する必要があります。

下のフローチャートは、この例で紹介しているトランザクション利用事例のロジックを示しています。

トランザクション利用事例を説明するフローチャート

JMS クライアントは、トランザクションが commit または rollback されるまで、メッセージを両方ともメモリ内に保存します。これは、メッセージプロデューサークライアントとブローカー間のトランザクションセッションを維持することで実現されます。トランザクションセッションとは、メッセージ配信を保証するためにトランザクションを使用するセッションのことです。この例では、トランザクションセッションが次の文で作成されています。

JMSContext jmsContext = connectionFactory.createContext(adminUsername, adminPassword, Session.SESSION_TRANSACTED);

この記事のサンプルでは、メッセージプロデューサーとブローカー間でのトランザクションセッションを示しています。ブローカーとメッセージコンシューマー間のトランザクションは示していません。開発者の皆様は、同様のパターンを使って実装できます。

ActiveMQ ブローカーの作成

Amazon MQ で ActiveMQ ブローカーを作成するには、以下の前提条件が必要です。

前提条件:

ブローカーを作成する (AWS CLI):

  1. 次のコマンドを実行して、ブローカーを作成します。これは、テスト専用の公開ブローカーを作成します。本番環境用のブローカーを作成する場合は、Amazon MQ のセキュリティベストプラクティスを遵守してください。
    aws mq create-broker \ 
        --broker-name  \ 
        --engine-type activemq \ 
        --engine-version 5.18 \ 
        --deployment-mode SINGLE_INSTANCE \ 
        --host-instance-type mq.t3.micro \ 
        --auto-minor-version-upgrade \ 
        --publicly-accessible \ 
        --users Username =,Password =,ConsoleAccess = true 
    

    <broker-name> はブローカーに付けたい名前に置き換えてください。 <username> と <password> は create-broker CLI ドキュメントに従って置き換えてください。コマンドを実行すると、コマンドラインにBrokerArn と BrokerId が出力されるので、その値をメモしてください。ブローカーの作成には約 15 分かかります。

  2. 次のコマンドを実行して、ステータスを取得します。
    aws mq describe-broker --broker-id --query 'BrokerState'

    ブローカーの状態が Running になれば、次に進んでください。

  3. 次のコマンドを実行して、コンソール URL とその他のブローカーエンドポイントを取得します。
    aws mq describe-broker --broker-id --query 'BrokerInstances[0]'

    出力から ConsoleURL と SSL エンドポイントをメモしてください。

メッセージプロデューサークライアントの設定

このブログのサンプルコードでは、JMS 2.0 API を使って書かれたサンプルのメッセージプロデューサクライアントを使用して、ActiveMQ ブローカーにメッセージを送信しています。

  • トランザクションが成功した場合、プロデューサークライアントはメッセージを最初のキューに送信し、15 秒待機します。次に、メッセージを 2 番目のキューに送信し、さらに 15 秒待機します。最後に、トランザクションをコミットします。
  • トランザクションが失敗した場合、プロデューサークライアントは最初のメッセージを送信し、15 秒待機します。次に、コードで意図的な失敗を発生させ、トランザクションをロールバックさせます。15 秒の待機時間があれば、プログラムがトランザクション フローを進めている間にブローカー側のメッセージ数を確認できます。成功したトランザクションの場合でも、プロデューサーがコミットするまではメッセージはブローカーに送信されません。

サンプルクライアントをダウンロードして設定するには:

  1. Amazon MQ トランザクションサンプル Jar を GitHub リポジトリの指定のリンクから Jar ファイルをダウンロードします。
  2. サンプルクライアントを実行するには、jar ファイルにカプセル化されたプログラムを実行する -jar オプションを付けて java コマンドを使用します。サンプルクライアントを実行するための構文は次のとおりです。
    java -jar / username password ssl-endpoint first-queue second-queue message is-transaction-successful 

    使い方:
    <path-to-jar-file> – jar ファイルをダウンロードしたローカルマシン上のパス。
    <jar-filename> – jar ファイル名。
    <username> – ブローカーの作成時に選択したユーザー名。
    <password> – ブローカーの作成時に選択したパスワード。
    <ssl-endpoint> – 上記の手順で控えた SSL エンドポイント。
    <first-queue> – トランザクション内の最初のキューの名前。
    <second-queue> – トランザクション内の 2 つ目のキューの名前。
    <message> – メッセージのテキスト。
    <is-transaction-successful> – トランザクションが成功するかどうかをプロデューサークライアントに伝えるフラグ。

正常な取引のテスト

ActiveMQ での正常なトランザクションをテストするには、次の手順を実行します。

  1. ActiveMQ コンソールでキューとメッセージ数を確認する
    1. Amazon MQ コンソールにアクセスし、ActiveMQブローカーを選択します。
    2. Connectionsパネルから ActiveMQ Web Console にログインします。
    3. Manage ActiveMQ brokerをクリックします。
    4. ブローカー作成時に作成したユーザーのユーザー名とパスワードを入力します。
    5. トップナビゲーションバーでQueuesをクリックします。
    6. warehouse-queueshipping-queueがリストに表示されていないことを確認します。
  2. 以下のコマンドを実行して、order1 のメッセージを両方のキューに正常に送信します:
    java -jar <path-to-jar-file>/<jar-filename> <username> <password> <ssl-endpoint> warehouse-queue shipping-queue order1 true

    上記のコマンド説明にあるようにプレースホルダーを置き換えてください。このコマンドで、サンプルのプロデューサークライアントが最初のメッセージを warehouse-queue に送信し、次のメッセージをコンソールに出力して 15 秒待ちます。

    Sending message: order1 to the warehouse-queue 
     Message: order1 is sent to the queue: warehouse-queue but not yet committed.

    この 15 秒の待機中に、ブラウザを更新すると、warehouse-queue がリスト表示されますが、保留中またはエンキューされたメッセージはありません。15 秒後、プロデューサークライアントが 2 つ目のメッセージを shipping-queue に送信し、次のメッセージをコンソールに出力して 15 秒待ちます。

    Sending message: order1 to the shipping-queue 
     Message: order1 is sent to the queue: shipping-queue but not yet committed.
    

    この 15 秒の待機中に、ブラウザウィンドウを再度更新すると、shipping-queue がリストに表示されますが、warehouse-queue と同様に、保留中またはエンキューされたメッセージはありません。

    最後に、プロデューサークライアントが両方のメッセージをコミットし、次のメッセージを出力します。

    Committing 
     Transaction for Message: order1 is now completely committed.
    
  3. ブラウザを更新すると、warehouse-queue と shipping-queue に保留中とエンキューされたメッセージがそれぞれ 1 つずつあることを確認できます。リストは次のようになります:

    shipping と warehouse キュー

この手順を繰り返し、さらに成功したトランザクションをテストしてください。

失敗したトランザクションのテスト

  1. 各キュー内の保留中のメッセージとエンキューされたメッセージの最初の数をメモします。
  2. 次のコマンドを実行し、 false を渡して人為的な障害を導入します。
    java -jar <path-to-jar-file>/<jar-filename> <username> <password> <ssl-endpoint> warehouse-queue shipping-queue failedorder1 false

    このコマンドにより、サンプルのプロデューサークライアントは最初のメッセージをwarehouse-queueに送信し、以下のメッセージをコンソールに表示して15秒間待機します。

    Sending message: failedorder1 to the warehouse-queue 
     Message: failedorder1 is sent to the queue: warehouse-queue but not yet committed.
    

    15秒間の待機中にブラウザを更新し、warehouse-queueとshipping-queueのカウントが変更されていないことを確認してください。 最後にクライアントは人為的にエラーを発生させ、トランザクションをロールバックし、以下を表示します:

    Message: failedorder1 cannot be delivered because of an unknown error. Hence the transaction is rolled back.
  3. ブラウザを更新して、両方のキューのカウントが変更されていないことを確認してください。この例では、失敗したトランザクション後も各キューに1つずつメッセージがある状態から始まります。

    コンソール上 shipping キューと warehouse キューのカウントが変更されていない

成功と失敗の両方の場合において、トランザクションの一部としてキューに送信されるメッセージは、クライアント側でインメモリに格納されることに注意してください。これらのメッセージは、トランザクションがコミットされた時にのみブローカーに送信されます。

クリーンアップ

  1. 次のコマンドを実行してブローカーを削除します
    aws mq delete-broker --broker-id 

まとめ

この記事では、ActiveMQ バージョン 5.18 の Amazon MQ ブローカーを作成しました。また、Amazon MQ で導入された新しいセマンティックバージョニングについても学びました。ActiveMQ 5.18.x は JMS 2.0、Spring 5.3.x、および依存ライブラリの更新をサポートしています。最後に、ActiveMQ 5.18.x ブローカーのトランザクション機能を示しているサンプルアプリケーションを JMS 2.0 API を使用して作成しました。Amazon MQ の詳細については、https://aws.amazon.com/amazon-mq/ をご覧ください。

このブログは、シニアテクニカルアカウントマネージャーの Paras Jain とシニアソリューションアーキテクトの Vinodh Kannan Sadayamuthu によって執筆されたものをソリューションアーキテクト三厨が翻訳しました。原文はこちら