public note

Cloud Dataflow と Apache Beam に入門した

BigQuery にストリーミングインサートしたい気持ちが高まってきて Cloud Dataflow と Apache Beam に入門しました。Cloud Pub/Sub -> Cloud Dataflow -> BigQuery のルートで取り込むにあたり、事前知識を得ることが目的です。

Apache Beam

beam.apache.org

特徴

以下のような特徴があるようです。

  • バッチ/ストリーミング の両方に対応
  • バックエンドとなる Runner を選ぶことができる(Apache Spark, Apache Flink, Cloud Dataflow...)
  • メッセージ単位での加工
  • 指定ウィンドウ単位での集約
  • Java, Python, Go に対応

Tour of Beam

Beam のお作法を知るうえで役立ったのが、チュートリアルである Tour of Beam です。Google Colaboratory で、PCollection や PTransform の概念、パイプライン構築について丁寧に解説しています。

beam.apache.org

Transform

Tour of Beam で挙げられていたものを紹介します。Python の Transform 一覧はこちらです。

beam.apache.org

Map

Collection に含まれる element ごとに処理を行い、element 単位での入力と出力の関係は 1 : 1 です。

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
      | beam.Map(print))

https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/map-py.ipynb

FlatMap

Collection に含まれる element ごとに処理を行い、element 単位での入力と出力の関係は 1 : n です。

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
          ['🍅Tomato', '🥔Potato'],
      ])
      | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
      | beam.Map(print))

https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/flatmap-py.ipynb

Filter

指定した条件が True になる element のみを抽出し、collection として出力します。

import apache_beam as beam

with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Filter perennials' >>
      beam.Filter(lambda plant: plant['duration'] == 'perennial')
      | beam.Map(print))

https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/filter-py.ipynb

Partition

ひとつの Collection に含まれる element を、複数の Collection に分割します。

import apache_beam as beam

durations = ['annual', 'biennial', 'perennial']

with beam.Pipeline() as pipeline:
  annuals, biennials, perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Partition' >> beam.Partition(
          lambda plant, num_partitions: durations.index(plant['duration']),
          len(durations),
      )
  )

  annuals | 'Annuals' >> beam.Map(lambda x: print('annual: {}'.format(x)))
  biennials | 'Biennials' >> beam.Map(
      lambda x: print('biennial: {}'.format(x)))
  perennials | 'Perennials' >> beam.Map(
      lambda x: print('perennial: {}'.format(x)))

https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/partition-py.ipynb

ParDo

データベースやネットワークへの接続など、分散した処理ごとにセッションを作るとボトルネックになる場合に beam.ParDo を使います。これによって、接続をワーカー単位にまとめることができます。

https://beam.apache.org/documentation/programming-guide/#pardo

ParDo で指定するクラスでは、以下のように5つの関数を作成します。

import apache_beam as beam

class DoFnMethods(beam.DoFn):
  def __init__(self):
    print('__init__')
    self.window = beam.window.GlobalWindow()

  def setup(self):
    print('setup')

  def start_bundle(self):
    print('start_bundle')

  def process(self, element, window=beam.DoFn.WindowParam):
    self.window = window
    yield '* process: ' + element

  def finish_bundle(self):
    yield beam.utils.windowed_value.WindowedValue(
        value='* finish_bundle: 🌱🌳🌍',
        timestamp=0,
        windows=[self.window],
    )

  def teardown(self):
    print('teardown')

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
      | 'DoFn methods' >> beam.ParDo(DoFnMethods())
      | beam.Map(print))

https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/pardo-py.ipynb

setup()

DoFn インスタンスがデシリアライズされるたびに呼び出されます。データベースやネットワークなどのリソースへの接続に使います。

start_bundle()

Beam では、Runner によって collection を bundle という単位に分割して処理を行います。この bundle の最初の element に対する process() が実行される前に一度だけ実行されます。使用例には、bundle に 含まれる element のトレーシングがあるようです。

beam.apache.org

process()

第一引数である element ごとに処理を行います。

finish_bundle()

bundle の最後の element に対する process() が実行された後に一度だけ実行されます。この仕様を活かして、bundle に含まれる element 全体に対してバッチ処理を実行する、という使い方もできるようです。

teardown()

DoFn インスタンスがシャットダウンされるときに呼び出されます。setup() でコネクションを生成していた場合、こちらでクローズします。teardown() はなるべく一度だけ実行するという仕様だそうで、これはベストエフォートであって保証はされないとのことです。

Custom Transform

関数に @beam.ptransform_fn デコレータ をつけると Custom Transform として扱うことができます。最初の引数は PCollection で、戻り値ももちろん PCollection です。

また、PipelineOptions の type_check_additional で型ヒントを指定できます。

型ヒントを有効にして PTransform に対して以下のようにデコレータをつけると、in/out の PCollection の型をヒントとして指定できます。

@beam.ptransform_fn
@beam.typehints.with_input_types(beam.pvalue.PBegin)
@beam.typehints.with_output_types(Dict[str, str])
def ReadCsvFiles(pbegin: beam.pvalue.PBegin, file_patterns: List[str]) -> beam.PCollection[Dict[str, str]]:
...

beam.apache.org

Windowing

ストリーミング処理で処理対象をグルーピングするウインドウを指定できます。デフォルトは Global Window で、すべての Input collection が対象となります。

https://beam.apache.org/documentation/programming-guide/#windowing

Fixed windows

指定した window_size でウインドウ幅 を固定します。

Sliding windows

Window がその前後と重複するように設定できます。window_size と window_period を指定し、(window_size - window_period) の期間だけ、前の Window と重複するようになります。直近 n 時間の集計や移動平均を出すときに便利です。

Session windows

最後にウインドウに入ったイベントから指定した期間を経過した場合に、別のウインドウに切り替わります。データがない場合はウインドウの間に空白期間が生まれます。

Beam パイプラインの設計

Tour of Beam をとおして、以下のような工程で設計すればよいのかな?という感覚をつかめたのがよかったです。

  • データ加工の全体像を決める
  • ステップごとの Collection の状態をイメージする(絵を書くとか)
  • その状態をつくれる Transform を決める
  • どこで Map して Reduce するのかを決める
  • パイプラインの入口をつくる
  • ステップごとのTransformについて、(よさそうなのを探す|カスタマイズしたのをつくる)
  • パイプラインの出口をつくる

Cloud Dataflow

cloud.google.com

特徴

  • サーバーレスな Beam Runner
  • 負荷に応じた水平自動スケーリング
  • Java, Python に対応
  • デッドレター転送
  • Pub/Subと組み合わせることで Exactly once を実現 cloud.google.com

こちらの Google Cloud Next '18 のセッションがわかりやすかったです。

www.youtube.com

Pub/Sub to BigQuery sample

Cloud Pub/Sub(JSON) -> Cloud Dataflow -> BigQuery のサンプルコードを GCP が 提供しています。チュートリアルをやったあとなら理解できそうです。

github.com

Exactly once の実現

ReadFromPubSub クラスのソースコードにあるとおり、メッセージを一意に特定できる ID を引数の id_label を指定すると、ReadFromPubSub クラスがメッセージの重複を排除してくれます。Pub/Sub は At least once なので、Dataflow から Subscribe することで楽に Exactly Once を実現することができます。


2021/07/29 追記: Exactly Once の実現について、こちらに続きを書きました。

ts223.hatenablog.com