第48回 実務で回す軽量Feature StoreをPythonで作る — 設計・実装・運用の手順

実務でAIを回すとき、学習用に作った特徴量と推論時に使う特徴量がずれてしまい、原因調査や不具合対応で手が止まる――そんな経験はありませんか。この記事では「まずは一つの特徴量を確実に移行して運用に乗せる」ことを目標に、Pythonを使った軽量なFeature Storeの設計・実装・運用手順を示します。ローカルsqlite+FastAPIの最小構成で動くサンプルも用意し、今日から試せる形で解説します。

① 現場課題とFeature Storeの役割

現場でよく起きるつまずき:

  • 学習時と推論時で特徴量生成ロジックが異なる(pandasコードと本番バッチが乖離)
  • スキーマや名前のバージョン管理が甘く、デプロイ後に取り返しがつかない
  • 遅延特徴量や時間ウィンドウの扱いが不明確で精度が変わる

Feature Storeが実務で果たす役割は次の3点です。まず特徴量定義を一元化して学習・推論で再利用できること、次に特徴量のバージョン・スキーマを管理して後方互換性を担保すること、最後に計算・配信・監視の運用ルールを標準化することです。

② 設計方針(オンライン/オフライン、一貫性・バージョン管理・スキーマ)

基本方針

  • 軽量:まずは1つの重要な特徴量をFeature Storeで運用に乗せる。フル機能を詰め込まない。
  • 一貫性優先:学習(offline)と推論(online)で同じ特徴量定義を参照する。コードとメタデータ(スキーマ)を同梱する。
  • バージョン管理:特徴量定義はバージョンを付け、互換性ルールを明示する(後方互換性あり/なし)。

オフライン/オンラインの分担

  • オフライン:バッチでの特徴量計算、学習用の時系列結合、品質チェック。
  • オンライン:低遅延での特徴量取得API(キャッシュ併用)、推論環境向けのAPI契約を厳密化。

③ 実装手順(ストレージ設計、特徴量計算バッチ、登録API、フェッチAPI)

ストレージ設計(最小構成)

まずはローカルで動くsqliteベースを想定。実運用ではpostgresなどに置き換え可能に設計します。

テーブル名 用途 主なカラム例
feature_definitions 特徴量定義とメタデータ(名前, バージョン, スキーマ, 作成者) feature_name, version, schema_json, owner, created_at
feature_values 計算済み特徴量(time-partitioned) entity_id, feature_name, version, value, timestamp
feature_audit 登録・更新の監査ログ event, feature_name, version, payload, created_at

テーブル定義テンプレート(sqlite):

CREATE TABLE feature_definitions (
  feature_name TEXT,
  version TEXT,
  schema_json TEXT,
  owner TEXT,
  created_at TEXT,
  PRIMARY KEY(feature_name, version)
);

CREATE TABLE feature_values (
  entity_id TEXT,
  feature_name TEXT,
  version TEXT,
  value TEXT,
  timestamp TEXT,
  PRIMARY KEY(entity_id, feature_name, version, timestamp)
);

CREATE TABLE feature_audit (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  event TEXT,
  feature_name TEXT,
  version TEXT,
  payload TEXT,
  created_at TEXT
);

特徴量計算バッチ(pandas)

ポイントは計算ロジックを関数化して、オフライン/オンラインで再利用できるようにすることです。簡単な例:

def compute_recency(df, ref_col='last_purchase_at', now=None):
    import pandas as pd
    if now is None:
        now = pd.Timestamp.utcnow()
    df = df.copy()
    df['recency_days'] = (now - pd.to_datetime(df[ref_col])).dt.days
    return df[['entity_id', 'recency_days']]

登録API(FastAPI)

管理者が新しい特徴量定義や計算済み値を登録するためのAPI。認証は省略していますが、本番では必須です。

from fastapi import FastAPI, HTTPException
import sqlite3
import json

app = FastAPI()
DB = 'feature_store.db'

@app.post('/register_definition')
async def register_definition(payload: dict):
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    cur.execute('INSERT OR REPLACE INTO feature_definitions (feature_name, version, schema_json, owner, created_at) VALUES (?, ?, ?, ?, ?)',
                (payload['feature_name'], payload['version'], json.dumps(payload['schema']), payload.get('owner',''), payload.get('created_at','')))
    conn.commit(); conn.close()
    return {'status': 'ok'}

フェッチAPI(推論向け)

低遅延を優先するため、Redisキャッシュを併用します。ここではシンプルにsqliteから値を返す例です。

@app.get('/fetch_feature')
async def fetch_feature(feature_name: str, entity_id: str, version: str = None):
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    if version:
        cur.execute('SELECT value, timestamp FROM feature_values WHERE entity_id=? AND feature_name=? AND version=? ORDER BY timestamp DESC LIMIT 1',
                    (entity_id, feature_name, version))
    else:
        cur.execute('SELECT value, timestamp FROM feature_values WHERE entity_id=? AND feature_name=? ORDER BY timestamp DESC LIMIT 1',
                    (entity_id, feature_name))
    row = cur.fetchone(); conn.close()
    if not row:
        raise HTTPException(status_code=404, detail='feature not found')
    return {'entity_id': entity_id, 'feature_name': feature_name, 'value': row[0], 'timestamp': row[1]}

学習時の接続(pandas + sqlite)

学習側ではオフラインの特徴量テーブルを使って結合します。例:

import pandas as pd
import sqlite3

conn = sqlite3.connect('feature_store.db')
train = pd.read_csv('train_table.csv')
features = pd.read_sql_query("SELECT * FROM feature_values WHERE feature_name='recency'", conn)
train = train.merge(features, left_on='entity_id', right_on='entity_id', how='left')

④ テストと品質ゲート(一致テスト・後方互換性テスト)

実務で重要なのは「ローカルで簡単に回せるテスト」です。以下を最低限用意します。

テスト名 目的 期待される判定
一致テスト(offline vs online) バッチ計算結果とAPI取得結果の同等性を検証 許容差以内/不一致はブロック
スキーマ互換性テスト 新バージョンが既存クライアントへ影響しないか確認 互換なら通過、破壊的変更は明示的に承認
後方データ復元テスト 過去データの再計算で同じ結果が得られるか 再現可能であること

簡単な一致テストの流れ:

  • サンプルのentityリストを準備
  • オフラインバッチで特徴量を計算してfeature_valuesへ登録
  • APIで同じentityの特徴量を取得して比較
  • 差分があればCIで失敗にする

⑤ 運用と監視(メトリクス・アラート・再計算ポリシー)

監視すべきメトリクス

  • APIレイテンシ(p95, p99)、エラー率
  • 特徴量欠損率(entityごとの欠損割合)
  • オフラインとオンラインの差分分布(平均差、分位)
  • バッチの遅延(期待完了時刻からのずれ)

アラート設計の例

  • 欠損率が閾値を超えたらSlack通知・自動ロールバックを検討
  • 一致テストがCIで失敗した場合はリリース停止
  • APIエラー率が高い場合はフォールバック(デフォルト特徴量)で段階的対応

再計算ポリシー

  • クリティカルな特徴量はリアルタイム近傍での再計算を許容(再現時間を定義)
  • コストが高い集計は定期再計算(夜間バッチ)+インクリメンタル更新
  • 再計算履歴と監査ログを残す(誰が、いつ、何を実行したか)

⑥ 移行とチェックリスト

一つの特徴量を移行するための最小チェックリスト:

# 項目 確認内容
1 定義作成 feature_name, schema, version, owner を作成
2 バッチ実装 pandas関数化、テストデータで再現
3 登録APIで投入 feature_definitionsに登録済み
4 API取得 fetch APIで取得できること(ローカルで確認)
5 一致テスト オフラインとオンラインで同等性を確認
6 監視設定 メトリクスのダッシュボードとアラート作成
7 Runbook作成 障害時の手順(ロールバック、再計算)を明文化

⑦ サンプル実装と次の一歩

以下はリポジトリの最小構成例(ローカルで動く想定)。READMEには起動手順を明記します。

ファイル 役割
app/main.py FastAPI アプリ(register/fetchエンドポイント)
batch/compute.py pandasでの特徴量計算関数
infra/init_db.sql sqliteテーブル定義
tests/test_consistency.py 一致テスト(CI用)
runbook.md 運用手順抜粋

次の一歩(運用拡張の提案):

  • sqlite→postgres/Cloud SQLに移行
  • Redisキャッシュ導入でオンライン性能改善
  • CIでの品質ゲート強化(差分の可視化、自動承認ルール)

運用上の注意点(実務的な落とし穴)

  • 時間窓と遅延特徴量:発生時刻と観測時刻を明確に分け、遅延を過小評価しない
  • スキーマ変更:破壊的変更はversionを上げ、互換性ポリシーを明記する
  • データ削除(GDPR等):feature_valuesのログやコピーに対する削除方針を用意する
  • コスト・スケール:最初は軽量構成で検証し、負荷増に応じて段階的にサービスを分離

成果物テンプレート(持ち帰り)

API契約(抜粋)

エンドポイント 入力 出力
/register_definition (POST) {feature_name, version, schema, owner, created_at} {status: ok}
/fetch_feature (GET) feature_name, entity_id, optional version {entity_id, feature_name, value, timestamp}

一致テストケース(例)

テスト名 内容
recency_consistency 同一のentityセットについて、batchで計算したrecencyとAPIで取得したrecencyの差が0であること

移行チェックリスト(抜粋)

項目 完了?
定義登録
バッチでの検証
API取得での検証
CIで一致テスト通過
監視設定

Runbook抜粋

障害時の最小対応手順(例):

  • 1) APIエラー検知 → 影響範囲を特定(機能・サービス)
  • 2) 一致テストをローカルで実行 → batchとAPIの差分を確認
  • 3) 差分が大きければ最近のdefのバージョンへロールバック
  • 4) 必要なら該当時間範囲での再計算とデプロイ(作業ログを残す)
# 例: ロールバック手順(抜粋)
# 1. feature_definitionsの旧バージョンをenable
# 2. feature_valuesを旧versionに差し替え(バックアップは必須)
# 3. 一致テストを実行

まとめ

この記事では、「今日から1つの特徴量を移行して運用に乗せる」ことを目標に、軽量Feature Storeの設計・実装・運用手順を示しました。重要なポイントは次の通りです。

  • まずは最小実装で検証する(sqlite+FastAPIで十分)
  • 学習と推論で同じ定義を参照できるように、特徴量定義とスキーマを管理する
  • CIで一致テストとスキーマ互換性テストを自動化する
  • 監視やRunbookを用意して実運用の障害に備える

Manage AIのシリーズ(第43回・第37回・第36回)で扱ったパイプライン、デプロイ、監視の知見がある前提で、この記事を次の一手として活用してください。まずは一つの特徴量で試し、運用手順とテストを整備してからスケールアウトすることをおすすめします。

参考リポジトリ(ローカルで動く最小実装)を用意しています。まずはローカルで起動して、一つの特徴量を登録→取得→学習まで流してみてください。