はじめに
こんにちは、株式会社LegalOn Technologies (検索・推薦チーム) の佐藤 (maomao905) です。
私たちのチームでは、Google Cloud の Pub/Sub 経由で受信したリクエストを元に、LLM (大規模言語モデル) を用いて情報抽出を行い、その結果を検索エンジンに保存するパイプラインを運用しています。
この記事では、Pub/Sub で正常に処理されたメッセージが Dead Letter Queue に送られる問題の原因と対策を解説します。LLM を利用するなど、メッセージ 1 件あたりの処理時間が長いケースで発生しやすい現象であり、一度に処理するメッセージ数を制限することで解決しました。また調査過程で得られた Python Pub/Sub Client の内部実装に関する知見もあわせて紹介します。
想定読者
- Pub/Sub を実運用している方
- Python の Pub/Sub Client を使用している方・検討している方
背景
私たちのパイプラインは、Pub/Sub を介してリクエストを受け取り、それをもとに LLM を利用した情報抽出処理を行った上で検索エンジンに保存をしています。Subscriber 側は、Python の Pub/Sub Clientライブラリ を使い、Streaming Pull でメッセージを取得しています。
処理が何度か失敗すると、メッセージは Dead Letter Queue (DLQ) に移動します。DLQ とは、正常に処理されなかったメッセージが蓄積される特別なキューです。
Python Pub/Sub Client では、Streaming Pull の際に指定した max_messages
(デフォルト: 1000) の数だけメッセージをまとめて Pull する仕組みになっています。この Pull されたメッセージ群は、Subscriber 内部のキューに蓄積され、順次処理されます。
用語の説明
まずこの後の説明で重要となる用語を説明します。
Ack 期限(Ack Deadline)
Pub/Sub Subscriber がメッセージを受け取った後、その処理完了を Pub/Sub へ通知する(Ackを返す)までの猶予時間です。
デフォルトは 10秒 に設定されており、この期間内に処理が完了して Ack を返さないと、メッセージは再配信されます。ただし、Subscriber 側でこの Ack 期限を延長することが可能です。
Ack 期限は、ModifyAckDeadlineRequest
の ack_deadline_seconds
で指定できます。
最大リース期間(Maximum Lease Duration)
Subscriber 側が Ack 期限を延長できる最大の時間です。
デフォルトは 3600秒(1時間) に設定されています。この時間を超えて Subscriber が Ack を返さない場合、Subscriber 側はそれ以上期限延長を行わず、Pub/Sub はメッセージの再配信を開始します。
最大リース期間は、FlowControl
のmax_lease_duration
パラメーターで指定できます。
事象・課題
何が起きていたのか
ある日、Pub/Sub のエラーログに以下のようなメッセージが出力され、DLQ にメッセージが溜まっていました。
Dropping 7 items because they were leased too long.
これは Pub/Sub メッセージの最大リース期間を超過したためにドロップされたことを示しています。
一方で、ログを見ると、処理自体は正常に完了し、検索エンジンにデータも正常に保存されていました。このため、処理が正常完了しているにもかかわらず DLQ に保存されるという不自然な挙動が生じていました。
Python Pub/Sub Client Subscriber のメッセージ処理
調査を進めるにあたり、Python Pub/Sub Client の挙動を詳しく調べました。
前提条件は以下の通りです。
- Client のバージョンは執筆時最新の v2.27.3
- Streaming Pull を使用
- Exactly-once オプションは無効
Pub/Sub Client の主な処理は streaming_pull_manager.py で行われ、主に以下の5つのスレッドが協調して動作します。(なお、Schedulerはさらに別で複数のワーカースレッドを管理します)
Consumer
- Pub/Sub Topic からメッセージをまとめて Pull する。
max_messages
パラメーターでPullするメッセージ最大数を指定 (デフォルト: 1000) - 取得したメッセージを Dispatcher に渡す
- Pub/Sub Topic からメッセージをまとめて Pull する。
Dispatcher
- Leaser と Scheduler にメッセージを伝搬する
- Leaser と Scheduler は Dispatcher を通して、Pub/Sub へ以下のリクエストを送る
- Ack: 処理成功
- Nack: 処理失敗で再配信を要求
- ModifyAckDeadline: Ack期限の延長要求
- Drop: 最大リース期間を超えた場合にメッセージを除外し、再配信を要求
Leaser
Scheduler
- 各メッセージごとにライブラリユーザーが定義したコールバック関数を実行する
- Scheduler はスレッドプールを管理しており、自身では直接コールバック関数を実行しない。内部キューにタスクを格納し、バックグラウンドのワーカースレッドがそのキューから順次取り出して実行する。
- コールバック関数内で明示的に Ack または Nack を行う
Heartbeater
- 定期的に Pub/Sub に対して keep-alive シグナルを送信し、接続状態を維持する
Ack 期限の自動調整
重要な点として、Ack 期限を超えた場合も、ライブラリ側がメッセージの 最大リース期間に達するまで定期的に Ack 期限を延長してくれます。
また、過去の Ack までにかかった時間の99パーセンタイルを Ack 期限となるように自動で調整されています (code, code)。
これにより、不要な Ack 期限の延長リクエストの頻度を減らし、大部分の処理時間をカバーしつつ、極端に長い処理時間の影響を最小限にする仕組みになっています。
課題解決の方法
調査の結果、LLM を利用していたことで各メッセージの処理時間が長くなり、さらに一度に Pull するメッセージ数が多かったため、処理待ちのメッセージがキューに溜まっていました。そのため、各メッセージの処理開始が遅れ、Ack を返すまでの時間が最大リース期間を超えてしまい、メッセージがドロップされていました。ドロップされたメッセージは再配信されますが、同じ理由で再びドロップされ、最終的に DLQ へ送られる状況が発生していました。
一方で、このメッセージのドロップ処理は、コールバック関数を実行する Scheduler スレッド (今回の場合 LLM による処理を行うスレッド) とは非同期に行われるため、現在処理中のメッセージはドロップされるものの、実際には処理が正常に完了し検索エンジンに保存されていました。
考えられる対策としては以下があります。
- 一度に Subscribe するメッセージ流量を十分に小さくする
- メッセージ処理自体の高速化を行い、1つの処理にかかる時間を短縮する
今回は前者の流量調整を行い、具体的には Flow Control の max_messages
パラメーターをデフォルトの1000から1にしました。
今回のケースでは、処理に時間がかかっても問題ないワークロードであり、並列処理も LLM の Rate Limit の観点から不要でした。ただし、この方法は高スループットが求められる環境ではワークしない可能性があることに注意が必要です。
結果として、メッセージごとの処理時間に十分な余裕が生まれ、メッセージの最大リース期間内に Ack が間に合うようになり、問題を解決できました。
まとめ
Pub/Sub 運用における今回のような挙動は、処理が長時間化するパイプラインではよく起こり得るものです。Pub/Sub Client の挙動を理解し、適切な調整(メッセージ数の制限、処理時間の短縮、最大リース期間の見直しなど)を行うことが安定運用につながります。
この記事が同様の課題を抱える方の参考になれば幸いです。
謝辞
問題解決する際にアドバイスいただいた KosukeArase さん、本ブログのレビューをしてくださった takuyaa さん、jusui さん、KosukeArase さん、hikaru-ida さん、takashifl さん、wreulickeさん、tokichieさん ありがとうございます。
これらの方々に深く感謝します。
仲間募集
私たちのチームでは、一緒に働く仲間を募集しています。 ご興味がある方は、以下のサイトからぜひご応募ください。 皆様のご応募をお待ちしております。