第97回 実務で回す特徴量パイプラインとフィーチャーストア — Pythonで作るバッチ/オンライン生成、整合性検証、サービング手順

モデル開発は進んだけれど、本番で特徴量が安定して供給されない──そんな現場のつまずきに寄り添います。ここでは「どの特徴量を、どう作り、どう配るか」を実務目線で整理し、まず試せるPythonの骨組みを示します。過度に理想化せず、実務で優先して取り組むべき点に焦点を当てます。

全体像

特徴量パイプラインは大きく「バッチ」「オンライン(低レイテンシ)」「フィーチャーストア(保存と配布)」に分かれます。学習時と配信時で差分(training/serving skew)が生じやすいため、その起きやすい箇所を最初に押さえます。

コンポーネント 役割 主な利点 注意点
バッチ特徴量パイプライン 定期的に集計・加工し、学習用/配信用のテーブルを作る 整合性がとりやすく計算コスト低め 更新遅延がある。リアルタイム性が必要なケースには不向き
オンライン特徴量パイプライン リクエスト時に最新値を返す(キャッシュ併用) 低レイテンシで最新データを提供可能 整合性の担保が難しい。スキーマ管理が重要
フィーチャーストア 特徴量の保存・提供・メタ情報管理を一元化 学習/配信ロジックの共通化、共有が容易に 導入コスト・運用負荷、権限設計が必要
training/serving skew 学習時と配信時で特徴量が一致しない事象 時間ズレ、欠損処理差分、aggregationウィンドウの不一致が原因

設計の判断基準

バッチ/オンラインの使い分け

判断は主に3つの軸で行います。

要件 バッチ向き オンライン向き
レイテンシ 数分〜数時間許容 数ms〜数百ms要求
整合性 高い(同じロジックで学習・配信を行いやすい) 注意が必要(生データと変換ロジックの同期を要する)
コスト 一般に低コストでスケールしやすい 高コストになりやすい。キャッシュで緩和

既存データ基盤との接続

  • まずは既存のバッチ出力(CSV/Parquet/DB)を活用して、最小実装で回す。
  • オンラインが必要なら、DB→Redisキャッシュ→APIという段階的導入を推奨。

スキーマと命名規約

名前は「entity_feature_name__agg_window」など規則的にし、スキーマはバージョンを必ず持ちます。違いを生む小さな差(時刻単位、タイムゾーン、欠損の埋め方)を仕様書に明記してください。

優先順位(実務的)

  • まずは少数の高価値特徴量を安定供給する。フルカバレッジよりも高影響の安定性を優先。
  • 次に監視とテストを整備し、段階的に追加。

Pythonで作る実装ガイド

最低構成は「抽出→変換→集計→検証→保存→配信」です。以下にコンポーネントごとの骨組みを示します。

抽出(例:Parquet/SQL)

import pandas as pd
from sqlalchemy import create_engine

# Parquet
df = pd.read_parquet('s3://bucket/raw/events.parquet')

# SQL
engine = create_engine('postgresql://user:pass@host/db')
df_sql = pd.read_sql('select * from events where ts > now() - interval '1 day'', engine)

変換・集計(pandasベースの例)

def compute_features(events: pd.DataFrame) -> pd.DataFrame:
    # シンプルな例:ユーザーごとの1日アクティビティ数
    events['ts'] = pd.to_datetime(events['ts'])
    window = events.set_index('ts').last('1D')
    feat = events.groupby('user_id').agg({
        'event_type': 'count',
    }).rename(columns={'event_type': 'events_1d'})
    return feat.reset_index()

検証(保存前の基本チェック)

def basic_checks(df: pd.DataFrame):
    assert 'user_id' in df.columns
    assert df['events_1d'].isnull().sum() == 0
    # 分布のサマリ確認
    print(df['events_1d'].describe())

保存(例:Parquet + S3)

import pyarrow.parquet as pq
import pyarrow as pa

table = pa.Table.from_pandas(feat)
pq.write_table(table, '/tmp/features_2026-06-01.parquet')
# 実務ではboto3でS3アップロード

ワークフロー化(Airflowタスクの骨組み)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def job():
    # 上の処理を呼ぶ
    pass

dag = DAG('feature_pipeline', start_date=datetime(2026,6,1), schedule_interval='@daily')
run = PythonOperator(task_id='compute_features', python_callable=job, dag=dag)

オンライン配信(FastAPI + Redisキャッシュ)

from fastapi import FastAPI
import redis
import json

app = FastAPI()
redis_client = redis.Redis(host='redis', port=6379)

@app.get('/features/{user_id}')
def get_features(user_id: int):
    key = f'features:{user_id}'
    v = redis_client.get(key)
    if v:
        return json.loads(v)
    # キャッシュミス時はバッチストアを参照(簡易例)
    # db_fetch_featuresはParquetやDBを参照する関数
    feat = db_fetch_features(user_id)
    if feat:
        redis_client.set(key, json.dumps(feat), ex=300)
    return feat

整合性検証とテスト

学習用と配信用の特徴量が同じロジックで生成されることを保証するには、ロジックの共通化と自動テストが肝心です。

  • 関数の再利用:変換ロジックをライブラリ関数にして、学習パイプラインも配信パイプラインも同じ関数を呼ぶ。
  • ユニットテスト:pytestで入出力を固定したテストを用意する。
  • 差分検出:学習用のバッチ出力とオンライン配信結果のサンプル比較(平均・分散・KS検定)。
# pytestでの簡易テスト例
from myfeatures import compute_features

def test_compute_simple():
    df = pd.DataFrame({
        'user_id': [1,1,2],
        'event_type': ['a','b','a'],
        'ts': ['2026-06-01T00:00:00','2026-06-01T01:00:00','2026-06-01T02:00:00']
    })
    out = compute_features(df)
    assert out.loc[out['user_id']==1, 'events_1d'].iloc[0] == 2
# 分布比較の例(KS検定)
from scipy.stats import ks_2samp

stat, p = ks_2samp(batch['events_1d'], online_sample['events_1d'])
print('ks p-value', p)

モニタリングと運用

重要な指標とアラート設計の例を示します。

指標 何を見るか しきい値例 対応フロー
生成レイテンシ バッチジョブの完了時間、オンラインAPIの応答時間 バッチが遅延>1h、API p95>500ms まずリトライとログ確認、負荷増ならスケール
欠損率 主要特徴量の欠損割合 主要特徴量の欠損率>1% データソース確認、回復処理、ロールバック検討
分布逸脱 平均/分散の急変、KS検定p値低下 p<0.01や平均差が事前定義値を超える 根本原因調査、必要であれば特徴量の一時無効化

移行・バージョン管理

特徴量スキーマや計算ロジックは、データセット版管理と結びつけて扱います。以下は運用チェックリストです。

項目 運用アクション
スキーマバージョン 各特徴量にversionタグを付与し、変更は互換性ルールを明記
コード管理 特徴量生成コードはGit管理。リリース時にタグ付与
後方互換性テスト 旧バージョンの出力と比較する自動テストを用意
ロールバック フェイル時に旧パーティションや旧APIを再有効化する手順書

簡易フィーチャーストアの導入例とチェックリスト

すぐ試せるミニマム構成と、最初の14日でやることを示します。

レイヤー 技術例 役割
バッチストア Parquet + S3 学習用の安定したスナップショット保存
オンラインキャッシュ Redis 低レイテンシで特徴量提供
API層 FastAPI 配信用のHTTPインターフェース

最初の14日ガイド(プラン):

  • Day 1-3: 既存データで1つ高価値特徴量のバッチ生成を実装しParquetに保存。
  • Day 4-7: 同じ変換ロジックをライブラリ化し、unit testを追加。
  • Day 8-10: RedisキャッシュとFastAPIで配信経路を作成。簡易SLOを決める。
  • Day 11-14: モニタリング指標(生成時間、欠損率、分布)をダッシュボードに追加。

運用で陥りやすい失敗と回避策

  • 時刻のズレ:UTC/ローカル混在で集計窓がずれる。タイムゾーンは明示しUTCで統一。
  • 遅延更新:バッチ更新とAPI参照のタイミング不一致。更新スケジュールとTTLを設計。
  • 計算順序の不一致:学習時は過去のみ参照、配信時に未来情報を使ってしまう。必ずtime-travel安全なロジックを作る。
  • テスト不足:ローカル/サンプルでの動作確認だけで運用に出すと本番データで破綻する。自動テストとステージングを必須に。
  • 過度な最適化:最初から全特徴量を高速化しようとせず、まずは正確なパイプラインを整備する。

次の一歩

この記事を読んだあとに試す3つの実践タスクです。短時間で成果を得られる項目を選びました。

  1. 小さなバッチ特徴量を1つ作る(Parquet出力まで)。
  2. 同じロジックで学習用と配信用の関数を作り、pytestで差分テストを1つ追加する。
  3. 監視ダッシュボードに「バッチ生成レイテンシ」メトリクスを追加してアラート閾値を決める。

関連回の案内:第96回(データバージョン管理)、第82回(ドリフト検出)、第94回(CI/CD)を参照して、データ管理とデプロイをつなげてください。

まとめ

特徴量パイプラインは「作ること」より「安定して供給すること」が重要です。まずは少数の高影響特徴量を、共通化された変換関数とテストで運用に載せ、段階的にオンライン配信やフィーチャーストアを導入してください。モニタリングとバージョン管理を初期から設計することで、training/serving skewや運用トラブルを未然に防げます。

Manage AI シリーズ「AIとPythonの実務」では、次回以降も「試して動く」手順を中心に紹介します。まずはこの記事の3つのタスクのうち1つを今週の作業としてスケジュールに入れてみてください。