jpskill.com
💼 ビジネス コミュニティ

OHLCV市場データ処理

ohlcv-processing

OHLCV(始値・高値・安値・終値・出来高)のリサンプリング、欠損値処理、異常検知、正規化、複数データソースの統合など、市場データを分析しやすい形に前処理するSkill。

📜 元の英語説明(参考)

Market data preparation including OHLCV resampling, gap handling, anomaly detection, normalization, and multi-source merging

🇯🇵 日本人クリエイター向け解説

一言でいうと

OHLCV(始値・高値・安値・終値・出来高)のリサンプリング、欠損値処理、異常検知、正規化、複数データソースの統合など、市場データを分析しやすい形に前処理するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o ohlcv-processing.zip https://jpskill.com/download/10422.zip && unzip -o ohlcv-processing.zip && rm ohlcv-processing.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/10422.zip -OutFile "$d\ohlcv-processing.zip"; Expand-Archive "$d\ohlcv-processing.zip" -DestinationPath $d -Force; ri "$d\ohlcv-processing.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して ohlcv-processing.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → ohlcv-processing フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

OHLCV 処理 — 市場データ準備

クリーンで一貫性のある OHLCV データは、あらゆるトレーディング分析の基礎です。ゴミを入れるとゴミが出てきます。たった 1 つの異常なローソク足が、誤ったシグナルをトリガーし、指標の計算を壊し、誤解を招くバックテスト結果を生み出す可能性があります。このスキルでは、検証、クリーニング、リサンプリング、正規化、およびマルチソースのマージという、完全なデータ準備パイプラインについて説明します。

重要な理由: 暗号通貨の OHLCV データは、従来の市場よりも複雑です。24 時間 365 日の取引は、公式の終値がないことを意味し、DEX アグリゲーターは価格に同意せず、低流動性のトークンはありえないローソク足を生み出し、API の停止はギャップを生み出します。すべての分析ワークフローは、このパイプラインから始める必要があります。

クイックスタート

1. 依存関係のインストール

uv pip install pandas numpy httpx

2. 標準 OHLCV DataFrame 形式

すべての処理関数は、この標準形式を想定しています。

import pandas as pd

# 標準 OHLCV DataFrame
# - DatetimeIndex は UTC
# - 列: open, high, low, close, volume (小文字)
# - タイムスタンプで昇順にソート
# - 重複するタイムスタンプなし

df = pd.DataFrame({
    "open": [1.10, 1.12, 1.11],
    "high": [1.15, 1.14, 1.13],
    "low": [1.08, 1.10, 1.09],
    "close": [1.12, 1.11, 1.12],
    "volume": [50000, 48000, 52000],
}, index=pd.to_datetime([
    "2025-01-01 00:00:00",
    "2025-01-01 00:01:00",
    "2025-01-01 00:02:00",
], utc=True))
df.index.name = "timestamp"

3. 完全な処理パイプライン

import pandas as pd
import numpy as np

def process_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """完全な OHLCV 処理パイプラインを実行します。"""
    df = standardize_columns(df)
    df = validate_ohlcv(df)
    df = handle_gaps(df, method="ffill")
    df = detect_and_flag_anomalies(df)
    return df

データ検証

列のチェック

REQUIRED_COLUMNS = {"open", "high", "low", "close", "volume"}

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """列名を小文字の標準に正規化します。"""
    df.columns = df.columns.str.lower().str.strip()
    # 一般的な名前の変更
    rename_map = {"vol": "volume", "v": "volume", "o": "open",
                  "h": "high", "l": "low", "c": "close"}
    df = df.rename(columns=rename_map)
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df[["open", "high", "low", "close", "volume"]]

構造検証

def validate_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """OHLCV の構造的な整合性を検証します。"""
    # DatetimeIndex が UTC であることを確認
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index, utc=True)
    if df.index.tz is None:
        df.index = df.index.tz_localize("UTC")

    # ソートと重複排除
    df = df.sort_index()
    dupes = df.index.duplicated(keep="last")
    if dupes.any():
        print(f"Warning: Removed {dupes.sum()} duplicate timestamps")
        df = df[~dupes]

    # 型の強制
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    return df

ありえないローソク足の検出

def find_impossible_candles(df: pd.DataFrame) -> pd.DataFrame:
    """OHLC の制約に違反するローソク足を見つけます。"""
    issues = pd.DataFrame(index=df.index)
    issues["high_lt_low"] = df["high"] < df["low"]
    issues["high_lt_open"] = df["high"] < df["open"]
    issues["high_lt_close"] = df["high"] < df["close"]
    issues["low_gt_open"] = df["low"] > df["open"]
    issues["low_gt_close"] = df["low"] > df["close"]
    issues["negative_price"] = (df[["open", "high", "low", "close"]] < 0).any(axis=1)
    issues["negative_volume"] = df["volume"] < 0
    issues["any_issue"] = issues.any(axis=1)
    return issues[issues["any_issue"]]

ギャップ処理

暗号通貨は 24 時間 365 日取引されますが、API の停止、低い流動性、またはアグリゲーターのダウンタイムにより、ギャップが発生することがあります。

ギャップの検出

def detect_gaps(df: pd.DataFrame, expected_freq: str = "1min") -> pd.Series:
    """予想される頻度に基づいて、欠落しているタイムスタンプを見つけます。"""
    full_index = pd.date_range(
        start=df.index.min(), end=df.index.max(), freq=expected_freq, tz="UTC"
    )
    missing = full_index.difference(df.index)
    return missing

ギャップの埋め込み

def handle_gaps(
    df: pd.DataFrame,
    freq: str = "1min",
    method: str = "ffill",
    max_gap: int = 5,
) -> pd.DataFrame:
    """OHLCV データのギャップを埋めます。

    Args:
        df: DatetimeIndex を持つ OHLCV DataFrame。
        freq: 予想されるバーの頻度。
        method: 'ffill' (前方埋め) または 'interpolate'。
        max_gap: 埋める最大連続バー数。これより大きいギャップは NaN のままになります。
    """
    full_index = pd.date_range(
        start=df.index.min(), end=df.index.max(), freq=freq, tz="UTC"
    )
    df = df.reindex(full_index)
    df.index.name = "timestamp"

    # どのバーが埋められたかをマーク
    df["is_filled"] = df["close"].isna()

    if method == "ffill":
        # OHLC を前方埋め (フラットローソク足)、ボリュームはゼロ
        df[["open", "high", "low", "close"]] = (
            df[["open", "high", "low", "close"]].ffill(limit=max_gap)
        )
        df["volume"] = df["volume"].fillna(0)
    elif method == "interpolate":
        df[["open", "high", "low", "close"]] = (
            df[["open", "high", "low", "close"]].interpolate(
                method="time", limit=max_gap
            )
        )
        df["volume"] = df["volume"].fillna(0)

    return df

異常検出

完全な異常分類については、references/data_quality.md を参照してください。

価格スパイクの検出

def detect_price_spikes(
    df: pd.DataFrame, window: int = 20, threshold: float = 3.0
) -> pd.Series:
    """リターンが threshold * ローリング std を超えるバーにフラグを立てます。"""
    returns = df["close"].pct_change()
    rolling_std = returns.rolling(window, min_periods=5).std()
    spike = returns.abs() > (threshold * rolling_std)
    return spike.fillna(False)

📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

OHLCV Processing — Market Data Preparation

Clean, consistent OHLCV data is the foundation of every trading analysis. Garbage in, garbage out — a single anomalous candle can trigger false signals, corrupt indicator calculations, and produce misleading backtest results. This skill covers the full data preparation pipeline: validation, cleaning, resampling, normalization, and multi-source merging.

Why this matters: Crypto OHLCV data is messier than traditional markets. 24/7 trading means no official close, DEX aggregators disagree on prices, low-liquidity tokens produce impossible candles, and API outages create gaps. Every analysis workflow should start with this pipeline.

Quick Start

1. Install Dependencies

uv pip install pandas numpy httpx

2. Standard OHLCV DataFrame Format

All processing functions expect this canonical format:

import pandas as pd

# Canonical OHLCV DataFrame
# - DatetimeIndex in UTC
# - Columns: open, high, low, close, volume (lowercase)
# - Sorted ascending by timestamp
# - No duplicate timestamps

df = pd.DataFrame({
    "open": [1.10, 1.12, 1.11],
    "high": [1.15, 1.14, 1.13],
    "low": [1.08, 1.10, 1.09],
    "close": [1.12, 1.11, 1.12],
    "volume": [50000, 48000, 52000],
}, index=pd.to_datetime([
    "2025-01-01 00:00:00",
    "2025-01-01 00:01:00",
    "2025-01-01 00:02:00",
], utc=True))
df.index.name = "timestamp"

3. Full Processing Pipeline

import pandas as pd
import numpy as np

def process_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """Run complete OHLCV processing pipeline."""
    df = standardize_columns(df)
    df = validate_ohlcv(df)
    df = handle_gaps(df, method="ffill")
    df = detect_and_flag_anomalies(df)
    return df

Data Validation

Column Checks

REQUIRED_COLUMNS = {"open", "high", "low", "close", "volume"}

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names to lowercase standard."""
    df.columns = df.columns.str.lower().str.strip()
    # Common renames
    rename_map = {"vol": "volume", "v": "volume", "o": "open",
                  "h": "high", "l": "low", "c": "close"}
    df = df.rename(columns=rename_map)
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df[["open", "high", "low", "close", "volume"]]

Structural Validation

def validate_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """Validate OHLCV structural integrity."""
    # Ensure DatetimeIndex in UTC
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index, utc=True)
    if df.index.tz is None:
        df.index = df.index.tz_localize("UTC")

    # Sort and deduplicate
    df = df.sort_index()
    dupes = df.index.duplicated(keep="last")
    if dupes.any():
        print(f"Warning: Removed {dupes.sum()} duplicate timestamps")
        df = df[~dupes]

    # Type enforcement
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    return df

Impossible Candle Detection

def find_impossible_candles(df: pd.DataFrame) -> pd.DataFrame:
    """Find candles that violate OHLC constraints."""
    issues = pd.DataFrame(index=df.index)
    issues["high_lt_low"] = df["high"] < df["low"]
    issues["high_lt_open"] = df["high"] < df["open"]
    issues["high_lt_close"] = df["high"] < df["close"]
    issues["low_gt_open"] = df["low"] > df["open"]
    issues["low_gt_close"] = df["low"] > df["close"]
    issues["negative_price"] = (df[["open", "high", "low", "close"]] < 0).any(axis=1)
    issues["negative_volume"] = df["volume"] < 0
    issues["any_issue"] = issues.any(axis=1)
    return issues[issues["any_issue"]]

Gap Handling

Crypto trades 24/7, but gaps still occur from API outages, low liquidity, or aggregator downtime.

Detect Gaps

def detect_gaps(df: pd.DataFrame, expected_freq: str = "1min") -> pd.Series:
    """Find missing timestamps based on expected frequency."""
    full_index = pd.date_range(
        start=df.index.min(), end=df.index.max(), freq=expected_freq, tz="UTC"
    )
    missing = full_index.difference(df.index)
    return missing

Fill Gaps

def handle_gaps(
    df: pd.DataFrame,
    freq: str = "1min",
    method: str = "ffill",
    max_gap: int = 5,
) -> pd.DataFrame:
    """Fill gaps in OHLCV data.

    Args:
        df: OHLCV DataFrame with DatetimeIndex.
        freq: Expected bar frequency.
        method: 'ffill' (forward fill) or 'interpolate'.
        max_gap: Maximum consecutive bars to fill. Larger gaps are left as NaN.
    """
    full_index = pd.date_range(
        start=df.index.min(), end=df.index.max(), freq=freq, tz="UTC"
    )
    df = df.reindex(full_index)
    df.index.name = "timestamp"

    # Mark which bars were filled
    df["is_filled"] = df["close"].isna()

    if method == "ffill":
        # Forward fill OHLC (flat candle), zero volume
        df[["open", "high", "low", "close"]] = (
            df[["open", "high", "low", "close"]].ffill(limit=max_gap)
        )
        df["volume"] = df["volume"].fillna(0)
    elif method == "interpolate":
        df[["open", "high", "low", "close"]] = (
            df[["open", "high", "low", "close"]].interpolate(
                method="time", limit=max_gap
            )
        )
        df["volume"] = df["volume"].fillna(0)

    return df

Anomaly Detection

See references/data_quality.md for the complete anomaly taxonomy.

Price Spike Detection

def detect_price_spikes(
    df: pd.DataFrame, window: int = 20, threshold: float = 3.0
) -> pd.Series:
    """Flag bars where return exceeds threshold * rolling std."""
    returns = df["close"].pct_change()
    rolling_std = returns.rolling(window, min_periods=5).std()
    spike = returns.abs() > (threshold * rolling_std)
    return spike.fillna(False)

Zero Volume Detection

def detect_zero_volume(df: pd.DataFrame, min_volume: float = 0) -> pd.Series:
    """Flag bars with zero or below-minimum volume."""
    return df["volume"] <= min_volume

Composite Anomaly Flagging

def flag_anomalies(df: pd.DataFrame) -> pd.DataFrame:
    """Add anomaly flag columns to DataFrame."""
    df["anomaly_spike"] = detect_price_spikes(df)
    df["anomaly_zero_vol"] = detect_zero_volume(df)
    impossible = find_impossible_candles(df)
    df["anomaly_impossible"] = False
    if not impossible.empty:
        df.loc[impossible.index, "anomaly_impossible"] = True
    df["anomaly_any"] = (
        df["anomaly_spike"] | df["anomaly_zero_vol"] | df["anomaly_impossible"]
    )
    return df

Resampling

See references/resampling_guide.md for detailed guidance.

Standard Resample

OHLCV_RESAMPLE_RULES = {
    "open": "first",
    "high": "max",
    "low": "min",
    "close": "last",
    "volume": "sum",
}

def resample_ohlcv(df: pd.DataFrame, target_freq: str) -> pd.DataFrame:
    """Resample OHLCV to a coarser timeframe.

    Args:
        df: OHLCV DataFrame (must be finer than target_freq).
        target_freq: Pandas frequency string ('5min', '15min', '1h', '4h', '1D').

    Returns:
        Resampled OHLCV DataFrame with no NaN rows.
    """
    ohlcv_cols = ["open", "high", "low", "close", "volume"]
    resampled = df[ohlcv_cols].resample(target_freq).agg(OHLCV_RESAMPLE_RULES)
    return resampled.dropna(subset=["close"])

Common Timeframe Ladder

TIMEFRAME_LADDER = ["1min", "5min", "15min", "1h", "4h", "1D"]

def resample_ladder(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Resample 1-minute data to all standard timeframes."""
    results = {"1min": df.copy()}
    for tf in TIMEFRAME_LADDER[1:]:
        results[tf] = resample_ohlcv(df, tf)
    return results

VWAP Calculation

def compute_vwap(df: pd.DataFrame) -> pd.Series:
    """Compute cumulative VWAP over the DataFrame."""
    typical_price = (df["high"] + df["low"] + df["close"]) / 3
    cum_vol = df["volume"].cumsum()
    cum_tp_vol = (typical_price * df["volume"]).cumsum()
    return cum_tp_vol / cum_vol

Normalization

def normalize_prices(
    df: pd.DataFrame, method: str = "returns"
) -> pd.DataFrame:
    """Normalize OHLCV price columns.

    Methods:
        'returns' — Percentage returns (close-to-close).
        'log_returns' — Log returns.
        'minmax' — Min-max scale to [0, 1].
        'zscore' — Z-score normalization.
    """
    price_cols = ["open", "high", "low", "close"]
    result = df.copy()

    if method == "returns":
        for col in price_cols:
            result[f"{col}_ret"] = result[col].pct_change()
    elif method == "log_returns":
        for col in price_cols:
            result[f"{col}_logret"] = np.log(result[col] / result[col].shift(1))
    elif method == "minmax":
        for col in price_cols:
            cmin, cmax = result[col].min(), result[col].max()
            result[f"{col}_norm"] = (result[col] - cmin) / (cmax - cmin)
    elif method == "zscore":
        for col in price_cols:
            result[f"{col}_z"] = (
                (result[col] - result[col].mean()) / result[col].std()
            )
    return result

Multi-Source Merging

When combining data from multiple sources (e.g., Birdeye + DexScreener), timestamps may not align and prices may differ due to different DEX aggregation.

def merge_ohlcv_sources(
    primary: pd.DataFrame,
    secondary: pd.DataFrame,
    tolerance: str = "30s",
) -> pd.DataFrame:
    """Merge two OHLCV sources, preferring the higher-volume source per bar.

    Args:
        primary: First OHLCV source.
        secondary: Second OHLCV source.
        tolerance: Maximum time difference for alignment.
    """
    merged = pd.merge_asof(
        primary.sort_index(),
        secondary.sort_index(),
        left_index=True, right_index=True,
        tolerance=pd.Timedelta(tolerance),
        suffixes=("_pri", "_sec"),
    )
    # Use higher-volume source per bar
    use_secondary = merged["volume_sec"] > merged["volume_pri"]
    for col in ["open", "high", "low", "close", "volume"]:
        merged[col] = np.where(
            use_secondary, merged[f"{col}_sec"], merged[f"{col}_pri"]
        )
    merged["source"] = np.where(use_secondary, "secondary", "primary")
    return merged[["open", "high", "low", "close", "volume", "source"]]

Timezone Handling

Standard: Always store and process in UTC. Convert only for display.

def ensure_utc(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure DatetimeIndex is UTC."""
    if df.index.tz is None:
        df.index = df.index.tz_localize("UTC")
    elif str(df.index.tz) != "UTC":
        df.index = df.index.tz_convert("UTC")
    return df

Data Quality Report

def quality_report(df: pd.DataFrame) -> dict:
    """Generate a data quality summary."""
    total = len(df)
    return {
        "total_bars": total,
        "date_range": f"{df.index.min()} → {df.index.max()}",
        "missing_values": int(df[["open", "high", "low", "close"]].isna().sum().sum()),
        "zero_volume_bars": int((df["volume"] == 0).sum()),
        "impossible_candles": int((df["high"] < df["low"]).sum()),
        "duplicate_timestamps": int(df.index.duplicated().sum()),
        "negative_prices": int((df[["open", "high", "low", "close"]] < 0).any(axis=1).sum()),
        "completeness_pct": round((1 - df["close"].isna().mean()) * 100, 2),
    }

Files

References

  • references/data_quality.md — Anomaly types, detection methods, correction strategies, crypto-specific data issues
  • references/resampling_guide.md — Resample rules, timeframe use cases, partial bar handling, VWAP resampling, multi-timeframe alignment

Scripts

  • scripts/process_ohlcv.py — Full processing pipeline: validate, clean, resample, normalize with anomaly reporting (run with --demo for synthetic data)
  • scripts/merge_sources.py — Multi-source OHLCV merging with conflict resolution and discrepancy reporting (run with --demo)