こんにちは、LegalOn Technologies検索・推薦チームの志水です。
私たちのチームでは Elasticsearch へのデータのインデキシングをCloud PubSub を queue として挟んで非同期にしており、その処理にDataflowを活用しています。Dataflow(=Google managed Apache Beam) はサーバーレスでスケーラブルなデータ処理を得意としています。
Python を用いたDataflowの活用については、例えばこちらの記事が網羅的です。
この記事では似たような観点に加え、Docker上で動かす工夫やintegration testの工夫も合わせて書いていければと思います。
全体像
以下が処理の全体像です。
PubSubをデータソースとし、ストリーミングデータをパイプライン(Dataflow)が処理し、結果をElasticsearchに保存します。パイプラインの中身はデータの読み取り、データの変換、データの書き出しになります。
パッケージ構成
私たちのチームではモノレポを採用しているため、例えば以下のようなパッケージ構成になります。
# tree -L 4 . ├── CODEOWNERS ├── .github │ └── workflows │ └── deploy-dataflow.yml ├── README.md ├── my-indexer │ ├── docker-compose.yml │ ├── Dockerfile │ ├── Makefile │ ├── my_indexer │ │ ├── __init__.py │ │ ├── pipeline.py │ │ ├── run_dataflow.py │ │ └── my_proto.py │ ├── tests │ │ ├── __init__.py │ │ └── test_pipeline.py │ ├── poetry.lock │ ├── pyproject.toml │ └── README.md └──── search-lib ├── Makefile ├── poetry.lock ├── pyproject.toml ├── README.md ├── search_lib/ └── tests/
indexerのアプリケーションコードは my-indexer
に、そして実際の開発を進めるにあたっては自作ライブラリに依存させるパターンが多いと思いますので、その一例として search-lib
を加えています。
pyproject.toml (Poetry)
依存管理のために Poetry を使います。Poetryを活用することで poetry.lock ファイルを生成し、全てのdependencyとそのversionを明示的に管理することができます。Dataflow(apache-beam)を活用するためのpyproject.toml
は例えば以下のようになります。
# pyproject.toml [tool.poetry] name = "my-indexer" version = "0.1.0" description = "my indexer" [tool.poetry.dependencies] python = "^3.11" apache-beam = { version = "~2.55.1", extras = ["gcp"] } google-cloud-pubsub = "^2.17.1" search-lib = { path = "../search-lib", develop = true }
Google Cloud (GCP) 上で動かすための Apache Beam の依存に加え、
pubsub を活用するための依存を加えています。また my-indexer
のアプリケーションコードと自作パッケージの search-lib
を加えています。
Pipeline
処理の本体を書くのが、分散処理可能な関数であるDoFnになります。DoFnを組み合わせてパイプラインを書きます。
パイプラインの役割はメッセージのparseとインデキシングになります。詳細を割愛し、例えば以下のようになります。pipeline.run()
するタイミングを制御するためクラスにしています。
# pipeline.py import logging import lzma from apache_beam import DoFn, ParDo, io, pvalue from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import Pipeline from apache_beam.runners.runner import PipelineResult from my_indexer import Request, Response from search_lib import get_repo class ConvertPubsubMessageToProto(DoFn): def process(self, element: io.PubsubMessage): decompressed_data = lzma.decompress(element.data) proto_msg = Request() proto_msg.ParseFromString(decompressed_data) yield proto_msg class Indexer(DoFn): def __init__(self, *, index_name: str) -> None: """index name of Elasticsearch (a placeholder for specifc repository/storage info) """ self.index_name = index_name def setup(self): """For each batch, the setup method is invoked.""" self.repo = get_repo(self.index_name) def teardown(self): """For each batch, the teardown method is invoked.""" self.repo.close_connection() def process( self, element: MyProto, *args, **kwargs): """Index/delete contents to Elasticsearch Args: element: A protobuf message containing the index/delete request Refer to proto definitions for the structure of element """ response = Response(element) self.repo.index(element.request_body) yield response class PipelineRunner: def __init__( self, subscription: str, index: str, pipeline_args: list[str] = [], ): self.subscription = subscription self.index = index self.pipeline_args = pipeline_args self.pipeline = None self.pipeline_result: Optional[PipelineResult] = None def run(self) -> PipelineResult: # Set `save_main_session` to True so DoFns can access globally imported modules. pipeline_options = PipelineOptions( self.pipeline_args, streaming=True, save_main_session=True ) logging.info("Start running pipeline") self.pipeline = Pipeline(options=pipeline_options) ### Step1: Read data from Pub/Sub ### pubsub_result = ( self.pipeline | "Read data from Pub/Sub" >> io.ReadFromPubSub( subscription=self.subscription, with_attributes=True, ) ) ### Step2: Convert Pub/Sub message to proto ### convert_result = pubsub_result | "Convert Pub/Sub to proto" >> ParDo( ConvertPubsubMessageToProto() ) ### Step3: Index content ### _ = ( convert_result | "Index content" >> ParDo(Indexer(self.index)) ) self.pipeline_result = self.pipeline.run() return self.pipeline_result
Integration test
DoFnのテスト
我々のチームでは検索インデックスというステートフルなものを扱うため、DoFnのテストはインテグレーションテスト(Middle size test)が多くなります。ローカルに立ち上げたElasticsearchに対してインデキシングをした結果をassertします。詳細を割愛し、例えば以下のようになります。
# tests/test_pipeline.py from my_indexer import Indexer, get_test_indexer from search_lib import get_test_es_repo from apache_beam import ParDo, io # Rename TestPipeline to _TestPipeline to avoid conflict with pytest from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms import Create def test_index(): indexer: Indexer = get_test_indexer() repo = get_test_es_repo() input_data = [{"request_type": "index", "transaction_id": "content01", "data": "foo"}] expected_output = [{"request": "successful"}] # Run the pipeline with _TestPipeline() as pipeline: # Create a PCollection with the input data input_collection = pipeline | Create(input_data) # Call the Indexer class with the input data output_collection = input_collection | ParDo(indexer) # Assert that the output collection matches the expected output assert_that(output_collection, equal_to(expected_output), reify_windows=False) # check Elasticsearch for relevant data repo.es.refresh(index=indexer.index_name) num_docs = repo.es.count_documents( index=indexer.index_name, query={"query": {"term": {EsProperties.CONTENT_ID: "content01"}}}, )["count"] assert num_docs == 1
DirectRunnerとPubSub emulator を用いたテスト
開発中にPubSubの読み取り部分でコケる場合なども多かったので、その部分を公式のPubSubのemulator(https://cloud.google.com/pubsub/docs/emulator)を使ってローカルでテストできるようにしました。
docker-compose.yml に以下を記述しemulatorを起動します。
services: pubsub-emulator: image: google/cloud-sdk:latest platform: linux/amd64 ports: - 8086:8086 command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8086 environment: - PUBSUB_EMULATOR_HOST=0.0.0.0:8086 volumes: - pubsub_data:/opt/pubsub/data volumes: pubsub_data:
DirectRunner(ローカルでパイプラインを動かすオプション)と emulatorを使ったテストは以下のようになります。
# integration_tests/test_pipeline.py import logging from google.cloud import pubsub_v1 from my_indexer import Request, Response, PipelineRunner def run_pipeline_in_background(subscription): runner = PipelineRunner( subscription=subscription.name, index=index_name, pipeline_args=["--runner=DirectRunner"], ) runner.run() yield runner logging.info("Stopping pipeline...") runner.cancel() def test_pipeline_index_content(): publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() # create topics topic_path = publisher.topic_path(project_id, topic_name) topic = publisher.create_topic(request={"name": topic_path}) subscription_path = subscriber.subscription_path(project_id, subscription_name) subscription = subscriber.create_subscription( request={ "name": subscription_path, "topic": topic.name, } ) # run pipeline in background run_pipeline_in_background(subscription) # Publish message to topic. index_request = Request(transaction_id = "bar") data = lzma.compress(index_request.SerializeToString()) publisher.publish(topic.name, data) # Pull message from subscription and check response. request = pubsub.PullRequest( subscription=response_subscription.name, max_messages=1, ) response = subscriber.pull(request=request, timeout=3) # Deserialize received message response_message = Response() response_message.ParseFromString(response.received_messages[0].message.data) assert response_message.transaction_id == index_request.request_body.transaction_id assert response_message.num_trials == 1
Docker
上記のように自作ライブラリにDataflowのパイプラインを依存させる場合、主に以下の2つがあります。
setup.py
にアプリケーションコードや依存ライブラリなどをまとめる方法- Dockerコンテナ化してその中に依存等を含める方法
当初は setup.py
を採用していました。ただし setup.py だと Worker が実行する段階で lock で制御されないpip install が発生し、時間がかかる上に依存関係が壊れることがありました。Dockerコンテナの中に含めてしまった方が厳密に依存関係を解決・指定することができます(また予期しない依存を防ぐこともできます)。そこで私たちのチームではDocker化に effort をかけることが長期的にペイすると判断し、Dockerに移行しました。
例えば以下のようにDockerfile を定義します。
# Dockerfile FROM python:3.11-slim-bookworm as builder WORKDIR /app ENV POETRY_VIRTUALENVS_IN_PROJECT=true \ POETRY_VIRTUALENVS_CREATE=true \ POETRY_HOME=/opt/poetry \ POETRY_VERSION=1.6.1 RUN groupadd builder && \ useradd -m -s /bin/bash -g builder builder && \ chown builder:builder /app && \ # Install dependencies apt-get update && \ apt-get install -y --no-install-recommends curl git && \ # Install poetry curl -sSL https://install.python-poetry.org | python3 - && \ chmod +x /opt/poetry/bin/poetry USER builder:builder # copy dependencies info and search-lib COPY --chown=builder:builder my-indexer/pyproject.toml my-indexer/poetry.lock ./ COPY --chown=builder:builder search-lib/ /search-lib RUN /opt/poetry/bin/poetry config installer.parallel false && \ /opt/poetry/bin/poetry install --no-interaction FROM python:3.11-slim-bookworm WORKDIR /app ENV VIRTUAL_ENV=/app/.venv \ PATH="/app/.venv/bin:$PATH" \ RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1 # Copy files from official SDK image, including script/dependencies. COPY --from=apache/beam_python3.11_sdk:2.55.1 /opt/apache/beam /opt/apache/beam COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV} COPY my-indexer/ ./ # Set the entrypoint to Apache Beam SDK launcher. ENTRYPOINT ["/opt/apache/beam/boot"]
Python の base image を用い poetry をインストールしていることと、apache/beam_python の base image から beam に関連する部分をコピーしていることがポイントになります。このあたりはドキュメンテーションを参考にしています。
またマルチステージビルドを用いることで、実行に必要な Python 環境のみを最終的なイメージに含ませ、イメージサイズを小さくしています。
デプロイ
デプロイは GitHub Actions の workflow dispatch を使って手動で行っています。要素としては以下の3つからなります。
pipeline.run
pipeline.run() のためのコードは以下になります。
# run_dataflow.py import argparse import logging from my_indexer import PipelineRunner if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) logging.info("Starting pipeline") parser = argparse.ArgumentParser() parser.add_argument( "--subscription", help="The Cloud Pub/Sub subscription to read from." '"projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>".', ) parser.add_argument( "--index", help="Elasticsearch index name." 'you can check index name with `curl -X GET "localhost:9200/_cat/indices?v"` command.', ) known_args, pipeline_args = parser.parse_known_args() pipeline = PipelineRunner( subscription=known_args.subscription, index=known_args.index, pipeline_args=pipeline_args, ) pipeline_result = pipeline.run()
Dataflow はコマンドラインオプションが多く開発中に追加することも多いため、known_args 以外も受け取れるように残しています。
Makefile
デプロイするための make コマンドは以下になります。環境構築をしていればローカルからでもデプロイできますが、デプロイの記録を他の人に見やすくするためにも基本的には workflow dispatch からのデプロイをチームの決め事としています。
.PHONY: run-dataflow run-dataflow: poetry run python run_dataflow.py \ # necessary options --runner=DataflowRunner \ --sdk_container_image $(GCR_IMAGE_NAME):$(IMAGE_TAG) \ --sdk_location=container \ --save_main_session \ # project specific options (to align with the example code) --subscription=projects/$(PROJECT_ID)/subscriptions/$(PUBSUB_SUB) \ --index=$(ES_INDEX) \ # not related to the blog topic, but necessary in most cases --dataflow_service_options=enable_google_cloud_profiler \ --job_name=$(JOB_NAME) \ --service_account_email=$(SERVICE_ACCOUNT) \ --temp_location=gs://$(BUCKET_NAME)/temp
Docker化するに辺り、最初の2つのオプションを追加することになりました。特に sdk_location=container
があると起動時にWorkerへのインストールが発生せず、イメージの環境を使うことで環境の再現性をあげ、インストールを省略し起動を高速化することができます。
後半のオプションは本文の趣旨とは関係ないですが、最初の方で動かない・不便になる躓きポイントだったので後学のために載せています。詳しいオプションに関してはドキュメンテーションをご覧ください。
GitHub Actions
GitHub Actions の workflow dispatch でデプロイするときの workflow file は以下のようになります。環境の選択やimage_tagの指定ができるようにしています。prd へのデプロイはリリースタグからのみにするなどの工夫があります。
name: Dataflow deployment workflow on: workflow_dispatch: inputs: stage: description: 'Environment to deploy to' required: true type: choice options: ['dev', 'stg', 'prd'] image_tag: description: 'Tag of the Indexer Docker image' required: true type: string jobs: dispatch: runs-on: ubuntu-latest permissions: contents: read id-token: write steps: - name: Check for production deployment if: inputs.stage == 'prd' && !matches(github.ref, 'refs/tags/v[0-9]+\\.[0-9]+\\.[0-9]+') run: | echo "Error: Production deployment must be from release tag" exit 1 - uses: actions/checkout@v4 - uses: ./.github/actions/setup-action with: workdir: ${{ env.WORKDIR }} - name: Authenticate to GCP id: auth uses: google-github-actions/auth@v2 with: workload_identity_provider: ${{ your_provider }} service_account: ${{ your_sa }} - name: Set up Google Cloud uses: google-github-actions/setup-gcloud@v2 - name: deploy the pipeline working-directory: my-indexer timeout-minutes: 5 # should take ~30 seconds if successful run: | echo "Deploying to ${{ inputs.stage }} environment." make run-dataflow STAGE=${{ inputs.stage }} IMAGE_TAG=${{ inputs.image_tag }}
Acknowledgment
実際のコード、ブログのレビューなど、ほとんどの部分をチームで取り組みました。この取り組みは、私一人の成果ではなく、チームとしての成果です。
Dataflow調査やIndexerのコードを書いた jusui、インテグレーションテストを書いたmaomao905、Docker化を進めたKosukeArase、様々な改善をしてくれたsmiyazato、レビューしてくれたWinField95、acknowledgmentを入れたいと言い出してくれたチームリーダーのtakuyaa、チームを立ち上げたmocobetaには特に感謝です!
まとめ
駆け足になりましたが、私たちのチームで Dataflow を使い開発する際の全体像について紹介いたしました。依存を管理し、テストを充実させ、実行環境をコンテナ化しデプロイ方法を整備しました。このことにより外界の変化に強く、すばやい開発が可能になり、価値提供に繋げることができています。
現在デプロイは(workflow dipatch経由ですが)手動で行なっているため、今後ダウンタイムなしでのCDなどに挑戦していきたいと考えています。
ご意見やご感想等あればぜひお待ちしております。
終わりに
私たちのチームでは一緒に働く仲間を募集しております!もしご興味があれば以下のサイトから応募いただくか、カジュアル面談のお申し込みをしていただければ幸いです!