Apache Beam の Side input パターンで LEFT JOIN する
Apache Beam で複数のデータソースから取得したデータを、SQL でいう LEFT JOIN する方法を試しました。 一方がストリーミングデータで、もう一方がデータウェアハウスから取得したデータといった場面を想定しています。
どうやるのか調べたところ、1つのパイプラインにデータを2系統流して、それを統合する方法をとるようです。公式ドキュメントでは Side input パターンと呼ばれていました。
このページの中で CROSS JOIN の例が紹介されているのですが、やりたいのは LEFT JOIN だったので実装してみました。ソースコードはこちらにあります。
practice-apache-beam/side_input.py at main · tosh2230/practice-apache-beam · GitHub
def left_join(main_element: dict, side_input: list[dict], key: str): side_input_keys: list = [element[key] for element in side_input] if main_element[key] not in side_input_keys: yield main_element for side_element in side_input: joined_element: dict = {} if main_element[key] == side_element[key]: joined_element = {**main_element, **side_element} yield joined_element class LeftJoin(PTransform): def __init__(self, key: str, side_input: PCollection): self.key = key self.side_input = side_input def expand(self, pcoll: PCollection): return pcoll | FlatMap( fn=left_join, side_input=AsList(self.side_input), key=self.key )
テストコードはこちら。
practice-apache-beam/test_side_input.py at main · tosh2230/practice-apache-beam · GitHub
class TestLeftJoin: def test_left_join(self): options = PipelineOptions() standard_options = options.view_as(StandardOptions) standard_options.streaming = False standard_options.runner = "DirectRunner" with TestPipeline(options=options) as p: main_input = p | "Create main input" >> Create( [{"id": "a", "num1": 1}, {"id": "b", "num1": 3}, {"id": "c", "num1": 5}] ) side_input = p | "Create side input" >> Create( [{"id": "a", "num2": 2}, {"id": "b", "num2": 4}, {"id": "d", "num2": 6}] ) expected = [ {"id": "a", "num1": 1, "num2": 2}, {"id": "b", "num1": 3, "num2": 4}, {"id": "c", "num1": 5}, ] actual = main_input | "ApplyLeftJoin" >> LeftJoin( key="id", side_input=side_input ) assert_that( actual=actual, matcher=equal_to(expected=expected), reify_windows=False, )
left_join 関数のあたりはもっと Beam っぽい方法があるような気がしていますが、ひとまずやりたいことはできたのでよしとします。
またこれは余談ですが、Side input はマスタデータにあたる情報である場合が多いはずで、その場合は更新頻度は低いと思われます。そんなときはデータ取得処理の手前に PeriodicImpulse を設定するのだそうです。便利そう。