public note

Cloud Pub/Sub でキューの長さを確認する

Cloud Pub/Sub で作成したTopicのサイズを確認する方法を調べていたのだが、ちょっと癖があったのでメモ。

Python での実装

import os
from datetime import datetime

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

PROJECT = os.environ['GCP_PROJECT']
METRIC = 'pubsub.googleapis.com/subscription/num_undelivered_messages'

def qsize():
    client = monitoring_v3.MetricServiceClient()
    pubsub_query = query.Query(
        client,
        PROJECT,
        metric_type=METRIC,
        end_time=datetime.now(),
        minutes=2   # if set 1 minute, we get nothing while creating the latest metrics.
    )

    for content in pubsub_query:
        subscription_id = content.resource.labels['subscription_id']
        count = content.points[0].value.int64_value
        print(f'{subscription_id}: {count}')

ポイント

  • Cloud Pub/Sub には該当する API がなく、Cloud Monitoring 経由で取得すること
  • モニタリングの周期は1分単位であること
  • 最新の結果を集計するのにタイムラグがあること
    • 常に最新の結果を得ようとminutes=1 にすると、xx分00秒になってから数秒の間は「直前の期間におけるサイズを集計している状態」となり、結果は空で返ってくる。
    • そのため、この実装ではminutes=2 に設定して、過去2分間(=2回分)を取得して、その中から直近のモニタリング結果を取得するようにした。

(参考) Amazon SQS キューの長さを確認する方法

import boto3

session = boto3.Session()
client = session.client('sqs')

ATTRIBUTE_NAMES = [
    'ApproximateNumberOfMessages'
]

def qsize():
    queue_url_list = client.list_queues()['QueueUrls']
    for queue_url in queue_url_list:
        response= clinet.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=ATTRIBUTE_NAMES
        )
        size = response['Attributes']['ApproximateNumberOfMessages']
        print(f'{queue_url}: {size}')

わかりやすい… AWSのやさしさ...

参考にしたページ

Monitoring Client Libraries  |  Google Cloud

Google Cloud metrics  |  Cloud Monitoring

Google PubSub - Counting messages in topic - Stack Overflow

以下は個人的なメモ書き

Cloud Pub/Sub の特徴

  • ストリーミングデータの受け口として使うことができる
  • ファンイン・ファンアウトともに可能で、両方組み合わせることもできる
  • FIFOキューも指定できる
  • 可視性タイムアウトの設定ができる
  • デッドレターキューの設定ができる
  • Topic のエンドポイントがグローバルである

    Service APIs Overview  |  Cloud Pub/Sub  |  Google Cloud

すごく器用で色々な使い方ができる。AWS でいうと、Amazon Kinesis Stream , Amazon SNS, Amazon SQS が一つのサービスに統合されている、といったところ。また、グローバルエンドポイントというのは、設計する上でリージョンをあまり考えなくてよくなるのでよさそうに思った。一方で、器用すぎるので実装イメージがしにくく、どう使えばよいかわからない、となりそう。上述のAWSサービスに心得があれば、だいたい同じ考え方でいけると思われる。

使い方

重要なポイントだけに絞ると、以下のようになる。

  • メッセージングの構成を考える
    • Publisher, Topic, Subscriber の数はそれぞれいくつなのかを決める
  • Topic をつくる
  • Subscription をつくる
    • Push 型かPull 型かを決める
    • Push 型の場合、メッセージを送り込む Endpoint URL(Cloud Functions、Cloud Run など)を指定する
    • Pull 型の場合、Subscriber にてメッセージ受信処理を実装する
      • 同期 Pull か 非同期 Pull かを選ぶ
  • Publisherにて、Topic へのメッセージ送信処理を実装する