public note

Debezium Server による Change Data Capture: from MySQL to Google Cloud Pub/Sub

この記事は、datatech-jp Advent Calendar 2022 の19日目の記事です。


@tosh2230 と申します。以前から気になっていた Debezium Server による Change Data Capture(CDC) をやってみました。 具体的には、MySQL のレコードに変更が加わった際に、その情報を Google Cloud Pub/Sub に送信する構成を試しています。

ソースコードはこちらにあります。

github.com

Debezium Server

Debezium は、データベースにおけるレコードレベルの変更をイベントストリームに変換する、オープンソースの CDC プラットフォームです。Apache Kafka Connect に準拠した Source connector として動作します(以下、画像は公式ページより引用)。

https://debezium.io/documentation/reference/stable/_images/debezium-architecture.png

それに対して Debezium Server は、変更イベントの情報を Amazon Kinesis, Google Cloud Pub/Sub などのメッセージングサービスへ直接送信するアプリケーションです。

https://debezium.io/documentation/reference/stable/_images/debezium-server-architecture.png

Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar or Redis (Stream).

Debezium Server :: Debezium Documentation

2022年12月時点では incubating state ですが、構成がシンプルである点や、パブリッククラウドサービスとの連携がしやすくなる点が魅力です。

debezium/debezium-examples

この Debezium および Debezium Server ですが、GitHub に設定のサンプルが公開されています。

GitHub - debezium/debezium-examples: Examples for running Debezium (Configuration, Docker Compose files etc.)

こちらには、各種データソースに対する設定ファイルのサンプルのみならず、Docker Compose ファイルも用意されています。コンテナイメージの元となる Dockerfile は、 debezium/container-images で公開されています。*1

今回試したかった MySQL -> Debezium Server -> Cloud Pub/Sub はサンプル実装がなかったので、PostgreSQL -> Debezium Server -> Cloud Pub/Sub のサンプル と、MySQL コンテナイメージを元につくっていきました。

準備

docker-compose.yml

MySQL と Debezium Server の二つで構成されています。

MySQL への接続設定は、コンテナイメージの中で設定しているユーザー情報をもとにしています。

また Debezium Server に対しては、Google Cloud サービスアカウントの認証情報と Debezium 設定ファイルである application.properties を渡しています。

application.properties

application.properties ファイルでは、データソースである MySQL に関する設定と、イベント送信先である Cloud Pub/Sub に関する設定を記述しています。

debezium.source

connector.class

io.debezium.connector.mysql.MySqlConnector を指定します。 接続情報は、コンテナイメージがビルドされる時に行われているレプリケーション設定をもとに入力しています。

tasks.max

必ず 1 が設定されます。これは、MySQL Connector は常にシングルタスクとして動作するためです。

table.include.list と topic.prefix

table.include.list は、CDC 対象とするテーブルを指定するプロパティです。database_name.table_name の書式で記述し、カンマ区切りで複数指定できます。

また、topic.prefix というプロパティで sink 先の名称の接頭辞を指定できます。今回の場合、topic.prefix=debezium-server-test, debezium.source.table.include.list=inventory.customers と設定しているので、Pub/Sub Topic の名称は debezium-server-test.inventory.customers と決まります。

その他

設定可能なプロパティは、MySQL Connector のプロパティ で確認できます。Debezium Server の場合は、各プロパティの Prefix に debezium.source. をつけるルールになっています。

debezium.sink

type

pubsub と指定します。上述のとおり、source の設定でテーブルごとの sink 先は決まっているので、sink 設定はだいぶシンプルです。

pubsub.project.id

このプロパティは、docker-compose.yml で指定している .env ファイルから読み込んだ値を使うようにしています。プロパティと同名の環境変数が設定されている場合、application.properties の設定よりも優先されます。下記は .env ファイルの記述例です。

DEBEZIUM_SINK_PUBSUB_PROJECT_ID=your-gcp-project-id

GCP リソース作成

Pub/Sub Topic

sink 設定によって決まった、debezium-server-test.inventory.customers という名称の Topic を作成します。

Pub/Sub Subscription

今回作成した Topic debezium-server-test.inventory.customers に紐付けた Subscription を作成します。検証を簡単にするため、Pull 型で作成します。

サービスアカウント

Google Cloud のサービスアカウントを作成し、上記 Topic への Publish 権限(roles/pubsub.publisher) を付与します。Key ファイルを作成し、docker-compose.yml で指定している ./gcp_service_account.json とリネームして配置します。

動作確認

ここからは、動作確認の手順と注意事項を記します。

コンテナ起動

$ docker-compose up -d

Debezium Server 起動確認

コンテナ起動直後は、MySQL や Pub/Sub との接続に失敗することがあります。debezium-server コンテナを何回か再起動するとうまくいきます。

$ docker-compose restart debezium-server

イベント待ちの状態になると、下記のようなログが出力されます。

$ docker-compose logs -f debezium-server
(中略)
debezium-server    | {"timestamp":"2022-12-18T14:21:35.326Z","sequence":205,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlStreamingChangeEventSource","level":"INFO","message":"Keepalive thread is running","threadName":"debezium-mysqlconnector-debezium-server-test-change-event-source-coordinator","threadId":23,"mdc":{"dbz.taskId":"0","dbz.connectorName":"debezium-server-test","dbz.connectorType":"MySQL","dbz.connectorContext":"streaming"},"ndc":"","hostName":"53d29206a50a","processName":"io.debezium.server.Main","processId":1}

MySQL コンテナへのログイン

$ docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD'

inventory.customers テーブルへのレコード挿入

適当な値で INSERT 文を発行します。

mysql> USE inventory;
mysql> insert into customers(first_name, last_name, email) values('test', '02', 'test02@example.com');

このイベントの捕捉に成功すると、debezium-server コンテナで下記のようなログが出力されます。

debezium-server    | {"timestamp":"2022-12-18T14:24:18.564Z","sequence":206,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"1 records sent during previous 00:02:45.272, last recorded offset of {server=debezium-server-test} partition is {transaction_id=null, ts_sec=1671373458, file=mysql-bin.000003, pos=236, row=1, server_id=223344, event=2}","threadName":"pool-7-thread-1","threadId":16,"mdc":{},"ndc":"","hostName":"53d29206a50a","processName":"io.debezium.server.Main","processId":1}

メッセージ確認

Subscription から pull するスクリプトで、Debezium Server から送信されたメッセージの内容を確認します。*2

$ python pubsub_pull.py | jq .             
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "debezium-server-test.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "debezium-server-test.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "debezium-server-test.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1007,
      "first_name": "test",
      "last_name": "02",
      "email": "test02@example.com"
    },
    "source": {
      "version": "2.0.1.Final",
      "connector": "mysql",
      "name": "debezium-server-test",
      "ts_ms": 1671294516000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 728,
      "row": 0,
      "thread": 22,
      "query": null
    },
    "op": "c",
    "ts_ms": 1671330519454,
    "transaction": null
  }
}

schema には変更イベント前後のスキーマ情報が、payload には変更前後のレコード情報が入っていることがわかります。

また、末尾のほうにある op には c(create) とありますので、ここで変更イベントの種類を特定できます。

まとめ

MySQL -> Debezium Server -> Cloud Pub/Sub を試してみました。Pub/Sub へ送信できれば、Cloud Dataflow や BigQuery など複数のサービスにつなげることができるので、応用がきく嬉しい仕組みです。

Incubating なので大々的に活用するには慎重な検討が必要そうですが、スポット活用ならば有力な選択肢になるのではと考えました。引き続き注目していこうと思います。

*1:このリポジトリを見つけるまで、コンテナの初期状態が分からないので Debezium の source 設定値をどのようにすればよいかも分からない...となっており、それはそれは苦労しました

*2:検証目的のスクリプトですので、あえて ACK 処理をコメントアウトしています