LegalOn Technologies Engineering Blog

LegalOn Technologies 開発チームによるブログです。

Dataflow 実践開発セットアップ

こんにちは、LegalOn Technologies検索・推薦チームの志水です。

私たちのチームでは Elasticsearch へのデータのインデキシングをCloud PubSub を queue として挟んで非同期にしており、その処理にDataflowを活用しています。Dataflow(=Google managed Apache Beam) はサーバーレスでスケーラブルなデータ処理を得意としています。

Python を用いたDataflowの活用については、例えばこちらの記事が網羅的です。

この記事では似たような観点に加え、Docker上で動かす工夫やintegration testの工夫も合わせて書いていければと思います。

全体像

以下が処理の全体像です。

処理の全体像

PubSubをデータソースとし、ストリーミングデータをパイプライン(Dataflow)が処理し、結果をElasticsearchに保存します。パイプラインの中身はデータの読み取り、データの変換、データの書き出しになります。

パッケージ構成

私たちのチームではモノレポを採用しているため、例えば以下のようなパッケージ構成になります。

# tree -L 4
.
├── CODEOWNERS
├── .github
│  └── workflows
│     └── deploy-dataflow.yml
├── README.md
├── my-indexer
│  ├── docker-compose.yml
│  ├── Dockerfile
│  ├── Makefile
│  ├── my_indexer
│  │  ├── __init__.py
│  │  ├── pipeline.py
│  │  ├── run_dataflow.py
│  │  └── my_proto.py
│  ├── tests
│  │  ├── __init__.py
│  │  └── test_pipeline.py
│  ├── poetry.lock
│  ├── pyproject.toml
│  └── README.md
└──── search-lib
   ├── Makefile
   ├── poetry.lock
   ├── pyproject.toml
   ├── README.md
   ├── search_lib/
   └── tests/

indexerのアプリケーションコードは my-indexer に、そして実際の開発を進めるにあたっては自作ライブラリに依存させるパターンが多いと思いますので、その一例として search-lib を加えています。

pyproject.toml (Poetry)

依存管理のために Poetry を使います。Poetryを活用することで poetry.lock ファイルを生成し、全てのdependencyとそのversionを明示的に管理することができます。Dataflow(apache-beam)を活用するためのpyproject.tomlは例えば以下のようになります。

# pyproject.toml
[tool.poetry]
name = "my-indexer"
version = "0.1.0"
description = "my indexer"

[tool.poetry.dependencies]
python = "^3.11"
apache-beam = { version = "~2.55.1", extras = ["gcp"] }
google-cloud-pubsub = "^2.17.1"
search-lib = { path = "../search-lib", develop = true }

Google Cloud (GCP) 上で動かすための Apache Beam の依存に加え、 pubsub を活用するための依存を加えています。また my-indexer のアプリケーションコードと自作パッケージの search-lib を加えています。

Pipeline

処理の本体を書くのが、分散処理可能な関数であるDoFnになります。DoFnを組み合わせてパイプラインを書きます。

パイプラインの役割はメッセージのparseとインデキシングになります。詳細を割愛し、例えば以下のようになります。pipeline.run() するタイミングを制御するためクラスにしています。

# pipeline.py
import logging
import lzma

from apache_beam import DoFn, ParDo, io, pvalue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.runners.runner import PipelineResult

from my_indexer import Request, Response
from search_lib import get_repo


class ConvertPubsubMessageToProto(DoFn):
    def process(self, element: io.PubsubMessage):
        decompressed_data = lzma.decompress(element.data)
        proto_msg = Request()
        proto_msg.ParseFromString(decompressed_data)
        yield proto_msg
 
 class Indexer(DoFn):
    def __init__(self, *, index_name: str) -> None:
     """index name of Elasticsearch
  (a placeholder for specifc repository/storage info)
        """
        self.index_name = index_name

    def setup(self):
        """For each batch, the setup method is invoked."""
        self.repo = get_repo(self.index_name)

    def teardown(self):
        """For each batch, the teardown method is invoked."""
        self.repo.close_connection()

    def process(
        self, element: MyProto, *args, **kwargs):
        """Index/delete contents to Elasticsearch
        Args:
            element: A protobuf message containing the index/delete request
                Refer to proto definitions for the structure of element
        """
        response = Response(element)
        self.repo.index(element.request_body)
        yield response


class PipelineRunner:
    def __init__(
        self,
        subscription: str,
        index: str,
        pipeline_args: list[str] = [],
    ):
        self.subscription = subscription
        self.index = index
        self.pipeline_args = pipeline_args
        self.pipeline = None
        self.pipeline_result: Optional[PipelineResult] = None

    def run(self) -> PipelineResult:
        # Set `save_main_session` to True so DoFns can access globally imported modules.
        pipeline_options = PipelineOptions(
            self.pipeline_args, streaming=True, save_main_session=True
        )
        logging.info("Start running pipeline")
        self.pipeline = Pipeline(options=pipeline_options)

        ### Step1: Read data from Pub/Sub ###
        pubsub_result = (
            self.pipeline
            | "Read data from Pub/Sub"
            >> io.ReadFromPubSub(
                subscription=self.subscription,
                with_attributes=True,
            )
        )

        ### Step2: Convert Pub/Sub message to proto ###
        convert_result = pubsub_result | "Convert Pub/Sub to proto" >> ParDo(
            ConvertPubsubMessageToProto()
        )

        ### Step3: Index content ###
        _ = (
            convert_result | "Index content"
            >> ParDo(Indexer(self.index))
        )

        self.pipeline_result = self.pipeline.run()
        return self.pipeline_result

Integration test

DoFnのテスト

我々のチームでは検索インデックスというステートフルなものを扱うため、DoFnのテストはインテグレーションテスト(Middle size test)が多くなります。ローカルに立ち上げたElasticsearchに対してインデキシングをした結果をassertします。詳細を割愛し、例えば以下のようになります。

# tests/test_pipeline.py
from my_indexer import Indexer, get_test_indexer
from search_lib import get_test_es_repo

from apache_beam import ParDo, io
# Rename TestPipeline to _TestPipeline to avoid conflict with pytest
from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import Create


def test_index():
    indexer: Indexer = get_test_indexer()
    repo = get_test_es_repo()
    input_data = [{"request_type": "index", "transaction_id": "content01", "data": "foo"}]
    expected_output = [{"request": "successful"}]

    # Run the pipeline
    with _TestPipeline() as pipeline:
        # Create a PCollection with the input data
        input_collection = pipeline | Create(input_data)

        # Call the Indexer class with the input data
        output_collection = input_collection | ParDo(indexer)

        # Assert that the output collection matches the expected output
        assert_that(output_collection, equal_to(expected_output), reify_windows=False)

 # check Elasticsearch for relevant data
    repo.es.refresh(index=indexer.index_name)
    num_docs = repo.es.count_documents(
        index=indexer.index_name,
        query={"query": {"term": {EsProperties.CONTENT_ID: "content01"}}},
    )["count"]
    assert num_docs == 1

DirectRunnerとPubSub emulator を用いたテスト

開発中にPubSubの読み取り部分でコケる場合なども多かったので、その部分を公式のPubSubのemulator(https://cloud.google.com/pubsub/docs/emulator)を使ってローカルでテストできるようにしました。

docker-compose.yml に以下を記述しemulatorを起動します。

services:
  pubsub-emulator:
    image: google/cloud-sdk:latest
    platform: linux/amd64
    ports:
      - 8086:8086
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8086
    environment:
      - PUBSUB_EMULATOR_HOST=0.0.0.0:8086
    volumes:
      - pubsub_data:/opt/pubsub/data
volumes:
  pubsub_data:

DirectRunner(ローカルでパイプラインを動かすオプション)と emulatorを使ったテストは以下のようになります。

# integration_tests/test_pipeline.py
import logging

from google.cloud import pubsub_v1

from my_indexer import Request, Response, PipelineRunner


def run_pipeline_in_background(subscription):
    runner = PipelineRunner(
        subscription=subscription.name,
        index=index_name,
        pipeline_args=["--runner=DirectRunner"],
    )
    runner.run()
    yield runner
    logging.info("Stopping pipeline...")
    runner.cancel()


def test_pipeline_index_content():
 publisher = pubsub_v1.PublisherClient()
 subscriber = pubsub_v1.SubscriberClient()
 
 # create topics
 topic_path = publisher.topic_path(project_id, topic_name)
    topic = publisher.create_topic(request={"name": topic_path})

    subscription_path = subscriber.subscription_path(project_id, subscription_name)
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic.name,
        }
    )
    
    # run pipeline in background
    run_pipeline_in_background(subscription)

    # Publish message to topic.
    index_request = Request(transaction_id = "bar")
    data = lzma.compress(index_request.SerializeToString())
    publisher.publish(topic.name, data)

    # Pull message from subscription and check response.
    request = pubsub.PullRequest(
        subscription=response_subscription.name,
        max_messages=1,
    )
    response = subscriber.pull(request=request, timeout=3)

    # Deserialize received message
    response_message = Response()
    response_message.ParseFromString(response.received_messages[0].message.data)
    assert response_message.transaction_id == index_request.request_body.transaction_id
    assert response_message.num_trials == 1

Docker

上記のように自作ライブラリにDataflowのパイプラインを依存させる場合、主に以下の2つがあります。

  1. setup.py にアプリケーションコードや依存ライブラリなどをまとめる方法
  2. Dockerコンテナ化してその中に依存等を含める方法

当初は setup.py を採用していました。ただし setup.py だと Worker が実行する段階で lock で制御されないpip install が発生し、時間がかかる上に依存関係が壊れることがありました。Dockerコンテナの中に含めてしまった方が厳密に依存関係を解決・指定することができます(また予期しない依存を防ぐこともできます)。そこで私たちのチームではDocker化に effort をかけることが長期的にペイすると判断し、Dockerに移行しました。

例えば以下のようにDockerfile を定義します。

# Dockerfile
FROM python:3.11-slim-bookworm as builder

WORKDIR /app

ENV POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_VIRTUALENVS_CREATE=true \
    POETRY_HOME=/opt/poetry \
    POETRY_VERSION=1.6.1

RUN groupadd builder && \
    useradd -m -s /bin/bash -g builder builder && \
    chown builder:builder /app && \
    # Install dependencies
    apt-get update && \
    apt-get install -y --no-install-recommends curl git && \
    # Install poetry
    curl -sSL https://install.python-poetry.org | python3 - && \
    chmod +x /opt/poetry/bin/poetry

USER builder:builder

# copy dependencies info and search-lib
COPY --chown=builder:builder my-indexer/pyproject.toml my-indexer/poetry.lock ./
COPY --chown=builder:builder search-lib/ /search-lib

RUN /opt/poetry/bin/poetry config installer.parallel false && \
    /opt/poetry/bin/poetry install --no-interaction

FROM python:3.11-slim-bookworm

WORKDIR /app

ENV VIRTUAL_ENV=/app/.venv \
    PATH="/app/.venv/bin:$PATH" \
    RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.11_sdk:2.55.1 /opt/apache/beam /opt/apache/beam

COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY my-indexer/ ./

# Set the entrypoint to Apache Beam SDK launcher.
ENTRYPOINT ["/opt/apache/beam/boot"]

Python の base image を用い poetry をインストールしていることと、apache/beam_python の base image から beam に関連する部分をコピーしていることがポイントになります。このあたりはドキュメンテーションを参考にしています。

またマルチステージビルドを用いることで、実行に必要な Python 環境のみを最終的なイメージに含ませ、イメージサイズを小さくしています。

デプロイ

デプロイは GitHub Actions の workflow dispatch を使って手動で行っています。要素としては以下の3つからなります。

  • pipeline.run() する python code
  • パラメータの設定をまとめた Makefile
  • GitHub Actions の workflow file

pipeline.run

pipeline.run() のためのコードは以下になります。

# run_dataflow.py
import argparse
import logging

from my_indexer import PipelineRunner


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Starting pipeline")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--subscription",
        help="The Cloud Pub/Sub subscription to read from."
        '"projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>".',
    )
    parser.add_argument(
        "--index",
        help="Elasticsearch index name."
        'you can check index name with `curl -X GET "localhost:9200/_cat/indices?v"` command.',
    )
    known_args, pipeline_args = parser.parse_known_args()
    pipeline = PipelineRunner(
        subscription=known_args.subscription,
        index=known_args.index,
        pipeline_args=pipeline_args,
    )
    pipeline_result = pipeline.run()

Dataflow はコマンドラインオプションが多く開発中に追加することも多いため、known_args 以外も受け取れるように残しています。

Makefile

デプロイするための make コマンドは以下になります。環境構築をしていればローカルからでもデプロイできますが、デプロイの記録を他の人に見やすくするためにも基本的には workflow dispatch からのデプロイをチームの決め事としています。

.PHONY: run-dataflow
run-dataflow:
    poetry run python run_dataflow.py \
    # necessary options
    --runner=DataflowRunner \
    --sdk_container_image $(GCR_IMAGE_NAME):$(IMAGE_TAG) \
    --sdk_location=container \
    --save_main_session \
    
    # project specific options (to align with the example code)
    --subscription=projects/$(PROJECT_ID)/subscriptions/$(PUBSUB_SUB) \
    --index=$(ES_INDEX) \

    # not related to the blog topic, but necessary in most cases
    --dataflow_service_options=enable_google_cloud_profiler \
    --job_name=$(JOB_NAME) \
    --service_account_email=$(SERVICE_ACCOUNT) \
    --temp_location=gs://$(BUCKET_NAME)/temp

Docker化するに辺り、最初の2つのオプションを追加することになりました。特に sdk_location=container があると起動時にWorkerへのインストールが発生せず、イメージの環境を使うことで環境の再現性をあげ、インストールを省略し起動を高速化することができます。

後半のオプションは本文の趣旨とは関係ないですが、最初の方で動かない・不便になる躓きポイントだったので後学のために載せています。詳しいオプションに関してはドキュメンテーションをご覧ください。

GitHub Actions

GitHub Actions の workflow dispatch でデプロイするときの workflow file は以下のようになります。環境の選択やimage_tagの指定ができるようにしています。prd へのデプロイはリリースタグからのみにするなどの工夫があります。

name: Dataflow deployment workflow
on:
  workflow_dispatch:
    inputs:
      stage:
        description: 'Environment to deploy to'
        required: true
        type: choice
        options: ['dev', 'stg', 'prd']
      image_tag:
        description: 'Tag of the Indexer Docker image'
        required: true
        type: string
jobs:
  dispatch:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write
    steps:
      - name: Check for production deployment
        if: inputs.stage == 'prd' && !matches(github.ref, 'refs/tags/v[0-9]+\\.[0-9]+\\.[0-9]+')
        run: |
          echo "Error: Production deployment must be from release tag"
          exit 1
          
      - uses: actions/checkout@v4

      - uses: ./.github/actions/setup-action
        with:
          workdir: ${{ env.WORKDIR }}

      - name: Authenticate to GCP
        id: auth
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ${{ your_provider }}
          service_account: ${{ your_sa }}

      - name: Set up Google Cloud
        uses: google-github-actions/setup-gcloud@v2

      - name: deploy the pipeline
        working-directory: my-indexer
        timeout-minutes: 5  # should take ~30 seconds if successful
        run: |
          echo "Deploying to ${{ inputs.stage }} environment."
          make run-dataflow STAGE=${{ inputs.stage }} IMAGE_TAG=${{ inputs.image_tag }}

Acknowledgment

実際のコード、ブログのレビューなど、ほとんどの部分をチームで取り組みました。この取り組みは、私一人の成果ではなく、チームとしての成果です。

Dataflow調査やIndexerのコードを書いた jusui、インテグレーションテストを書いたmaomao905、Docker化を進めたKosukeArase、様々な改善をしてくれたsmiyazato、レビューしてくれたWinField95、acknowledgmentを入れたいと言い出してくれたチームリーダーのtakuyaa、チームを立ち上げたmocobetaには特に感謝です!

まとめ

駆け足になりましたが、私たちのチームで Dataflow を使い開発する際の全体像について紹介いたしました。依存を管理し、テストを充実させ、実行環境をコンテナ化しデプロイ方法を整備しました。このことにより外界の変化に強く、すばやい開発が可能になり、価値提供に繋げることができています。

現在デプロイは(workflow dipatch経由ですが)手動で行なっているため、今後ダウンタイムなしでのCDなどに挑戦していきたいと考えています。

ご意見やご感想等あればぜひお待ちしております。

終わりに

私たちのチームでは一緒に働く仲間を募集しております!もしご興味があれば以下のサイトから応募いただくか、カジュアル面談のお申し込みをしていただければ幸いです!