第57回 業務イベント駆動で回すAI機能の実装:Pythonで作るトリガー・キュー・リトライ・DLQの実務手順

はじめに — つまずきに寄り添う一言

業務でAIを呼び出すとき、”いつ呼ぶか”、”失敗したらどうするか”、”二重実行をどう防ぐか”といった実運用の細かい問題でつまずきがちです。本記事では、第56回のモデル資産カタログ化の続きとして、カタログ化されたモデルを業務イベントに結びつけ、安定して回すための実務的な設計とPythonでの実装例を示します。読み終えれば、すぐに試せるコード片と運用Runbookスニペットが手に入ります。

対象ユースケースと全体像

想定する典型ユースケース:

  • 問い合わせ自動振り分け(同期API or 非同期ワーカー)
  • OCR→確認ワークフロー(非同期バッチ処理)
  • レコメンド非同期生成(イベント発行→キュー→ワーカー)

ここでの基本パターンは「イベント発生→(場合によりキュー)→ワーカー(AI呼び出し)→結果保存/通知」。重要なのは各段階での可観測性・リトライ・DLQ・idempotencyの設計です。

トリガー設計:同期 vs 非同期 の使い分け

選択基準は主に応答性(レイテンシ)、可用性、処理の複雑さ、コストです。下表は簡潔な比較です。

同期API呼び出し 非同期メッセージ駆動(イベント→キュー→ワーカー)
適した場面 即時応答が必要な場面(チャットボット、検索など) バッチ処理、長時間処理、負荷平準化が必要な場面
利点 簡単、即時結果 耐障害性、スケーラビリティ、リトライ制御
注意点 タイムアウト、リソース競合、コスト高になることも 設計・運用の複雑度が上がる(DLQ、再処理運用など)

Pythonでのエントリポイント例(簡易)

同期APIならFlask/FastAPIのエンドポイントで直接モデル呼び出し。非同期ならイベントを発行してレスポンスは即時返却。

実装メモ: コード例は環境に合わせて調整してください。

キュー選定と最小ワーカー実装例

代表的キューの選定基準:

選択項目 SQS RabbitMQ Kafka Redis
メッセージ保証 at-least-once at-least-once(ackベース) at-least-once(設定次第) ベーシック(パーシステンスで改善)
運用負荷 低(マネージド) 高(スケール/運用) 低〜中(簡単に始められる)
向く用途 クラウド連携、DLQ機能が整備 柔軟なルーティングが必要な場合 ストリーミング大量データ、イベントソーシング 開発・小規模バッチ、簡易通知

最小ワーカー例(SQS:boto3)

実装メモ: コード例は環境に合わせて調整してください。

RabbitMQ(kombu)/Kafka(confluent-kafka)/Redis(redis-py)ミニ例

実装メモ: コード例は環境に合わせて調整してください。

リトライ・バックオフ・DLQ戦略

ポイントは恒久失敗(データ不備、認証不可)と一時失敗(ネットワーク、レート制限)を判定し、DLQへ送る条件を明確にすることです。

  • 指数バックオフ(例:sleep = base * 2**attempt、上限を設定)
  • 最大リトライ回数を定め、越えたらDLQへ
  • DLQのメタ情報に失敗理由・attempt数・trace_idを含める

Pythonでの簡単なリトライ例(自前実装):

実装メモ: コード例は環境に合わせて調整してください。

DLQの運用設計

DLQは単なるゴミ箱ではなく、再処理と原因分析のための情報保管場所です。運用フローの例:

  • DLQに送る際は元イベント、attempt数、エラーメッセージ、trace_idを付与
  • 定期的(例:毎日)にDLQをスキャンして原因分類を行う
  • 再処理は人手での確認(データ修正)→ 再投入 or 自動再処理(条件を厳密に)

多重実行対策(idempotency)と整合性

イベントのat-least-once特性により二重処理は避けられません。実務的対策は以下です:

  • idempotency key(例:イベントID / リクエストのハッシュ)を設計する
  • データベースにユニーク制約を設け、挿入時にDuplicateエラーで二重処理を検知する
  • キャッシュ(RedisのSETNX)を一時ロックに使う

簡単な実装例(DBユニーク制約を利用):

実装メモ: コード例は環境に合わせて調整してください。

観測性とアラート連携

各イベントにtrace_idを付与し、ログ・トレース・メトリクスで追跡できるようにします。計測すべき主要メトリクス:

メトリクス 目的 推奨アラート閾値(例)
処理時間(p50/p95/p99) レイテンシ監視 p95 > 3s で警告
失敗率 全体の健全性 5%以上で調査
DLQ率 恒久失敗の増加を検知 1%以上でアラート

第39回の可観測性設計に従い、ログにtrace_idを含め、分散トレーシングとメトリクスを紐付けてください。また、重要な閾値をアラートとしてSLAチームに通知する仕組みを用意します。

テストとローカル開発

ローカルでの検証は必須です。推奨手順:

  • LocalStackでSQSやS3を模擬する
  • docker-composeでRabbitMQ/Kafka/Redisを立てる
  • pytestでワーカーの振る舞い(リトライ・DLQ送信・idempotency)をテストする

pytestのDLQ動作テスト(簡略):

実装メモ: コード例は環境に合わせて調整してください。

運用Runbookスニペット(すぐ使える手順)

障害対応の切り分けと再処理手順を簡潔に示します。

状況 初動手順 次のアクション
ワーカーが高負荷で遅延 メトリクスp95を確認、スケール可能か確認 オートスケール、または一時的にレート制限をかける
DLQ増加 DLQの最新メッセージをサンプルで確認(trace_id, error) 恒久エラーなら原因修正後に再投入、あるいは通知
外部APIレート制限 一時的に処理を停止、バックオフ緩和を検討 リトライポリシー見直し・SLA連絡

次の一歩チェックリスト & テンプレリポジトリ

  • 導入前チェック:同期/非同期の選択基準を明確にする
  • 短期試験設計:サンプルイベント50件でエンドツーエンド検証
  • 観測性準備:trace_idと主要メトリクスの収集を開始
  • テンプレリポジトリ(最小実装)を用意しました:
    https://github.com/manageai/example-event-driven-ai

まとめ

イベント駆動でAI機能を安定して運用するためには、トリガーの選択(同期/非同期)、適切なキューの選定、リトライとDLQの厳格な設計、idempotencyの担保、観測性の確保が不可欠です。この記事では、これらを実務観点で整理し、Pythonの具体例と運用Runbookを提示しました。まずは小さなパイロット(50件程度)でエンドツーエンドを検証し、DLQやメトリクスを見ながら段階的に拡張することをお勧めします。

次回は「運用中のコスト最適化とモデルバージョン管理」を予定しています。Manage AIのシリーズ『AIとPythonの実務』を続けてお読みください。