public note

AWS Database Migration Service による Change Data Capture: 前編

AWS Database Migration Service (以下、DMS) で Change Data Capture(CDC) を試してみました。

目的

目的は、Private Subnet にある RDS の変更レコードを、Google BigQuery へ転送することです。 CDC を行う OSS には Debezium がありますが、運用でなるべく楽をしたかったので、DMS を選んでみました。

構成

f:id:ts223:20220325233403p:plain
構成図

EC2 から RDS(MySQL 8.0) に対してレコードの変更を行い、それを DMS が捕捉して S3 バケットへ出力します。VPC は本番っぽい雰囲気にしてみました。出力先を S3 にした理由は、AWS から外部にデータを転送するのは S3 が一番安いこと*1Kinesis Stream よりもエラー発生時のハンドリングが楽そう、の二点です。

このあと、S3 バケットへの Put をトリガーに Lambda Function で BigQuery へ Streaming Insert を行う予定です。前編では S3 バケットへのファイル出力までを扱います。

ソースコードは下記のリポジトリで公開しています(CloudFormation で作成しています)。

github.com

検討にあたっては、以下の記事を参考にさせていただきました。

zenn.dev

DMS のしくみ

DMS の概要図

DMS は、Source や Target と接続するための Endpoint と、Replication Instance、そこで実行される Replication Task で構成されています。今回は Source を RDS for MySQL, Target を S3 としています。

https://docs.aws.amazon.com/dms/latest/userguide/images/datarep-Welcome.png

What is AWS Database Migration Service? - AWS Database Migration Service

MigrationType

MigrationType は3つの中から選べます。今回は CDC only を試しました。

  • Full load (Migrate existing data) – If you can afford an outage long enough to copy your existing data, this option is a good one to choose. This option simply migrates the data from your source database to your target database, creating tables when necessary.
  • Full load + CDC (Migrate existing data and replicate ongoing changes) – This option performs a full data load while capturing changes on the source. After the full load is complete, captured changes are applied to the target. Eventually, the application of changes reaches a steady state. At this point, you can shut down your applications, let the remaining changes flow through to the target, and then restart your applications pointing at the target.
  • CDC only (Replicate data changes only) – In some situations, it might be more efficient to copy existing data using a method other than AWS DMS. For example, in a homogeneous migration, using native export and import tools might be more efficient at loading bulk data. In this situation, you can use AWS DMS to replicate changes starting when you start your bulk load to bring and keep your source and target databases in sync.

Components of AWS DMS - AWS Database Migration Service

環境構築のポイント

RDS パラメータグループ

MySQL の binlog_format を ROW にします。

  DBParameterGroup:
    Type: AWS::RDS::DBParameterGroup
    Properties:
      Description: Parameter group for mysql 8.0
      Family: "mysql8.0"
      Parameters:
        "binlog_format": "ROW"

RDS への接続情報は SecretsManager に入れると便利

DMS Endpoint から RDS へ接続するための情報は、SecretsManager に入れると管理しやすいと思います。

こんな感じにシークレットとロールと作成して*2

  RdsSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Ref StackName
      SecretString: !Sub '{
          "username": "${RdsMasterUsername}",
          "password": "${RdsMasterUserPassword}",
          "engine": "mysql",
          "host": "${RdsInstance.Endpoint.Address}",
          "port": ${RdsInstance.Endpoint.Port},
          "dbname": "${RdsDbName}",
          "dbInstanceIdentifier": "${RdsInstance}"
        }'

  DmsRdsEndpointAccessRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: dms-rds-endpoint-access-role
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - dms.ap-northeast-1.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: dms-rds-endpoint-access-policy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource:
                  - !Ref RdsSecret

Endpoint 作成のときにそれぞれの ARN を指定すればよいので便利です。

  DmsEndpointSourceMySql:
    Type: AWS::DMS::Endpoint
    Properties:
      EndpointIdentifier: source-endpoint-mysql
      EngineName: mysql
      EndpointType: source
      MySqlSettings:
        SecretsManagerSecretId:
          Fn::ImportValue: !Sub ${VpcResourceStack}-RdsSecretArn
        SecretsManagerAccessRoleArn:
          Fn::ImportValue: !Sub ${VpcResourceStack}-DmsRdsEndpointAccessRoleArn

Replication Instance 用の Subnet Group を作成するときに特定の Role が必要

Replication Instance 用の Subnet Group をマネジメントコンソールで作成すると、AmazonDMSVPCManagementRole をもつロールが裏側で自動生成されます。 CloudFormation で作成する場合はこのロールは作成されないため、CreateReplicationSubnetGroup のステップでエラーになります。そのため、以下のように個別に作成します。

  DmsVpcRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: dms-vpc-role
      Path: /
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - dms.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole

https://forums.aws.amazon.com/thread.jspa?messageID=921775

S3 への出力設定

Target Endpoint の設定で、S3 への出力方式を指定できます。出力ファイル形式は、CSVApache Parquet から選べます。

以下のように設定した場合、だいたい60秒ごとに s3://bucketName/schemaName/tableName/YYYY/MM/DD/YYYYMMDD-hhmmssSSS.parquet として出力されます。

  DmsEndpointTargetS3:
    Type: AWS::DMS::Endpoint
    Properties:
      EndpointIdentifier: target-endpoint-s3
      EngineName: s3
      EndpointType: target
      S3Settings:
        BucketName:
          Fn::ImportValue: !Sub ${S3Stack}-S3BucketName
        ServiceAccessRoleArn:
          Fn::ImportValue: !Sub ${S3Stack}-DmsS3EndpointAccessRoleArn
        EncryptionMode: SSE_S3
        CdcMaxBatchInterval: 60
        DataFormat: parquet
        ParquetVersion: PARQUET_2_0
        CompressionType: GZIP
        DatePartitionEnabled: true
        DatePartitionSequence: YYYYMMDD
        DatePartitionDelimiter: SLASH

捕捉対象とするスキーマとテーブルの指定

(一般にRDBMSでいう)スキーマとテーブルの指定は、ReplicationTask の TableMappings の設定で実現できます。以下の例では、RDS にあるすべてのスキーマ・テーブルを対象にしています。

  ReplicationTask:
    Type: AWS::DMS::ReplicationTask
    Properties:
      ReplicationTaskIdentifier: !Ref AWS::StackName
      MigrationType: cdc
      SourceEndpointArn: !Ref DmsEndpointSourceMySql
      TargetEndpointArn: !Ref DmsEndpointTargetS3
      ReplicationInstanceArn:
        Fn::ImportValue: !Sub ${VpcResourceStack}-ReplicationInstanceArn
      TableMappings: '{
          "rules": [{
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": { "schema-name": "%", "table-name": "%" },
            "rule-action": "include"
          }]
        }'

Selection rules and actions - AWS Database Migration Service

検証

EC2 から mysqlslap を実行

EC2 からRDS に向けてレコードの操作を行います。今回は MySQL の負荷エミュレーションクライアントであり、MySQL Client に入っている mysqlslap を使いました。

MySQL :: MySQL 8.0 Reference Manual :: 4.5.8 mysqlslap — A Load Emulation Client

このようにして大量のクエリを発行します。mysqlslap が行った変更内容が S3 に出力されていれば OK です。

mysqlslap \
    --user=dbuser \
    --password=dbpassword \
    --host=endpointarn.region.rds.amazonaws.com \
    --engine=innodb \
    --concurrency=5 \
    --iterations=100 \
    --auto-generate-sql \
    --auto-generate-sql-load-type=mixed \
    --number-char-cols=5 \
    --number-int-cols=5 \
    --auto-generate-sql-write-number=1000

S3 出力結果

出力された s3://bucketname/mysqlslap/t1/2022/03/25/20220325-105314539.parquet の内容を、PyArrow を使って見てみます。

import pyarrow.parquet as pq

df = pq.read_table("20220325-105314539.parquet").to_pandas()
df.info()
print(df.loc[:, ["Op", "CdcTimestamp", "intcol1", "charcol1"]].head())
$ python pq_reader.py
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6762 entries, 0 to 6761
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Op            6762 non-null   object
 1   CdcTimestamp  6762 non-null   object
 2   intcol1       6762 non-null   int32 
 3   intcol2       6762 non-null   int32 
 4   intcol3       6762 non-null   int32 
 5   intcol4       6762 non-null   int32 
 6   intcol5       6762 non-null   int32 
 7   charcol1      6762 non-null   object
 8   charcol2      6762 non-null   object
 9   charcol3      6762 non-null   object
 10  charcol4      6762 non-null   object
 11  charcol5      6762 non-null   object
dtypes: int32(5), object(7)
memory usage: 502.0+ KB
  Op                CdcTimestamp     intcol1                                           charcol1
0  I  2022-03-25 10:52:08.000000   364531492  SuKo4M5OM7ldvisSc6WK9rsG9E8sSixocHdgfa5uiiNTGF...
1  I  2022-03-25 10:52:09.000000   831083598  uM4AMHY0BmXJ4TTmfOiqCj68zYIGxDFjbJlpd9pgvCr0iK...
2  I  2022-03-25 10:52:09.000000  1120090212  dqYCp1NIXNHwBkTsAXo6jksWbzaOJXppDE12FHCmgLar53...
3  I  2022-03-25 10:52:09.000000  1798340127  7nZN3RdI96aQOP4Y0z7dlO4wbQF1OpvYdLnggGrqMP8mjC...
4  I  2022-03-25 10:52:09.000000  1066832056  TpfJIwjeemwIeMLL8De6AogdtGl5wTZPA4kTtR7X5wpjIA...

Insert した内容が出力できています。これで CDC データを S3 に出力できるところまで確認できました。

(後編に続く)


2022-04-07: 後編を書きました

ts223.hatenablog.com

*1:さらにコストを抑える方法があれば教えていただけると嬉しいです

*2:本来は、CDC 専用のユーザを作成するのが適切だと思います