Cloud Dataflow と Cloud Pub/Sub で Exactly-once なストリーミングインサートを
前回の記事 では、Cloud Dataflow と Apache Beam に入門しました。その続きとして、今回は BigQuery に対してストリーミングインサートをしてみます。
また、Cloud Pub/Sub と組み合わせることで Exactly-once を実現できるので、合わせて検証します。
ソースコード
今回実装したものはこちらにあります。
Docker で Beam 環境をつくりましたので、GCP プロジェクトをお持ちであればさっと試せるようになっています。
エラーハンドリングは無なので、目をつぶっていただけると...
データソース
データソースは、GCP が公開している Topic である taxirides-realtime
を使用しました。
データ構造は以下です。
ride_id:STRING point_idx:INTEGER latitude:FLOAT64 longitude:FLOAT64 timestamp:TIMESTAMP meter_reading:FLOAT64 meter_increment:FLOAT64 ride_status:STRING passenger_count:INTEGER
ちなみに、データはこちらのプログラムによって作成されているそうです。
https://github.com/GoogleCloudPlatform/nyc-taxirides-stream-feeder
事前準備
cd ./gcp
bash ./setup.sh
gcp/setup.sh で以下のリソースを作成します。
taxirides-realtime
Topic を Pull する Subscription- データ保存先である BigQuery Dataset
イメージのビルド
cd ./dataflow docker build -t pubsub-dataflow-bigquery .
google/cloud-sdk:349.0.0-slim をベースに apache_beam[gcp]
をインストールしています。
処理内容
docker run コマンドで Beam パイプラインをデプロイします。ローカルに作成済なはずの configuration をコンテナにマウントしています。
docker run --rm -v ~/.config:/root/.config -e GCP_PROJECT=your-project-id pubsub-dataflow-bigquery bash /tmp/start_streaming.sh
start_streaming.sh では、ストリーミングジョブを実行しています。
python3 /tmp/taxirides-realtime.py \ --project=${GCP_PROJECT} \ --input_subscription="projects/${GCP_PROJECT}/subscriptions/${SUBSCRIPTION}" \ --output_dataset=${DATASET} \ --output_table=${SUBSCRIPTION} \ --output_error_table=${SUBSCRIPTION}_error \ --runner DataflowRunner \ --enable_streaming_engine \ --region='us-central1' \ --worker_machine_type='n1-standard-2' \ --num_workers=1 \ --max_num_workers=3
--enable_streaming_engine
以降は、Dataflow ジョブのデプロイ設定です。ジョブ実行モデルを Streaming Engine にして、データ量に応じてワーカー数を 1 ~ 3 でオートスケールする設定にしています。
-autoscaling_algorithm=NONE
で、オートスケーリングを明示的に無効にすることもできます。
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline
Beam パイプライン
Beam パイプラインの本体を抜粋します。
options = PipelineOptions(beam_args, save_main_session=True, streaming=True) pipeline = Pipeline(options=options) rows, error_rows = ( pipeline | "Read from Pub/Sub" >> ReadFromPubSub( subscription=input_subscription, with_attributes=True, id_label='message_id' ) | "Parse JSON messages" >> ParDo(ParseMessage()) .with_outputs(ParseMessage.OUTPUT_ERROR_TAG,main='rows') ) _ = rows | "Write messages to BigQuery" >> WriteToBigQuery( output_table, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_APPEND, insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR, schema=output_table_schema, additional_bq_parameters={ "timePartitioning": { "type": "DAY", "field": "timestamp", } }, )
Pipeline
Beam パイプラインを非同期実行しています。サンプルによく出てくる with beam.Pipeline(options=options) as p:
という風にしていないのですが、こうすると同期実行になってしまうのです。ストリーミング処理で終わりがなく、ずっと待ちになるのであえて使用していません。with を使ったほうが見やすいので、回避方法があれば知りたいです。
ReadFromPubSub
ReadFromPubSub を使って、Cloud Pub/Sub からデータを読み込みます。今回は、先の手順で作成した Subscription を指定します。
id_label を指定した項目をチェックして、重複 pull したメッセージを削除します。この設定をすることで Exactly-once になります。 参考までに、id_label を指定せずにジョブを数分間実行してみたところ、テーブルに入力された 183,890 件 のうち 2,942 件が重複していました。
この仕組みを使うにあたって taxirides-realtime
のスキーマを確認したところ、ride_id
と timestamp
の組合せで一意になるので、単一の項目しか指定できない id_label には利用できないことがわかりました。
そのため、今回は message_id を使うことにしました。message_id は、Pub/Sub Topic に送信されたメッセージに自動採番される ID です。この ID は Pub/Sub Topic における attributes のひとつですので、with_attributes=True
の指定が必須です。これらの設定により、メッセージの重複登録を回避できることを確認しました。
message_id を利用する方法には、"Pub/Sub Topic に同じメッセージが重複入力されていた場合は別のメッセージと認識してしまう" という注意点があります。そのため、できるかぎりメッセージ内にユニークキーを設けるようにして、そのキーを id_label に指定するのがよいと思われます。
ParDo(ParseMessage())
メッセージ単位で処理を行うように指定し、message.data を JSON にパースしています。with_attributes=True
としているので、message.data, message.attributes というデータ構造になります。*1このあたりで BigQuery のテーブルに合わせて整形するのですが、今回はそのままでよいので何もしていません。
WriteToBigQuery
出力テーブルの指定をします。日次パーティションを timestamp 列に設定してみました。
スキーマについては、今回はファイル読み込みをしていますが、apache_beam.io.gcp.internal.clients.bigquery
の TableSchema や TableFieldSchema を使って定義することもできます。
beam/bigquery_schema.py at 034ccdf93a0c5dfe6629501b456105ac47047e44 · apache/beam · GitHub
まとめ
分散ストリーミング処理での Exactly-once をこんなに簡単にできるものかと驚きました。Transform を BigQuery & SQL でやるにしても、バッチ連携で課題になりやすい ”データ生成 〜 Load のタイムラグ” を軽減する強力な手段だと思います。
また、Cloud Pub/Sub を使用しているので、Subscription を複数作って機能拡張ができます。今回使わなかった Window で直近の速報値集計や機械学習での特徴量生成 -> 推論実行などができそうなので、夢が広がります。
*1:with_attributes=False の場合は、message.data の部分だけを取得するので、パース処理を変更する必要があります