実務でAIを回すとき、学習用に作った特徴量と推論時に使う特徴量がずれてしまい、原因調査や不具合対応で手が止まる――そんな経験はありませんか。この記事では「まずは一つの特徴量を確実に移行して運用に乗せる」ことを目標に、Pythonを使った軽量なFeature Storeの設計・実装・運用手順を示します。ローカルsqlite+FastAPIの最小構成で動くサンプルも用意し、今日から試せる形で解説します。
① 現場課題とFeature Storeの役割
現場でよく起きるつまずき:
- 学習時と推論時で特徴量生成ロジックが異なる(pandasコードと本番バッチが乖離)
- スキーマや名前のバージョン管理が甘く、デプロイ後に取り返しがつかない
- 遅延特徴量や時間ウィンドウの扱いが不明確で精度が変わる
Feature Storeが実務で果たす役割は次の3点です。まず特徴量定義を一元化して学習・推論で再利用できること、次に特徴量のバージョン・スキーマを管理して後方互換性を担保すること、最後に計算・配信・監視の運用ルールを標準化することです。
② 設計方針(オンライン/オフライン、一貫性・バージョン管理・スキーマ)
基本方針
- 軽量:まずは1つの重要な特徴量をFeature Storeで運用に乗せる。フル機能を詰め込まない。
- 一貫性優先:学習(offline)と推論(online)で同じ特徴量定義を参照する。コードとメタデータ(スキーマ)を同梱する。
- バージョン管理:特徴量定義はバージョンを付け、互換性ルールを明示する(後方互換性あり/なし)。
オフライン/オンラインの分担
- オフライン:バッチでの特徴量計算、学習用の時系列結合、品質チェック。
- オンライン:低遅延での特徴量取得API(キャッシュ併用)、推論環境向けのAPI契約を厳密化。
③ 実装手順(ストレージ設計、特徴量計算バッチ、登録API、フェッチAPI)
ストレージ設計(最小構成)
まずはローカルで動くsqliteベースを想定。実運用ではpostgresなどに置き換え可能に設計します。
| テーブル名 | 用途 | 主なカラム例 |
|---|---|---|
| feature_definitions | 特徴量定義とメタデータ(名前, バージョン, スキーマ, 作成者) | feature_name, version, schema_json, owner, created_at |
| feature_values | 計算済み特徴量(time-partitioned) | entity_id, feature_name, version, value, timestamp |
| feature_audit | 登録・更新の監査ログ | event, feature_name, version, payload, created_at |
テーブル定義テンプレート(sqlite):
CREATE TABLE feature_definitions (
feature_name TEXT,
version TEXT,
schema_json TEXT,
owner TEXT,
created_at TEXT,
PRIMARY KEY(feature_name, version)
);
CREATE TABLE feature_values (
entity_id TEXT,
feature_name TEXT,
version TEXT,
value TEXT,
timestamp TEXT,
PRIMARY KEY(entity_id, feature_name, version, timestamp)
);
CREATE TABLE feature_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event TEXT,
feature_name TEXT,
version TEXT,
payload TEXT,
created_at TEXT
);
特徴量計算バッチ(pandas)
ポイントは計算ロジックを関数化して、オフライン/オンラインで再利用できるようにすることです。簡単な例:
def compute_recency(df, ref_col='last_purchase_at', now=None):
import pandas as pd
if now is None:
now = pd.Timestamp.utcnow()
df = df.copy()
df['recency_days'] = (now - pd.to_datetime(df[ref_col])).dt.days
return df[['entity_id', 'recency_days']]
登録API(FastAPI)
管理者が新しい特徴量定義や計算済み値を登録するためのAPI。認証は省略していますが、本番では必須です。
from fastapi import FastAPI, HTTPException
import sqlite3
import json
app = FastAPI()
DB = 'feature_store.db'
@app.post('/register_definition')
async def register_definition(payload: dict):
conn = sqlite3.connect(DB)
cur = conn.cursor()
cur.execute('INSERT OR REPLACE INTO feature_definitions (feature_name, version, schema_json, owner, created_at) VALUES (?, ?, ?, ?, ?)',
(payload['feature_name'], payload['version'], json.dumps(payload['schema']), payload.get('owner',''), payload.get('created_at','')))
conn.commit(); conn.close()
return {'status': 'ok'}
フェッチAPI(推論向け)
低遅延を優先するため、Redisキャッシュを併用します。ここではシンプルにsqliteから値を返す例です。
@app.get('/fetch_feature')
async def fetch_feature(feature_name: str, entity_id: str, version: str = None):
conn = sqlite3.connect(DB)
cur = conn.cursor()
if version:
cur.execute('SELECT value, timestamp FROM feature_values WHERE entity_id=? AND feature_name=? AND version=? ORDER BY timestamp DESC LIMIT 1',
(entity_id, feature_name, version))
else:
cur.execute('SELECT value, timestamp FROM feature_values WHERE entity_id=? AND feature_name=? ORDER BY timestamp DESC LIMIT 1',
(entity_id, feature_name))
row = cur.fetchone(); conn.close()
if not row:
raise HTTPException(status_code=404, detail='feature not found')
return {'entity_id': entity_id, 'feature_name': feature_name, 'value': row[0], 'timestamp': row[1]}
学習時の接続(pandas + sqlite)
学習側ではオフラインの特徴量テーブルを使って結合します。例:
import pandas as pd
import sqlite3
conn = sqlite3.connect('feature_store.db')
train = pd.read_csv('train_table.csv')
features = pd.read_sql_query("SELECT * FROM feature_values WHERE feature_name='recency'", conn)
train = train.merge(features, left_on='entity_id', right_on='entity_id', how='left')
④ テストと品質ゲート(一致テスト・後方互換性テスト)
実務で重要なのは「ローカルで簡単に回せるテスト」です。以下を最低限用意します。
| テスト名 | 目的 | 期待される判定 |
|---|---|---|
| 一致テスト(offline vs online) | バッチ計算結果とAPI取得結果の同等性を検証 | 許容差以内/不一致はブロック |
| スキーマ互換性テスト | 新バージョンが既存クライアントへ影響しないか確認 | 互換なら通過、破壊的変更は明示的に承認 |
| 後方データ復元テスト | 過去データの再計算で同じ結果が得られるか | 再現可能であること |
簡単な一致テストの流れ:
- サンプルのentityリストを準備
- オフラインバッチで特徴量を計算してfeature_valuesへ登録
- APIで同じentityの特徴量を取得して比較
- 差分があればCIで失敗にする
⑤ 運用と監視(メトリクス・アラート・再計算ポリシー)
監視すべきメトリクス
- APIレイテンシ(p95, p99)、エラー率
- 特徴量欠損率(entityごとの欠損割合)
- オフラインとオンラインの差分分布(平均差、分位)
- バッチの遅延(期待完了時刻からのずれ)
アラート設計の例
- 欠損率が閾値を超えたらSlack通知・自動ロールバックを検討
- 一致テストがCIで失敗した場合はリリース停止
- APIエラー率が高い場合はフォールバック(デフォルト特徴量)で段階的対応
再計算ポリシー
- クリティカルな特徴量はリアルタイム近傍での再計算を許容(再現時間を定義)
- コストが高い集計は定期再計算(夜間バッチ)+インクリメンタル更新
- 再計算履歴と監査ログを残す(誰が、いつ、何を実行したか)
⑥ 移行とチェックリスト
一つの特徴量を移行するための最小チェックリスト:
| # | 項目 | 確認内容 |
|---|---|---|
| 1 | 定義作成 | feature_name, schema, version, owner を作成 |
| 2 | バッチ実装 | pandas関数化、テストデータで再現 |
| 3 | 登録APIで投入 | feature_definitionsに登録済み |
| 4 | API取得 | fetch APIで取得できること(ローカルで確認) |
| 5 | 一致テスト | オフラインとオンラインで同等性を確認 |
| 6 | 監視設定 | メトリクスのダッシュボードとアラート作成 |
| 7 | Runbook作成 | 障害時の手順(ロールバック、再計算)を明文化 |
⑦ サンプル実装と次の一歩
以下はリポジトリの最小構成例(ローカルで動く想定)。READMEには起動手順を明記します。
| ファイル | 役割 |
|---|---|
| app/main.py | FastAPI アプリ(register/fetchエンドポイント) |
| batch/compute.py | pandasでの特徴量計算関数 |
| infra/init_db.sql | sqliteテーブル定義 |
| tests/test_consistency.py | 一致テスト(CI用) |
| runbook.md | 運用手順抜粋 |
次の一歩(運用拡張の提案):
- sqlite→postgres/Cloud SQLに移行
- Redisキャッシュ導入でオンライン性能改善
- CIでの品質ゲート強化(差分の可視化、自動承認ルール)
運用上の注意点(実務的な落とし穴)
- 時間窓と遅延特徴量:発生時刻と観測時刻を明確に分け、遅延を過小評価しない
- スキーマ変更:破壊的変更はversionを上げ、互換性ポリシーを明記する
- データ削除(GDPR等):feature_valuesのログやコピーに対する削除方針を用意する
- コスト・スケール:最初は軽量構成で検証し、負荷増に応じて段階的にサービスを分離
成果物テンプレート(持ち帰り)
API契約(抜粋)
| エンドポイント | 入力 | 出力 |
|---|---|---|
| /register_definition (POST) | {feature_name, version, schema, owner, created_at} | {status: ok} |
| /fetch_feature (GET) | feature_name, entity_id, optional version | {entity_id, feature_name, value, timestamp} |
一致テストケース(例)
| テスト名 | 内容 |
|---|---|
| recency_consistency | 同一のentityセットについて、batchで計算したrecencyとAPIで取得したrecencyの差が0であること |
移行チェックリスト(抜粋)
| 項目 | 完了? |
|---|---|
| 定義登録 | □ |
| バッチでの検証 | □ |
| API取得での検証 | □ |
| CIで一致テスト通過 | □ |
| 監視設定 | □ |
Runbook抜粋
障害時の最小対応手順(例):
- 1) APIエラー検知 → 影響範囲を特定(機能・サービス)
- 2) 一致テストをローカルで実行 → batchとAPIの差分を確認
- 3) 差分が大きければ最近のdefのバージョンへロールバック
- 4) 必要なら該当時間範囲での再計算とデプロイ(作業ログを残す)
# 例: ロールバック手順(抜粋)
# 1. feature_definitionsの旧バージョンをenable
# 2. feature_valuesを旧versionに差し替え(バックアップは必須)
# 3. 一致テストを実行
まとめ
この記事では、「今日から1つの特徴量を移行して運用に乗せる」ことを目標に、軽量Feature Storeの設計・実装・運用手順を示しました。重要なポイントは次の通りです。
- まずは最小実装で検証する(sqlite+FastAPIで十分)
- 学習と推論で同じ定義を参照できるように、特徴量定義とスキーマを管理する
- CIで一致テストとスキーマ互換性テストを自動化する
- 監視やRunbookを用意して実運用の障害に備える
Manage AIのシリーズ(第43回・第37回・第36回)で扱ったパイプライン、デプロイ、監視の知見がある前提で、この記事を次の一手として活用してください。まずは一つの特徴量で試し、運用手順とテストを整備してからスケールアウトすることをおすすめします。
参考リポジトリ(ローカルで動く最小実装)を用意しています。まずはローカルで起動して、一つの特徴量を登録→取得→学習まで流してみてください。