public note

Apache Beam の Side input パターンで LEFT JOIN する

Apache Beam で複数のデータソースから取得したデータを、SQL でいう LEFT JOIN する方法を試しました。 一方がストリーミングデータで、もう一方がデータウェアハウスから取得したデータといった場面を想定しています。

どうやるのか調べたところ、1つのパイプラインにデータを2系統流して、それを統合する方法をとるようです。公式ドキュメントでは Side input パターンと呼ばれていました。

beam.apache.org

このページの中で CROSS JOIN の例が紹介されているのですが、やりたいのは LEFT JOIN だったので実装してみました。ソースコードはこちらにあります。

practice-apache-beam/side_input.py at main · tosh2230/practice-apache-beam · GitHub

def left_join(main_element: dict, side_input: list[dict], key: str):
    side_input_keys: list = [element[key] for element in side_input]
    if main_element[key] not in side_input_keys:
        yield main_element
    for side_element in side_input:
        joined_element: dict = {}
        if main_element[key] == side_element[key]:
            joined_element = {**main_element, **side_element}
            yield joined_element


class LeftJoin(PTransform):
    def __init__(self, key: str, side_input: PCollection):
        self.key = key
        self.side_input = side_input

    def expand(self, pcoll: PCollection):
        return pcoll | FlatMap(
            fn=left_join, side_input=AsList(self.side_input), key=self.key
        )

テストコードはこちら。

practice-apache-beam/test_side_input.py at main · tosh2230/practice-apache-beam · GitHub

class TestLeftJoin:
    def test_left_join(self):
        options = PipelineOptions()
        standard_options = options.view_as(StandardOptions)
        standard_options.streaming = False
        standard_options.runner = "DirectRunner"
        with TestPipeline(options=options) as p:
            main_input = p | "Create main input" >> Create(
                [{"id": "a", "num1": 1}, {"id": "b", "num1": 3}, {"id": "c", "num1": 5}]
            )
            side_input = p | "Create side input" >> Create(
                [{"id": "a", "num2": 2}, {"id": "b", "num2": 4}, {"id": "d", "num2": 6}]
            )

            expected = [
                {"id": "a", "num1": 1, "num2": 2},
                {"id": "b", "num1": 3, "num2": 4},
                {"id": "c", "num1": 5},
            ]
            actual = main_input | "ApplyLeftJoin" >> LeftJoin(
                key="id", side_input=side_input
            )
            assert_that(
                actual=actual,
                matcher=equal_to(expected=expected),
                reify_windows=False,
            )

left_join 関数のあたりはもっと Beam っぽい方法があるような気がしていますが、ひとまずやりたいことはできたのでよしとします。

またこれは余談ですが、Side input はマスタデータにあたる情報である場合が多いはずで、その場合は更新頻度は低いと思われます。そんなときはデータ取得処理の手前に PeriodicImpulse を設定するのだそうです。便利そう。