public note

AWS Database Migration Service による Change Data Capture: 後編

Amazon RDS から Google BigQuery への CDC 後編です。後編では、S3 に格納された Parquet ファイルのデータを BigQuery に登録する部分を扱います。

f:id:ts223:20220325233403p:plain
構成図

前編はこちらをご覧ください。

ts223.hatenablog.com

ソースコードは、前回同様にこちらで公開しています。

github.com

同期方法

今回は、簡易的な方法として AWS Lambda を使用しました。*1

S3 でのオブジェクト生成をトリガーに、Lambda Function が発火するようにしています。ちょうど良い機会でしたので、AWS re:Invent 2021 で発表された Amazon EventBridge による S3 イベント通知 を使ってみました。S3 バケットと Lambda Function が密結合にならず、とても嬉しいアップデートだなと実感しました。

Lambda Function

Lambda Function では Parquet ファイルの読み込みと BigQuery へのロードを行っています。 ライブラリとして google-cloud-bigquery を使用していますが、そのサイズは ZIP archive や Lambda Layer の上限を超えていますので、コンテナイメージ方式でデプロイしています。

FunctionLoadParquetToBigQuery:
  Type: AWS::Serverless::Function
  Properties:
    FunctionName: load_parquet_to_bigquery
    PackageType: Image
    MemorySize: 256
    Timeout: 60
    Role: !GetAtt RoleFunctionLoadParquetToBigQuery.Arn
    Environment:
      Variables:
        GCP_SA_SECRET_NAME: cdc-rds-bq-lambda
    Events:
      EventBridge:
        Type: EventBridgeRule
        Properties:
          Pattern:
            source:
              - aws.s3
            detail-type:
              - Object Created
            detail:
              bucket:
                name:
                  - Fn::ImportValue: !Sub ${S3Stack}-S3BucketName
              object:
                key:
                  - prefix: mysqlslap/
    DeadLetterQueue:
      TargetArn: !GetAtt DeadLetterQueue.Arn
      Type: SQS
  Metadata:
    Dockerfile: Dockerfile
    DockerContext: ./load_parquet_to_bigquery
    DockerTag: latest

アプリケーションコード

EventBridge 経由で送信されたイベント情報をもとに、S3 にある Parquet ファイルを取得しており、Bytes オブジェクトとしてメモリに直接読み出しています。*2

def get_s3_object_body(self, bucket_name: str, key: str) -> io.BytesIO:
    client = boto3.resource('s3')
    buffer = io.BytesIO()
    object = client.Object(bucket_name, key)
    object.download_fileobj(buffer)
    buffer.seek(0)
    return buffer

DMS では S3 への出力ファイルサイズの上限を設定できますので、そのサイズに合わせて Lambda Function のメモリサイズを決めるのがよいかなと思います。

BigQuery へのロードは、(前編ではストリーミングインサートという話をしていましたが...) load_table_from_file 関数に Bytes オブジェクトを渡しています。レコードの変更履歴はすべて保持していくものですので、追記形式で書き込みを行っています。

credentials = service_account.Credentials.from_service_account_info(info=service_account_info)
bq_client = bigquery.Client(credentials=credentials)

destination = f"{dataset_id}.{table_id}"
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    autodetect=True,
)

job = bq_client.load_table_from_file(
    file_obj=file_obj,
    destination=destination,
    job_config=job_config,
)
job.result()

同期結果

このように、BigQuery にテーブルが作成されるのを確認できました。

f:id:ts223:20220407223621p:plain

検証の都合で同一のレコードが複数入っていますが、もとのテーブル設計がきちんとしていれば、Primary Key をもとにレコードを抽出できそうです。このテーブルは履歴情報のようなものですので、実務上はこのテーブルをベースにして最新の状態を表現するビューを作成することになると思います。

まとめ

AWS DMS を使ってキャプチャした RDS の変更を、BigQuery に同期してみました。DMS は設定項目が多いので最初はうっ、となりますが、慣れてくると細かく設定できるし便利だという見え方に変わりました。今回使用した MySQL や S3 のほかにも DMS Endpoint は複数用意されているので、AWS でのデータ移行では有力な選択肢になりそうです。

*1:本来このようなデータ同期処理は、エラーハンドリングをしやすくするためにオーケストレーターを設けるのが理想的だと思います。

*2:buffer.seek(0) をしないと "Stream must be at beginning." のエラーが発生します。