Debezium Server による Change Data Capture: from MySQL to Google Cloud Pub/Sub
この記事は、datatech-jp Advent Calendar 2022 の19日目の記事です。
@tosh2230 と申します。以前から気になっていた Debezium Server による Change Data Capture(CDC) をやってみました。 具体的には、MySQL のレコードに変更が加わった際に、その情報を Google Cloud Pub/Sub に送信する構成を試しています。
ソースコードはこちらにあります。
Debezium Server
Debezium は、データベースにおけるレコードレベルの変更をイベントストリームに変換する、オープンソースの CDC プラットフォームです。Apache Kafka Connect に準拠した Source connector として動作します(以下、画像は公式ページより引用)。
それに対して Debezium Server は、変更イベントの情報を Amazon Kinesis, Google Cloud Pub/Sub などのメッセージングサービスへ直接送信するアプリケーションです。
Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar or Redis (Stream).
Debezium Server :: Debezium Documentation
2022年12月時点では incubating state ですが、構成がシンプルである点や、パブリッククラウドサービスとの連携がしやすくなる点が魅力です。
debezium/debezium-examples
この Debezium および Debezium Server ですが、GitHub に設定のサンプルが公開されています。
こちらには、各種データソースに対する設定ファイルのサンプルのみならず、Docker Compose ファイルも用意されています。コンテナイメージの元となる Dockerfile は、 debezium/container-images で公開されています。*1
今回試したかった MySQL -> Debezium Server -> Cloud Pub/Sub はサンプル実装がなかったので、PostgreSQL -> Debezium Server -> Cloud Pub/Sub のサンプル と、MySQL コンテナイメージを元につくっていきました。
準備
docker-compose.yml
MySQL と Debezium Server の二つで構成されています。
MySQL への接続設定は、コンテナイメージの中で設定しているユーザー情報をもとにしています。
また Debezium Server に対しては、Google Cloud サービスアカウントの認証情報と Debezium 設定ファイルである application.properties を渡しています。
application.properties
application.properties ファイルでは、データソースである MySQL に関する設定と、イベント送信先である Cloud Pub/Sub に関する設定を記述しています。
debezium.source
connector.class
io.debezium.connector.mysql.MySqlConnector
を指定します。
接続情報は、コンテナイメージがビルドされる時に行われているレプリケーション設定をもとに入力しています。
tasks.max
必ず 1 が設定されます。これは、MySQL Connector は常にシングルタスクとして動作するためです。
table.include.list と topic.prefix
table.include.list は、CDC 対象とするテーブルを指定するプロパティです。database_name.table_name
の書式で記述し、カンマ区切りで複数指定できます。
また、topic.prefix というプロパティで sink 先の名称の接頭辞を指定できます。今回の場合、topic.prefix=debezium-server-test
, debezium.source.table.include.list=inventory.customers
と設定しているので、Pub/Sub Topic の名称は debezium-server-test.inventory.customers
と決まります。
その他
設定可能なプロパティは、MySQL Connector のプロパティ で確認できます。Debezium Server の場合は、各プロパティの Prefix に debezium.source.
をつけるルールになっています。
debezium.sink
type
pubsub
と指定します。上述のとおり、source の設定でテーブルごとの sink 先は決まっているので、sink 設定はだいぶシンプルです。
pubsub.project.id
このプロパティは、docker-compose.yml で指定している .env ファイルから読み込んだ値を使うようにしています。プロパティと同名の環境変数が設定されている場合、application.properties の設定よりも優先されます。下記は .env ファイルの記述例です。
DEBEZIUM_SINK_PUBSUB_PROJECT_ID=your-gcp-project-id
GCP リソース作成
Pub/Sub Topic
sink 設定によって決まった、debezium-server-test.inventory.customers
という名称の Topic を作成します。
Pub/Sub Subscription
今回作成した Topic debezium-server-test.inventory.customers
に紐付けた Subscription を作成します。検証を簡単にするため、Pull 型で作成します。
サービスアカウント
Google Cloud のサービスアカウントを作成し、上記 Topic への Publish 権限(roles/pubsub.publisher) を付与します。Key ファイルを作成し、docker-compose.yml で指定している ./gcp_service_account.json
とリネームして配置します。
動作確認
ここからは、動作確認の手順と注意事項を記します。
コンテナ起動
$ docker-compose up -d
Debezium Server 起動確認
コンテナ起動直後は、MySQL や Pub/Sub との接続に失敗することがあります。debezium-server コンテナを何回か再起動するとうまくいきます。
$ docker-compose restart debezium-server
イベント待ちの状態になると、下記のようなログが出力されます。
$ docker-compose logs -f debezium-server (中略) debezium-server | {"timestamp":"2022-12-18T14:21:35.326Z","sequence":205,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlStreamingChangeEventSource","level":"INFO","message":"Keepalive thread is running","threadName":"debezium-mysqlconnector-debezium-server-test-change-event-source-coordinator","threadId":23,"mdc":{"dbz.taskId":"0","dbz.connectorName":"debezium-server-test","dbz.connectorType":"MySQL","dbz.connectorContext":"streaming"},"ndc":"","hostName":"53d29206a50a","processName":"io.debezium.server.Main","processId":1}
MySQL コンテナへのログイン
$ docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD'
inventory.customers テーブルへのレコード挿入
適当な値で INSERT 文を発行します。
mysql> USE inventory; mysql> insert into customers(first_name, last_name, email) values('test', '02', 'test02@example.com');
このイベントの捕捉に成功すると、debezium-server コンテナで下記のようなログが出力されます。
debezium-server | {"timestamp":"2022-12-18T14:24:18.564Z","sequence":206,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"1 records sent during previous 00:02:45.272, last recorded offset of {server=debezium-server-test} partition is {transaction_id=null, ts_sec=1671373458, file=mysql-bin.000003, pos=236, row=1, server_id=223344, event=2}","threadName":"pool-7-thread-1","threadId":16,"mdc":{},"ndc":"","hostName":"53d29206a50a","processName":"io.debezium.server.Main","processId":1}
メッセージ確認
Subscription から pull するスクリプトで、Debezium Server から送信されたメッセージの内容を確認します。*2
$ python pubsub_pull.py | jq . { "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "debezium-server-test.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "debezium-server-test.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" } ], "optional": false, "name": "debezium-server-test.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1007, "first_name": "test", "last_name": "02", "email": "test02@example.com" }, "source": { "version": "2.0.1.Final", "connector": "mysql", "name": "debezium-server-test", "ts_ms": 1671294516000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 728, "row": 0, "thread": 22, "query": null }, "op": "c", "ts_ms": 1671330519454, "transaction": null } }
schema
には変更イベント前後のスキーマ情報が、payload
には変更前後のレコード情報が入っていることがわかります。
また、末尾のほうにある op
には c(create) とありますので、ここで変更イベントの種類を特定できます。
まとめ
MySQL -> Debezium Server -> Cloud Pub/Sub を試してみました。Pub/Sub へ送信できれば、Cloud Dataflow や BigQuery など複数のサービスにつなげることができるので、応用がきく嬉しい仕組みです。
Incubating なので大々的に活用するには慎重な検討が必要そうですが、スポット活用ならば有力な選択肢になるのではと考えました。引き続き注目していこうと思います。