Cloud Pub/Sub で作成したTopicのサイズを確認する方法を調べていたのだが、ちょっと癖があったのでメモ。
Python での実装
import os from datetime import datetime from google.cloud import monitoring_v3 from google.cloud.monitoring_v3 import query PROJECT = os.environ['GCP_PROJECT'] METRIC = 'pubsub.googleapis.com/subscription/num_undelivered_messages' def qsize(): client = monitoring_v3.MetricServiceClient() pubsub_query = query.Query( client, PROJECT, metric_type=METRIC, end_time=datetime.now(), minutes=2 # if set 1 minute, we get nothing while creating the latest metrics. ) for content in pubsub_query: subscription_id = content.resource.labels['subscription_id'] count = content.points[0].value.int64_value print(f'{subscription_id}: {count}')
ポイント
- Cloud Pub/Sub には該当する API がなく、Cloud Monitoring 経由で取得すること
- モニタリングの周期は1分単位であること
- 最新の結果を集計するのにタイムラグがあること
- 常に最新の結果を得ようと
minutes=1
にすると、xx分00秒になってから数秒の間は「直前の期間におけるサイズを集計している状態」となり、結果は空で返ってくる。 - そのため、この実装では
minutes=2
に設定して、過去2分間(=2回分)を取得して、その中から直近のモニタリング結果を取得するようにした。
- 常に最新の結果を得ようと
(参考) Amazon SQS キューの長さを確認する方法
import boto3 session = boto3.Session() client = session.client('sqs') ATTRIBUTE_NAMES = [ 'ApproximateNumberOfMessages' ] def qsize(): queue_url_list = client.list_queues()['QueueUrls'] for queue_url in queue_url_list: response= clinet.get_queue_attributes( QueueUrl=queue_url, AttributeNames=ATTRIBUTE_NAMES ) size = response['Attributes']['ApproximateNumberOfMessages'] print(f'{queue_url}: {size}')
わかりやすい… AWSのやさしさ...
参考にしたページ
Monitoring Client Libraries | Google Cloud
Google Cloud metrics | Cloud Monitoring
Google PubSub - Counting messages in topic - Stack Overflow
以下は個人的なメモ書き
Cloud Pub/Sub の特徴
- ストリーミングデータの受け口として使うことができる
- ファンイン・ファンアウトともに可能で、両方組み合わせることもできる
- FIFOキューも指定できる
- 可視性タイムアウトの設定ができる
- デッドレターキューの設定ができる
Topic のエンドポイントがグローバルである
すごく器用で色々な使い方ができる。AWS でいうと、Amazon Kinesis Stream , Amazon SNS, Amazon SQS が一つのサービスに統合されている、といったところ。また、グローバルエンドポイントというのは、設計する上でリージョンをあまり考えなくてよくなるのでよさそうに思った。一方で、器用すぎるので実装イメージがしにくく、どう使えばよいかわからない、となりそう。上述のAWSサービスに心得があれば、だいたい同じ考え方でいけると思われる。
使い方
重要なポイントだけに絞ると、以下のようになる。
- メッセージングの構成を考える
- Publisher, Topic, Subscriber の数はそれぞれいくつなのかを決める
- Topic をつくる
- Subscription をつくる
- Push 型かPull 型かを決める
- Push 型の場合、メッセージを送り込む Endpoint URL(Cloud Functions、Cloud Run など)を指定する
- Pull 型の場合、Subscriber にてメッセージ受信処理を実装する
- 同期 Pull か 非同期 Pull かを選ぶ
- Publisherにて、Topic へのメッセージ送信処理を実装する