はじめに — つまずきに寄り添う一言
業務でAIを呼び出すとき、”いつ呼ぶか”、”失敗したらどうするか”、”二重実行をどう防ぐか”といった実運用の細かい問題でつまずきがちです。本記事では、第56回のモデル資産カタログ化の続きとして、カタログ化されたモデルを業務イベントに結びつけ、安定して回すための実務的な設計とPythonでの実装例を示します。読み終えれば、すぐに試せるコード片と運用Runbookスニペットが手に入ります。
対象ユースケースと全体像
想定する典型ユースケース:
- 問い合わせ自動振り分け(同期API or 非同期ワーカー)
- OCR→確認ワークフロー(非同期バッチ処理)
- レコメンド非同期生成(イベント発行→キュー→ワーカー)
ここでの基本パターンは「イベント発生→(場合によりキュー)→ワーカー(AI呼び出し)→結果保存/通知」。重要なのは各段階での可観測性・リトライ・DLQ・idempotencyの設計です。
トリガー設計:同期 vs 非同期 の使い分け
選択基準は主に応答性(レイテンシ)、可用性、処理の複雑さ、コストです。下表は簡潔な比較です。
| 同期API呼び出し | 非同期メッセージ駆動(イベント→キュー→ワーカー) | |
|---|---|---|
| 適した場面 | 即時応答が必要な場面(チャットボット、検索など) | バッチ処理、長時間処理、負荷平準化が必要な場面 |
| 利点 | 簡単、即時結果 | 耐障害性、スケーラビリティ、リトライ制御 |
| 注意点 | タイムアウト、リソース競合、コスト高になることも | 設計・運用の複雑度が上がる(DLQ、再処理運用など) |
Pythonでのエントリポイント例(簡易)
同期APIならFlask/FastAPIのエンドポイントで直接モデル呼び出し。非同期ならイベントを発行してレスポンスは即時返却。
実装メモ: コード例は環境に合わせて調整してください。
キュー選定と最小ワーカー実装例
代表的キューの選定基準:
| 選択項目 | SQS | RabbitMQ | Kafka | Redis |
|---|---|---|---|---|
| メッセージ保証 | at-least-once | at-least-once(ackベース) | at-least-once(設定次第) | ベーシック(パーシステンスで改善) |
| 運用負荷 | 低(マネージド) | 中 | 高(スケール/運用) | 低〜中(簡単に始められる) |
| 向く用途 | クラウド連携、DLQ機能が整備 | 柔軟なルーティングが必要な場合 | ストリーミング大量データ、イベントソーシング | 開発・小規模バッチ、簡易通知 |
最小ワーカー例(SQS:boto3)
実装メモ: コード例は環境に合わせて調整してください。
RabbitMQ(kombu)/Kafka(confluent-kafka)/Redis(redis-py)ミニ例
実装メモ: コード例は環境に合わせて調整してください。
リトライ・バックオフ・DLQ戦略
ポイントは恒久失敗(データ不備、認証不可)と一時失敗(ネットワーク、レート制限)を判定し、DLQへ送る条件を明確にすることです。
- 指数バックオフ(例:sleep = base * 2**attempt、上限を設定)
- 最大リトライ回数を定め、越えたらDLQへ
- DLQのメタ情報に失敗理由・attempt数・trace_idを含める
Pythonでの簡単なリトライ例(自前実装):
実装メモ: コード例は環境に合わせて調整してください。
DLQの運用設計
DLQは単なるゴミ箱ではなく、再処理と原因分析のための情報保管場所です。運用フローの例:
- DLQに送る際は元イベント、attempt数、エラーメッセージ、trace_idを付与
- 定期的(例:毎日)にDLQをスキャンして原因分類を行う
- 再処理は人手での確認(データ修正)→ 再投入 or 自動再処理(条件を厳密に)
多重実行対策(idempotency)と整合性
イベントのat-least-once特性により二重処理は避けられません。実務的対策は以下です:
- idempotency key(例:イベントID / リクエストのハッシュ)を設計する
- データベースにユニーク制約を設け、挿入時にDuplicateエラーで二重処理を検知する
- キャッシュ(RedisのSETNX)を一時ロックに使う
簡単な実装例(DBユニーク制約を利用):
実装メモ: コード例は環境に合わせて調整してください。
観測性とアラート連携
各イベントにtrace_idを付与し、ログ・トレース・メトリクスで追跡できるようにします。計測すべき主要メトリクス:
| メトリクス | 目的 | 推奨アラート閾値(例) |
|---|---|---|
| 処理時間(p50/p95/p99) | レイテンシ監視 | p95 > 3s で警告 |
| 失敗率 | 全体の健全性 | 5%以上で調査 |
| DLQ率 | 恒久失敗の増加を検知 | 1%以上でアラート |
第39回の可観測性設計に従い、ログにtrace_idを含め、分散トレーシングとメトリクスを紐付けてください。また、重要な閾値をアラートとしてSLAチームに通知する仕組みを用意します。
テストとローカル開発
ローカルでの検証は必須です。推奨手順:
- LocalStackでSQSやS3を模擬する
- docker-composeでRabbitMQ/Kafka/Redisを立てる
- pytestでワーカーの振る舞い(リトライ・DLQ送信・idempotency)をテストする
pytestのDLQ動作テスト(簡略):
実装メモ: コード例は環境に合わせて調整してください。
運用Runbookスニペット(すぐ使える手順)
障害対応の切り分けと再処理手順を簡潔に示します。
| 状況 | 初動手順 | 次のアクション |
|---|---|---|
| ワーカーが高負荷で遅延 | メトリクスp95を確認、スケール可能か確認 | オートスケール、または一時的にレート制限をかける |
| DLQ増加 | DLQの最新メッセージをサンプルで確認(trace_id, error) | 恒久エラーなら原因修正後に再投入、あるいは通知 |
| 外部APIレート制限 | 一時的に処理を停止、バックオフ緩和を検討 | リトライポリシー見直し・SLA連絡 |
次の一歩チェックリスト & テンプレリポジトリ
- 導入前チェック:同期/非同期の選択基準を明確にする
- 短期試験設計:サンプルイベント50件でエンドツーエンド検証
- 観測性準備:trace_idと主要メトリクスの収集を開始
- テンプレリポジトリ(最小実装)を用意しました:
https://github.com/manageai/example-event-driven-ai
まとめ
イベント駆動でAI機能を安定して運用するためには、トリガーの選択(同期/非同期)、適切なキューの選定、リトライとDLQの厳格な設計、idempotencyの担保、観測性の確保が不可欠です。この記事では、これらを実務観点で整理し、Pythonの具体例と運用Runbookを提示しました。まずは小さなパイロット(50件程度)でエンドツーエンドを検証し、DLQやメトリクスを見ながら段階的に拡張することをお勧めします。
次回は「運用中のコスト最適化とモデルバージョン管理」を予定しています。Manage AIのシリーズ『AIとPythonの実務』を続けてお読みください。