jpskill.com
🛠️ 開発・MCP コミュニティ

gcp-bigquery

Google BigQueryを活用し、ペタバイト級の膨大なデータをSQLで分析、リアルタイムでのデータ投入や機械学習モデルの構築まで、データ分析と活用をSQLだけで完結させるSkill。

📜 元の英語説明(参考)

Analyze massive datasets with Google BigQuery. Run SQL queries on petabytes of data, load and stream data in real-time, create materialized views, and use BigQuery ML for machine learning models directly in SQL.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Google BigQueryを活用し、ペタバイト級の膨大なデータをSQLで分析、リアルタイムでのデータ投入や機械学習モデルの構築まで、データ分析と活用をSQLだけで完結させるSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o gcp-bigquery.zip https://jpskill.com/download/14926.zip && unzip -o gcp-bigquery.zip && rm gcp-bigquery.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14926.zip -OutFile "$d\gcp-bigquery.zip"; Expand-Archive "$d\gcp-bigquery.zip" -DestinationPath $d -Force; ri "$d\gcp-bigquery.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して gcp-bigquery.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → gcp-bigquery フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

GCP BigQuery

概要

Google BigQuery は、サーバーレスのペタバイト規模のデータウェアハウスです。インフラストラクチャの管理は不要で、大規模なデータセットに対して SQL クエリを数秒で実行できます。実行したクエリと保存したデータに対してのみ料金が発生します。

手順

コアコンセプト

  • Dataset — プロジェクトとリージョンをスコープとする、テーブルのコンテナ
  • Table — スキーマを持つ構造化データ(ネイティブ、外部、またはビュー)
  • Partitioned Table — クエリのパフォーマンス向上のために日付/整数でデータを分割
  • Clustered Table — さらなる最適化のためにパーティション内のデータをソート
  • Streaming Insert — リアルタイムのデータ取り込み
  • BigQuery ML — SQL を使用して ML モデルをトレーニングおよび予測

Dataset と Table

# Dataset を作成
bq mk --dataset --location=US my_project:analytics
# パーティション分割およびクラスタ化されたテーブルを作成
bq mk --table \
  --time_partitioning_field created_at \
  --time_partitioning_type DAY \
  --clustering_fields user_id,event_type \
  --schema 'event_id:STRING,user_id:STRING,event_type:STRING,payload:JSON,created_at:TIMESTAMP' \
  analytics.events
-- SQL DDL でテーブルを作成
CREATE TABLE `my_project.analytics.page_views` (
  session_id STRING NOT NULL,
  user_id STRING,
  page_url STRING,
  referrer STRING,
  duration_ms INT64,
  created_at TIMESTAMP NOT NULL
)
PARTITION BY DATE(created_at)
CLUSTER BY user_id, page_url
OPTIONS (
  partition_expiration_days = 365,
  description = 'Page view events'
);

データのロード

# ローカルファイルから CSV をロード
bq load --source_format=CSV --autodetect \
  analytics.customers ./customers.csv
# Cloud Storage からロード (JSON)
bq load --source_format=NEWLINE_DELIMITED_JSON \
  --autodetect \
  analytics.events \
  gs://my-data-bucket/events/2024-01-*.json
# GCS から Parquet をロード (最も効率的な形式)
bq load --source_format=PARQUET \
  analytics.events \
  gs://my-data-bucket/events/2024-01/*.parquet

ストリーミングデータ

# リアルタイムで BigQuery に行をストリーミング
from google.cloud import bigquery

client = bigquery.Client()
table_id = "my_project.analytics.events"

rows = [
    {
        "event_id": "evt-001",
        "user_id": "u-123",
        "event_type": "purchase",
        "payload": '{"amount": 49.99, "currency": "USD"}',
        "created_at": "2024-01-15T10:30:00Z"
    },
    {
        "event_id": "evt-002",
        "user_id": "u-456",
        "event_type": "page_view",
        "payload": '{"url": "/products/widget"}',
        "created_at": "2024-01-15T10:30:01Z"
    }
]

errors = client.insert_rows_json(table_id, rows)
if errors:
    print(f"Insert errors: {errors}")
else:
    print(f"Inserted {len(rows)} rows")

クエリ

-- パーティションプルーニングを使用したクエリ (関連するパーティションのみをスキャン)
SELECT
  user_id,
  event_type,
  COUNT(*) as event_count,
  AVG(CAST(JSON_VALUE(payload, '$.duration_ms') AS INT64)) as avg_duration
FROM `analytics.events`
WHERE created_at BETWEEN '2024-01-01' AND '2024-01-31'
  AND event_type IN ('page_view', 'purchase')
GROUP BY user_id, event_type
ORDER BY event_count DESC
LIMIT 100;
-- ユーザー行動分析のためのウィンドウ関数
SELECT
  user_id,
  event_type,
  created_at,
  LAG(event_type) OVER (PARTITION BY user_id ORDER BY created_at) as prev_event,
  TIMESTAMP_DIFF(
    created_at,
    LAG(created_at) OVER (PARTITION BY user_id ORDER BY created_at),
    SECOND
  ) as seconds_since_last
FROM `analytics.events`
WHERE DATE(created_at) = '2024-01-15'
ORDER BY user_id, created_at;
# CLI からクエリを実行
bq query --use_legacy_sql=false \
  'SELECT COUNT(*) as total FROM `analytics.events` WHERE DATE(created_at) = CURRENT_DATE()'

マテリアライズドビュー

-- 高速なダッシュボードクエリのためのマテリアライズドビューを作成
CREATE MATERIALIZED VIEW `analytics.daily_metrics`
OPTIONS (enable_refresh = true, refresh_interval_minutes = 30)
AS
SELECT
  DATE(created_at) as date,
  event_type,
  COUNT(*) as event_count,
  COUNT(DISTINCT user_id) as unique_users
FROM `analytics.events`
GROUP BY date, event_type;

BigQuery ML

-- 解約を予測するための分類モデルをトレーニング
CREATE OR REPLACE MODEL `analytics.churn_model`
OPTIONS (
  model_type = 'LOGISTIC_REG',
  input_label_cols = ['churned']
) AS
SELECT
  user_id,
  COUNT(*) as total_events,
  COUNT(DISTINCT DATE(created_at)) as active_days,
  MAX(TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), created_at, DAY)) as days_since_last,
  churned
FROM `analytics.user_activity`
GROUP BY user_id, churned;
-- 現在のユーザーの解約を予測
SELECT
  user_id,
  predicted_churned,
  predicted_churned_probs[OFFSET(1)].prob as churn_probability
FROM ML.PREDICT(
  MODEL `analytics.churn_model`,
  (SELECT user_id, total_events, active_days, days_since_last
   FROM `analytics.current_user_stats`)
)
WHERE predicted_churned_probs[OFFSET(1)].prob > 0.7
ORDER BY churn_probability DESC;

BigQuery AI Functions (SQL の Gemini)

BigQuery は Gemini を SQL 関数として直接公開します — Python もオーケストレーションも不要です。BigQuery ML (CREATE MODEL) とは異なり、これらはクエリ時の Gemini への推論呼び出しです。

-- 行ごとにテキストを生成
SELECT
  product_id,
  AI.GENERATE(
    ('Write a one-line product tagline for: ', name, ' — ', description),
    connection_id => 'us.gemini-conn',
    endpoint => 'gemini-2.5-flash'
  ).result AS tagline
FROM `analytics.products`
LIMIT 100;
-- ブール値の分類
SELECT
  review_id, review_text,
  AI.GENERATE_BOOL(
    ('Is this review positive? ', review_text),
    connection_id => 'us.gemini-conn',
    endpoint => 'gemini-2.5-flash'
  ).result AS is_positive
FROM `analytics.product_reviews`;
-- 数値抽出 (例: 自由形式のテキストから価格を抽出)
SELECT
  listing_id, raw_text,
  AI.GENERATE_DOUBLE(
    ('Extract the price in USD from: ', raw_text),
    connection_id => 'us.gemini-conn'
  ).result AS price_usd
FROM `analytics.scraped_listings`;

-- AI.FORECAST を使用した時系列予測 (

(原文がここで切り詰められています)
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

GCP BigQuery

Overview

Google BigQuery is a serverless, petabyte-scale data warehouse. It runs SQL queries across massive datasets in seconds, with no infrastructure to manage. Pay only for queries run and data stored.

Instructions

Core Concepts

  • Dataset — a container for tables, scoped to a project and region
  • Table — structured data with a schema (native, external, or view)
  • Partitioned Table — split data by date/integer for query performance
  • Clustered Table — sort data within partitions for further optimization
  • Streaming Insert — real-time data ingestion
  • BigQuery ML — train and predict with ML models using SQL

Datasets and Tables

# Create a dataset
bq mk --dataset --location=US my_project:analytics
# Create a partitioned and clustered table
bq mk --table \
  --time_partitioning_field created_at \
  --time_partitioning_type DAY \
  --clustering_fields user_id,event_type \
  --schema 'event_id:STRING,user_id:STRING,event_type:STRING,payload:JSON,created_at:TIMESTAMP' \
  analytics.events
-- Create table with SQL DDL
CREATE TABLE `my_project.analytics.page_views` (
  session_id STRING NOT NULL,
  user_id STRING,
  page_url STRING,
  referrer STRING,
  duration_ms INT64,
  created_at TIMESTAMP NOT NULL
)
PARTITION BY DATE(created_at)
CLUSTER BY user_id, page_url
OPTIONS (
  partition_expiration_days = 365,
  description = 'Page view events'
);

Loading Data

# Load CSV from local file
bq load --source_format=CSV --autodetect \
  analytics.customers ./customers.csv
# Load from Cloud Storage (JSON)
bq load --source_format=NEWLINE_DELIMITED_JSON \
  --autodetect \
  analytics.events \
  gs://my-data-bucket/events/2024-01-*.json
# Load Parquet from GCS (most efficient format)
bq load --source_format=PARQUET \
  analytics.events \
  gs://my-data-bucket/events/2024-01/*.parquet

Streaming Data

# Stream rows into BigQuery in real-time
from google.cloud import bigquery

client = bigquery.Client()
table_id = "my_project.analytics.events"

rows = [
    {
        "event_id": "evt-001",
        "user_id": "u-123",
        "event_type": "purchase",
        "payload": '{"amount": 49.99, "currency": "USD"}',
        "created_at": "2024-01-15T10:30:00Z"
    },
    {
        "event_id": "evt-002",
        "user_id": "u-456",
        "event_type": "page_view",
        "payload": '{"url": "/products/widget"}',
        "created_at": "2024-01-15T10:30:01Z"
    }
]

errors = client.insert_rows_json(table_id, rows)
if errors:
    print(f"Insert errors: {errors}")
else:
    print(f"Inserted {len(rows)} rows")

Querying

-- Query with partition pruning (scans only relevant partitions)
SELECT
  user_id,
  event_type,
  COUNT(*) as event_count,
  AVG(CAST(JSON_VALUE(payload, '$.duration_ms') AS INT64)) as avg_duration
FROM `analytics.events`
WHERE created_at BETWEEN '2024-01-01' AND '2024-01-31'
  AND event_type IN ('page_view', 'purchase')
GROUP BY user_id, event_type
ORDER BY event_count DESC
LIMIT 100;
-- Window functions for user journey analysis
SELECT
  user_id,
  event_type,
  created_at,
  LAG(event_type) OVER (PARTITION BY user_id ORDER BY created_at) as prev_event,
  TIMESTAMP_DIFF(
    created_at,
    LAG(created_at) OVER (PARTITION BY user_id ORDER BY created_at),
    SECOND
  ) as seconds_since_last
FROM `analytics.events`
WHERE DATE(created_at) = '2024-01-15'
ORDER BY user_id, created_at;
# Run query from CLI
bq query --use_legacy_sql=false \
  'SELECT COUNT(*) as total FROM `analytics.events` WHERE DATE(created_at) = CURRENT_DATE()'

Materialized Views

-- Create a materialized view for fast dashboard queries
CREATE MATERIALIZED VIEW `analytics.daily_metrics`
OPTIONS (enable_refresh = true, refresh_interval_minutes = 30)
AS
SELECT
  DATE(created_at) as date,
  event_type,
  COUNT(*) as event_count,
  COUNT(DISTINCT user_id) as unique_users
FROM `analytics.events`
GROUP BY date, event_type;

BigQuery ML

-- Train a classification model to predict churn
CREATE OR REPLACE MODEL `analytics.churn_model`
OPTIONS (
  model_type = 'LOGISTIC_REG',
  input_label_cols = ['churned']
) AS
SELECT
  user_id,
  COUNT(*) as total_events,
  COUNT(DISTINCT DATE(created_at)) as active_days,
  MAX(TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), created_at, DAY)) as days_since_last,
  churned
FROM `analytics.user_activity`
GROUP BY user_id, churned;
-- Predict churn for current users
SELECT
  user_id,
  predicted_churned,
  predicted_churned_probs[OFFSET(1)].prob as churn_probability
FROM ML.PREDICT(
  MODEL `analytics.churn_model`,
  (SELECT user_id, total_events, active_days, days_since_last
   FROM `analytics.current_user_stats`)
)
WHERE predicted_churned_probs[OFFSET(1)].prob > 0.7
ORDER BY churn_probability DESC;

BigQuery AI Functions (Gemini in SQL)

BigQuery exposes Gemini directly as SQL functions — no Python, no orchestration. Different from BigQuery ML (CREATE MODEL): these are inference calls into Gemini at query time.

-- Generate text per row
SELECT
  product_id,
  AI.GENERATE(
    ('Write a one-line product tagline for: ', name, ' — ', description),
    connection_id => 'us.gemini-conn',
    endpoint => 'gemini-2.5-flash'
  ).result AS tagline
FROM `analytics.products`
LIMIT 100;
-- Boolean classification
SELECT
  review_id, review_text,
  AI.GENERATE_BOOL(
    ('Is this review positive? ', review_text),
    connection_id => 'us.gemini-conn',
    endpoint => 'gemini-2.5-flash'
  ).result AS is_positive
FROM `analytics.product_reviews`;
-- Numeric extraction (e.g., extract price from free-form text)
SELECT
  listing_id, raw_text,
  AI.GENERATE_DOUBLE(
    ('Extract the price in USD from: ', raw_text),
    connection_id => 'us.gemini-conn'
  ).result AS price_usd
FROM `analytics.scraped_listings`;
-- Time series forecasting with AI.FORECAST (no model training needed)
SELECT * FROM AI.FORECAST(
  TABLE `analytics.daily_revenue`,
  data_col => 'revenue',
  timestamp_col => 'date',
  horizon => 30
);
-- Semantic similarity / search
SELECT
  product_id, name,
  AI.SIMILARITY(
    name,
    'wireless noise-cancelling headphones',
    connection_id => 'us.gemini-conn'
  ).score AS similarity
FROM `analytics.products`
ORDER BY similarity DESC
LIMIT 20;

Set up the connection once with bq mk --connection --location=US --connection_type=CLOUD_RESOURCE gemini-conn and grant the connection's service account roles/aiplatform.user.

Scheduled Queries

# Create a scheduled query
bq mk --transfer_config \
  --data_source=scheduled_query \
  --target_dataset=analytics \
  --display_name="Daily aggregation" \
  --schedule="every 24 hours" \
  --params='{
    "query": "INSERT INTO analytics.daily_summary SELECT DATE(created_at), COUNT(*) FROM analytics.events WHERE DATE(created_at) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) GROUP BY 1",
    "destination_table_name_template": "",
    "write_disposition": "WRITE_APPEND"
  }'

Cost Control

# Dry run to estimate query cost
bq query --dry_run --use_legacy_sql=false \
  'SELECT * FROM `analytics.events` WHERE DATE(created_at) = "2024-01-15"'
# Set maximum bytes billed per query
bq query --maximum_bytes_billed=1000000000 --use_legacy_sql=false \
  'SELECT COUNT(*) FROM `analytics.events`'

Examples

Example 1 — Build a partitioned events table with PII-safe streaming

User has Kafka events landing in GCS as JSON and wants them queryable with sub-second latency. Create a partitioned/clustered table on created_at and user_id, set up a Pub/Sub-to-BigQuery streaming pipeline using client.insert_rows_json, add a 365-day partition expiration, and create a materialized view for the daily dashboard query. Set maximum_bytes_billed on the analyst service account to cap surprise costs.

Example 2 — Add semantic search to a product catalog without an ML pipeline

User wants to add "find similar products" without building a vector store. Use AI.SIMILARITY directly in SQL against the product name column — no embeddings to manage, no separate index. Set up the cloud-resource connection once, grant roles/aiplatform.user, then queries become single SQL statements. For higher scale, persist embeddings into a column with AI.GENERATE_EMBEDDING and use BigQuery's vector index.

Guidelines

  • Always partition tables by date and cluster by frequently filtered columns
  • Use --dry_run to estimate query costs before running expensive queries
  • Avoid SELECT * — query only the columns you need
  • Use materialized views for repeated dashboard queries
  • Stream only when real-time is required; batch load is cheaper
  • Set maximum_bytes_billed to prevent runaway query costs
  • Use Parquet or Avro for bulk loading (faster and cheaper than CSV/JSON)
  • Expire old partitions automatically with partition_expiration_days
  • Prefer AI.GENERATE_* functions over building separate ML pipelines for per-row inference