public note

Cloud Dataflow と Cloud Pub/Sub で Exactly-once なストリーミングインサートを

前回の記事 では、Cloud Dataflow と Apache Beam に入門しました。その続きとして、今回は BigQuery に対してストリーミングインサートをしてみます。

また、Cloud Pub/Sub と組み合わせることで Exactly-once を実現できるので、合わせて検証します。

ソースコード

今回実装したものはこちらにあります。

github.com

Docker で Beam 環境をつくりましたので、GCP プロジェクトをお持ちであればさっと試せるようになっています。

エラーハンドリングは無なので、目をつぶっていただけると...

データソース

データソースは、GCP が公開している Topic である taxirides-realtime を使用しました。 データ構造は以下です。

ride_id:STRING
point_idx:INTEGER
latitude:FLOAT64
longitude:FLOAT64
timestamp:TIMESTAMP
meter_reading:FLOAT64
meter_increment:FLOAT64
ride_status:STRING
passenger_count:INTEGER

ちなみに、データはこちらのプログラムによって作成されているそうです。

https://github.com/GoogleCloudPlatform/nyc-taxirides-stream-feeder

事前準備

cd ./gcp
bash ./setup.sh

gcp/setup.sh で以下のリソースを作成します。

  • taxirides-realtime Topic を Pull する Subscription
  • データ保存先である BigQuery Dataset

イメージのビルド

cd ./dataflow
docker build -t pubsub-dataflow-bigquery .

google/cloud-sdk:349.0.0-slim をベースに apache_beam[gcp] をインストールしています。

処理内容

docker run コマンドで Beam パイプラインをデプロイします。ローカルに作成済なはずの configuration をコンテナにマウントしています。

docker run --rm -v ~/.config:/root/.config -e GCP_PROJECT=your-project-id pubsub-dataflow-bigquery bash /tmp/start_streaming.sh

start_streaming.sh では、ストリーミングジョブを実行しています。

python3 /tmp/taxirides-realtime.py \
    --project=${GCP_PROJECT} \
    --input_subscription="projects/${GCP_PROJECT}/subscriptions/${SUBSCRIPTION}" \
    --output_dataset=${DATASET} \
    --output_table=${SUBSCRIPTION} \
    --output_error_table=${SUBSCRIPTION}_error \
    --runner DataflowRunner \
    --enable_streaming_engine \
    --region='us-central1' \
    --worker_machine_type='n1-standard-2' \
    --num_workers=1 \
    --max_num_workers=3

--enable_streaming_engine 以降は、Dataflow ジョブのデプロイ設定です。ジョブ実行モデルを Streaming Engine にして、データ量に応じてワーカー数を 1 ~ 3 でオートスケールする設定にしています。

-autoscaling_algorithm=NONE で、オートスケーリングを明示的に無効にすることもできます。

https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline

Beam パイプライン

Beam パイプラインの本体を抜粋します。

options = PipelineOptions(beam_args, save_main_session=True, streaming=True)
pipeline = Pipeline(options=options)
rows, error_rows = (
    pipeline
    | "Read from Pub/Sub" >> ReadFromPubSub(
            subscription=input_subscription,
            with_attributes=True,
            id_label='message_id'
        )
    | "Parse JSON messages" >> ParDo(ParseMessage())
        .with_outputs(ParseMessage.OUTPUT_ERROR_TAG,main='rows')
)

_ = rows | "Write messages to BigQuery" >> WriteToBigQuery(
    output_table,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
    insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
    schema=output_table_schema,
    additional_bq_parameters={
        "timePartitioning": {
            "type": "DAY",
            "field": "timestamp",
        }
    },
)

Pipeline

Beam パイプラインを非同期実行しています。サンプルによく出てくる with beam.Pipeline(options=options) as p: という風にしていないのですが、こうすると同期実行になってしまうのです。ストリーミング処理で終わりがなく、ずっと待ちになるのであえて使用していません。with を使ったほうが見やすいので、回避方法があれば知りたいです。

ReadFromPubSub

ReadFromPubSub を使って、Cloud Pub/Sub からデータを読み込みます。今回は、先の手順で作成した Subscription を指定します。

id_label を指定した項目をチェックして、重複 pull したメッセージを削除します。この設定をすることで Exactly-once になります。 参考までに、id_label を指定せずにジョブを数分間実行してみたところ、テーブルに入力された 183,890 件 のうち 2,942 件が重複していました。

この仕組みを使うにあたって taxirides-realtimeスキーマを確認したところ、ride_idtimestamp の組合せで一意になるので、単一の項目しか指定できない id_label には利用できないことがわかりました。

そのため、今回は message_id を使うことにしました。message_id は、Pub/Sub Topic に送信されたメッセージに自動採番される ID です。この ID は Pub/Sub Topic における attributes のひとつですので、with_attributes=True の指定が必須です。これらの設定により、メッセージの重複登録を回避できることを確認しました。

message_id を利用する方法には、"Pub/Sub Topic に同じメッセージが重複入力されていた場合は別のメッセージと認識してしまう" という注意点があります。そのため、できるかぎりメッセージ内にユニークキーを設けるようにして、そのキーを id_label に指定するのがよいと思われます。

ParDo(ParseMessage())

メッセージ単位で処理を行うように指定し、message.data を JSON にパースしています。with_attributes=True としているので、message.data, message.attributes というデータ構造になります。*1このあたりで BigQuery のテーブルに合わせて整形するのですが、今回はそのままでよいので何もしていません。

WriteToBigQuery

出力テーブルの指定をします。日次パーティションを timestamp 列に設定してみました。

スキーマについては、今回はファイル読み込みをしていますが、apache_beam.io.gcp.internal.clients.bigquery の TableSchema や TableFieldSchema を使って定義することもできます。

beam/bigquery_schema.py at 034ccdf93a0c5dfe6629501b456105ac47047e44 · apache/beam · GitHub

まとめ

分散ストリーミング処理での Exactly-once をこんなに簡単にできるものかと驚きました。Transform を BigQuery & SQL でやるにしても、バッチ連携で課題になりやすい ”データ生成 〜 Load のタイムラグ” を軽減する強力な手段だと思います。

また、Cloud Pub/Sub を使用しているので、Subscription を複数作って機能拡張ができます。今回使わなかった Window で直近の速報値集計や機械学習での特徴量生成 -> 推論実行などができそうなので、夢が広がります。

*1:with_attributes=False の場合は、message.data の部分だけを取得するので、パース処理を変更する必要があります