public note

Apache Beam Python SDK でパイプラインのテストコードを書く

Apache Beam SDK には testing パッケージが用意されており、これを使うことで Beam パイプラインに対するテストコードを書けます。 この記事では、Python SDK でのテストを試してわかったことをまとめています。誤りがありましたらご指摘いただけると嬉しいです。

検証に使ったソースコードはこちらにあります。

GitHub - tosh2230/pubsub-dataflow-bigquery: Google Cloud Dataflow for 'Exactly-Once' streaming insertion, from Google Cloud Pub/Sub to Google BigQuery.

Apache Beam そのものについては、下記の公式ページや入門したときに書いた記事を参照ください。

beam.apache.org

目的

はじめに、テストを書くモチベーションについてです。

個人的な感覚ですが、Beam パイプラインは、一般のコードと比較すると読んだだけでは挙動をイメージしにくい印象があります。 パイプラインがシンプルな構造であれば問題ないと思います。しかしストリーミング処理を目的にしていたり Window による集計処理が含まれていると、バグが潜んでいないかを見極めるのはかなり難しいです。

その理由は、ストリーミングパイプラインや Window 集計では「入力データがいつ到着したか」を考慮する必要があるためです。 処理を行う時刻や入力データの遅延状況によって期待する結果は異なりますので、パイプラインに入力するデータのバリエーションだけではなく、到着時刻のパターンと組み合わせてテストケースを検討することになります。

そうした複雑なテスト条件のすべてを E2E テストでカバーするのは難しいことから、あらかじめユニットテストでパイプラインの構成要素ごとの動作を確認しておくと、開発効率や保守性の向上を期待できます。

テストコードの基本構成

テストコードでは、通常の apache_beam.pipeline の Pipeline ではなく、apache_beam.testing.test_pipeline の TestPipeline を使います。

また、Beam パイプラインで生成されるアウトプットは基本的に PCollection なので、通常のテストコードのように expected として用意したオブジェクトとそのまま比較できません。 そのため、apache_beam.testing.util で用意されている assert_that を使います。

テストコードの大まかな流れは以下のようになります。assert_that は、with のスコープ内に記述します。

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
    actual = (
        p
        | # (テストデータを投入する)
        | # (テスト対象を呼び出す)
    )

    assert_that(
        actual=actual,
        matcher=equal_to(expected=expected),
        reify_windows=True,
    )

テストデータの作り方

テストデータの作成には、TimestampedValue クラスを用います。TimestampedValue は、入力データの値 (Value) と、入力データが到着した日時 (Timestamp) を組み合わせたデータ構造になっています。

この TimestampedValue で構成された itterable object をもとに PCollection を作成しますが、これには方法がいくつかあります。

Create

入力データをまとめてパイプラインに投入するときは apache_beam.transforms.core の Create に itterable object を渡します。 この方法は、主に GlobalWindow で操作する処理に対して使うものと理解しています。

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.core import Create

with TestPipeline() as p:
    actual = (
        p
        | Create(
            values=[TimestampedValue(value=value, timestamp=timestamp)]
        )
        | # (テスト対象を呼び出す)
    )

TestStream.add_elements

先に書いたとおり、ストリーミング処理に対するテストでは、入力データの到着時刻についても考慮が必要です。 ストリームの状態を表す TestStream インスタンスをつくり、add_elements でデータを入力します。

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue

# TestStream をつくってテストデータを入れる
stream = TestStream()
stream.advance_watermark_to(new_watermark=0)
stream.advance_processing_time(advance_by=60)
stream.add_elements(
    elements=[TimestampedValue(value=value, timestamp=timestamp)]
)

options = PipelineOptions()
standard_options = options.view_as(StandardOptions)
standard_options.streaming = True
with TestPipeline() as p:
    actual = (
        p
        | stream
        | # (テスト対象を呼び出す)
    )

advance_processing_time でストリームの現在時刻を操作して、テストデータが到着するタイミングを指定します。 また、stream.advance_watermark_to は、Window に対する遅延データの挙動をテストするときに使います。詳細は Apache Beam Programming Guide: 8.4. Watermarks and late data を参照ください。

assert_that の reify_windows フラグ

assert_that には、reify_windows というフラグがあります。

assert_that(
    actual=actual,
    matcher=equal_to(expected=expected),
    reify_windows=True,
)

これは、matcher に対して actual を TestWindowedValue に変換して渡すか(True)、もしくは actual の value のみを渡すか(False)を指定するためのものです。

TestWindowedValue は、PTransform で加工された結果である value に、timestamp と、データが属する window の情報が加わったものです。

reify_windows に指定した値によって、下記の例のように Actual が変化します。

# reify_windows=False
[
    ('ride_01', 1),
    ('ride_02', 2)
]

# reify_windows=True
[
    TestWindowedValue(value=('ride_01', 1), timestamp=Timestamp(119.999999), windows=[[0.0, 120.0)]),
    TestWindowedValue(value=('ride_02', 2), timestamp=Timestamp(119.999999), windows=[[0.0, 120.0)])
]

入力データを何らかの Window に通す処理である場合、各入力データがどの Window に入ったのかをテストしたいはずです。そうした場合は、reify_windows を True にするのが適切と思います。 一方で、GlobalWindow での処理をテストするのであれば、False にして value のみを検証対象とするので問題ないと思います。

テスト期待値の作り方

テスト期待値である expected は、先ほどの reify_windows フラグの指定に合わせて作成します。

True の場合は、TestWindowedValue で構成された itterable object です。 False の場合は、下記の例でいう value のみで構成された itterable object を用意します。

# GlobalWindow の例

from apache_beam.testing.util import TestWindowedValue
from apache_beam.transforms.window import GlobalWindow

expected = [
    TestWindowedValue(
        value=value,
        timestamp=timestamp,
        windows=[GlobalWindow()],
    )
]

GlobalWindow 以外の場合は、windows に入力データが属することを期待する Window を設定します。

# FixedWindow(size=120) の例

from apache_beam.testing.util import TestWindowedValue
from apache_beam.transforms.window import IntervalWindow

expected = [
    TestWindowedValue(
        value=value,
        timestamp=timestamp,
        windows=[IntervalWindow(start=0, end=120)],
    )
]

上記は FixedWindow を指定している場合の例です。IntervalWindow クラスを使って特定の Window を指定しています。

windows で指定する値は、利用する Window の種類に応じて変える必要があります。 Window ごとに実装されている assign 関数を確認すると、どのような値を設定すればよいかがわかります。

FixedWindow クラスの assign 関数

PTransformの切り出し

PTransform を継承したクラスを作って、パイプラインのメイン関数から切り出しておくとテストしやすくなります。 テストコードでそれをインポートすることで、パイプラインを部分的に検証できます。

下記は、PTransform を CountInFixedWindow という名称で切り出した例です。

# テスト対象
from apache_beam.transforms.combiners import Count
from apache_beam.transforms.core import Map, PTransform
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.window import Duration, FixedWindows

class CountInFixedWindow(PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | Map(fn=create_count_pair)
            | WindowInto(
                windowfn=FixedWindows(size=120),
                allowed_lateness=Duration(seconds=60),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
            | Count.PerKey()
        )

pipeline = Pipeline(options=options)
count = (
    pipeline
    | "Read from Pub/Sub"
    >> ReadFromPubSub(
        subscription=input_subscription,
        with_attributes=True,
        id_label="message_id",
    )
    | CountInFixedWindow()
)

pipeline.run()
# テストコード
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline

from dataflow.taxirides_realtime_window import CountInFixedWindow

(中略)

options = PipelineOptions()
standard_options = options.view_as(StandardOptions)
standard_options.streaming = True
with TestPipeline(options=options) as p:
    actual = p | stream | CountInFixedWindow()

    assert_that(
        actual=actual,
        matcher=equal_to(expected=expected),
        reify_windows=True,
    )

検証用のコードでは Cloud Pub/Sub からメッセージを受け取っていますが、テストコードではそれをテストデータに置き換えて、PTransform に渡した結果を検証しています。

Pytest での INTERNALERROR

余談ですが、Pytest で Apache Beam の Assertion Error を拾えずに INTERNALERROR が発生するという事象に遭遇しました。ちなみに unittest だと正常に動作します。細かい原因の調査はできていないのですが、実行環境によって起きたり起きなかったりするようです。

下記の Issue にあるように、起動時に --tb=native を指定すると回避できました。

Traceback handling fails on apache_beam code · Issue #10602 · pytest-dev/pytest · GitHub