BigQuery にストリーミングインサートしたい気持ちが高まってきて Cloud Dataflow と Apache Beam に入門しました。Cloud Pub/Sub -> Cloud Dataflow -> BigQuery のルートで取り込むにあたり、事前知識を得ることが目的です。
Apache Beam
特徴
以下のような特徴があるようです。
- バッチ/ストリーミング の両方に対応
- バックエンドとなる Runner を選ぶことができる(Apache Spark, Apache Flink, Cloud Dataflow...)
- メッセージ単位での加工
- 指定ウィンドウ単位での集約
- Java, Python, Go に対応
Tour of Beam
Beam のお作法を知るうえで役立ったのが、チュートリアルである Tour of Beam です。Google Colaboratory で、PCollection や PTransform の概念、パイプライン構築について丁寧に解説しています。
Transform
Tour of Beam で挙げられていたものを紹介します。Python の Transform 一覧はこちらです。
Map
Collection に含まれる element ごとに処理を行い、element 単位での入力と出力の関係は 1 : 1 です。
import apache_beam as beam with beam.Pipeline() as pipeline: plants = ( pipeline | 'Gardening plants' >> beam.Create([ '# 🍓Strawberry\n', '# 🥕Carrot\n', '# 🍆Eggplant\n', '# 🍅Tomato\n', '# 🥔Potato\n', ]) | 'Strip header' >> beam.Map(lambda text: text.strip('# \n')) | beam.Map(print))
FlatMap
Collection に含まれる element ごとに処理を行い、element 単位での入力と出力の関係は 1 : n です。
import apache_beam as beam with beam.Pipeline() as pipeline: plants = ( pipeline | 'Gardening plants' >> beam.Create([ ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'], ['🍅Tomato', '🥔Potato'], ]) | 'Flatten lists' >> beam.FlatMap(lambda elements: elements) | beam.Map(print))
Filter
指定した条件が True になる element のみを抽出し、collection として出力します。
import apache_beam as beam with beam.Pipeline() as pipeline: perennials = ( pipeline | 'Gardening plants' >> beam.Create([ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, ]) | 'Filter perennials' >> beam.Filter(lambda plant: plant['duration'] == 'perennial') | beam.Map(print))
Partition
ひとつの Collection に含まれる element を、複数の Collection に分割します。
import apache_beam as beam durations = ['annual', 'biennial', 'perennial'] with beam.Pipeline() as pipeline: annuals, biennials, perennials = ( pipeline | 'Gardening plants' >> beam.Create([ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}, {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}, {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}, {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}, {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}, ]) | 'Partition' >> beam.Partition( lambda plant, num_partitions: durations.index(plant['duration']), len(durations), ) ) annuals | 'Annuals' >> beam.Map(lambda x: print('annual: {}'.format(x))) biennials | 'Biennials' >> beam.Map( lambda x: print('biennial: {}'.format(x))) perennials | 'Perennials' >> beam.Map( lambda x: print('perennial: {}'.format(x)))
ParDo
データベースやネットワークへの接続など、分散した処理ごとにセッションを作るとボトルネックになる場合に beam.ParDo
を使います。これによって、接続をワーカー単位にまとめることができます。
https://beam.apache.org/documentation/programming-guide/#pardo
ParDo で指定するクラスでは、以下のように5つの関数を作成します。
import apache_beam as beam class DoFnMethods(beam.DoFn): def __init__(self): print('__init__') self.window = beam.window.GlobalWindow() def setup(self): print('setup') def start_bundle(self): print('start_bundle') def process(self, element, window=beam.DoFn.WindowParam): self.window = window yield '* process: ' + element def finish_bundle(self): yield beam.utils.windowed_value.WindowedValue( value='* finish_bundle: 🌱🌳🌍', timestamp=0, windows=[self.window], ) def teardown(self): print('teardown') with beam.Pipeline() as pipeline: results = ( pipeline | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔']) | 'DoFn methods' >> beam.ParDo(DoFnMethods()) | beam.Map(print))
setup()
DoFn インスタンスがデシリアライズされるたびに呼び出されます。データベースやネットワークなどのリソースへの接続に使います。
start_bundle()
Beam では、Runner によって collection を bundle という単位に分割して処理を行います。この bundle の最初の element に対する process() が実行される前に一度だけ実行されます。使用例には、bundle に 含まれる element のトレーシングがあるようです。
process()
第一引数である element ごとに処理を行います。
finish_bundle()
bundle の最後の element に対する process() が実行された後に一度だけ実行されます。この仕様を活かして、bundle に含まれる element 全体に対してバッチ処理を実行する、という使い方もできるようです。
teardown()
DoFn インスタンスがシャットダウンされるときに呼び出されます。setup() でコネクションを生成していた場合、こちらでクローズします。teardown() はなるべく一度だけ実行するという仕様だそうで、これはベストエフォートであって保証はされないとのことです。
Custom Transform
関数に @beam.ptransform_fn
デコレータ をつけると Custom Transform として扱うことができます。最初の引数は PCollection で、戻り値ももちろん PCollection です。
また、PipelineOptions の type_check_additional
で型ヒントを指定できます。
型ヒントを有効にして PTransform に対して以下のようにデコレータをつけると、in/out の PCollection の型をヒントとして指定できます。
@beam.ptransform_fn @beam.typehints.with_input_types(beam.pvalue.PBegin) @beam.typehints.with_output_types(Dict[str, str]) def ReadCsvFiles(pbegin: beam.pvalue.PBegin, file_patterns: List[str]) -> beam.PCollection[Dict[str, str]]: ...
Windowing
ストリーミング処理で処理対象をグルーピングするウインドウを指定できます。デフォルトは Global Window で、すべての Input collection が対象となります。
https://beam.apache.org/documentation/programming-guide/#windowing
Fixed windows
指定した window_size でウインドウ幅 を固定します。
Sliding windows
Window がその前後と重複するように設定できます。window_size と window_period を指定し、(window_size - window_period) の期間だけ、前の Window と重複するようになります。直近 n 時間の集計や移動平均を出すときに便利です。
Session windows
最後にウインドウに入ったイベントから指定した期間を経過した場合に、別のウインドウに切り替わります。データがない場合はウインドウの間に空白期間が生まれます。
Beam パイプラインの設計
Tour of Beam をとおして、以下のような工程で設計すればよいのかな?という感覚をつかめたのがよかったです。
- データ加工の全体像を決める
- ステップごとの Collection の状態をイメージする(絵を書くとか)
- その状態をつくれる Transform を決める
- どこで Map して Reduce するのかを決める
- パイプラインの入口をつくる
- ステップごとのTransformについて、(よさそうなのを探す|カスタマイズしたのをつくる)
- パイプラインの出口をつくる
Cloud Dataflow
特徴
- サーバーレスな Beam Runner
- 負荷に応じた水平自動スケーリング
- Java, Python に対応
- デッドレター転送
- Pub/Subと組み合わせることで Exactly once を実現 cloud.google.com
こちらの Google Cloud Next '18 のセッションがわかりやすかったです。
Pub/Sub to BigQuery sample
Cloud Pub/Sub(JSON) -> Cloud Dataflow -> BigQuery のサンプルコードを GCP が 提供しています。チュートリアルをやったあとなら理解できそうです。
Exactly once の実現
ReadFromPubSub クラスのソースコードにあるとおり、メッセージを一意に特定できる ID を引数の id_label
を指定すると、ReadFromPubSub クラスがメッセージの重複を排除してくれます。Pub/Sub は At least once なので、Dataflow から Subscribe することで楽に Exactly Once を実現することができます。
2021/07/29 追記: Exactly Once の実現について、こちらに続きを書きました。