Amazon SQS と Cloud Pub/Sub のインターフェースを統一する
はじめに
小品ですが、ライブラリをつくっています。 その特徴やつくっている背景、解決したいことについてまとめています。
特徴
名前のとおり、キューのハブです。
- Amazon SQS と Google Cloud Pub/Sub を同一のインターフェースで扱うことができる
- 複数の Topic に同じメッセージを送信できる
- 複数の Subscription から、指定した優先度にしたがってメッセージを受信できる
- 別のキューにメッセージを転送できる
- 本番環境に流れているメッセージをテスト環境へ複製する
- 障害時に後続処理を別の経路に逃がす、などの用途
以下、パブリッククラウドサービスについては、AWSとGCPを対象として書いています。また想定しているシステムは、いわゆるデータ基盤です。
前提
データ基盤の内部コンポーネントを、その果たす役割で以下のように分類してみました。
- オーケストレータ
- メッセージング
- コンピューティング
- ストレージ
パブリッククラウドに構築しているデータ基盤では、これらの要素が結集しているモノリスな構成ではなく、少なくともどれかは分離した形で構成されていると思います。このうち、オーケストレータ、コンピューティング、ストレージの3つをつなぐのがメッセージングです。IoTだとMQTTがメジャーだと思われますが、いわゆるWeb界隈ではHTTPを用いることがほとんどだと思いますので、ここではHTTPのみを対象として話を進めます。また、コンポーネント間のメッセージングを焦点としていますので、メッセージングはキューイングと同じことを指していると考えてもらえればと思います。
背景
メッセージングは、文字通り何らかのメッセージを送受信することが目的ですが、直接的なデータそのものを受け渡すだけではなく、「何らかの操作・処理が行われた」というイベント情報や、後続処理に渡したいパラメータを伝える役割もあります。後者の用途により、先に挙げたその他3つのコンポーネントと比較して、メッセージングは他のサービスに対する独立性が低い(結合度が高い)傾向にあります。
例として、Cloud Pub/Sub の Push Subscription を考えます。その設定の中で送信先となるHTTPエンドポイントを指定しますが、これはHTTPリクエストをトリガーに動作する Google App Engine や Cloud Functions、Cloud Run などを意識した仕様だと考えます。この Destination を無理やりAWSにする場合、最小構成でもAmazon API Gateway & AWS Lambda という構成になり、かつパブリックネットワークを経由するためにアクセス制限を別途検討したりと話が大きくなってしまいます。
同様にAmazon SQSも、AWS SAMのような開発サービスを使いつつ、Destination をAWSのサービスにしたほうが楽に開発ができます。このことからメッセージングサービスは、それ自身が属するパブリッククラウドへの依存度が相対的に高い、といえます。
課題
こうした背景により、メッセージングサービスの受け持つ機能やインターフェースは、パブリッククラウドごとに大きく異なります。仮に、別のパブリッククラウドのコンピューティングサービスがよさそうに思えても、そのサービスとやりとりをするためのメッセージングの実装において、これまでと異なる仕様を読み取りながら新たにプログラムを書くことになります。これは、マルチクラウド構成のアーキテクチャを検討する際に、工数的にも心理的にもハードルが上がる要因になります。
提案
前置きが長くなりましたが、こうした課題に取り組むべく、冒頭で紹介したqueuing-hub
をつくりはじめました。
両クラウドのキューイングサービスである Amazon SQS と Cloud Pub/Sub に対するインターフェースを統一し、"Publisherによる送信"と"Subscriberによる受信"を簡単に行えるようにしました。メッセージそのものをどのように扱うか*1はTopicやSubscriptionの設定に依存するものとして、送受信処理のみを統一しています。
加えて、別リージョン・マルチクラウドでの冗長構成*2もできるように、複数のTopicへの送信や、複数のSubscriptionからの優先度つき受信の機能を備えています。送受信先の指定は個別指定も可能ですが、使用しているIAM Roleやサービスアカウントがアクセスできるものすべてに送信、もしくはすべてから受信が数行で実装できます。IAMで送受信先をコントロールできてソースコードの修正が不要になるので、個人的にはこちらがおすすめです。
また、補助的な機能として、別のキューへの転送や一括パージ機能もつけています。本番環境で流れているメッセージを一時的に検証環境へ流すなど、キューに入ったメッセージを操作しやすくする機能になっていると思います。
Publisher
from queuing_hub.publisher import Publisher pub = Publisher() # Send a message to all queues accessible by default response = pub.push(topic_list=pub.topic_list, body='Hello world!')
Subscriber
from queuing_hub.subscriber import Subscriber sub = Subscriber() # Receive messages with list ascending priority from queues accessible by default response = sub.pull(sub_list=sub.sub_list, max_num=1, ack=True)
Forwarder
from queuing_hub.forwarder import Forwarder fwd = Forwarder(sub=sub.sub_list[0], topic=pub.topic_list[0], max_num=1) # copy message response_0 = fwd.pass_through() # move message response_1 = fwd.transport()
今後の展望
とはいえ、つくったばかりなので改善点は多くあります。現在検討しているものをいくつか挙げます。
- シンプルさを優先したため、もともと備わっている機能を削ぎ落としていること
- Subscription の優先度指定ルールが単純であること
- あわよくば、STP(Spanning Tree Protocol) と同じようなことを実現してみたい
- 複数のSubscriptionを指定している場合、どれか一つのメッセージを受信したとしても、その他のSubscriptionで同じメッセージが残ること
- メッセージの保持期限が過ぎて消えるのを待つか、同じメッセージでもう一度処理が走るかのどちらか
- Amazon SQS と Cloud Pub/Sub の後続処理は冪等であることが前提とはいえ、無為に動かすのは本意ではない
- まだpypiに公開していないこと...
今後も実際に動かしながら、他にも考慮すべきポイントがないかを検討していきます。少しずつ改良を続けていって、より楽しいデータ基盤開発につながる何かが生まれたらいいなと思っています。