jpskill.com
🛠️ 開発・MCP コミュニティ

creating-bauplan-pipelines

Creates bauplan data pipeline projects with SQL and Python models. Use when starting a new pipeline, defining DAG transformations, writing models, or setting up bauplan project structure from scratch.

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o creating-bauplan-pipelines.zip https://jpskill.com/download/17480.zip && unzip -o creating-bauplan-pipelines.zip && rm creating-bauplan-pipelines.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17480.zip -OutFile "$d\creating-bauplan-pipelines.zip"; Expand-Archive "$d\creating-bauplan-pipelines.zip" -DestinationPath $d -Force; ri "$d\creating-bauplan-pipelines.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して creating-bauplan-pipelines.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → creating-bauplan-pipelines フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

新規 Bauplan データパイプラインの作成

このスキルでは、プロジェクトの設定や SQL/Python 変換モデルなど、新規の bauplan データパイプラインプロジェクトをゼロから作成する方法を説明します。

重要: ブランチの安全性

main ブランチでパイプラインを実行しないでください。 常に開発ブランチを使用してください。

ブランチの命名規則: <username>.<branch_name> (例: john.feature-pipeline)。bauplan info であなたのユーザー名を取得してください。正確なコマンドについては、ワークフローチェックリスト を参照してください。

前提条件

パイプラインを作成する前に、以下を確認してください。

  1. 開発ブランチがあること (main ではない)
  2. ソーステーブルが bauplan レイクハウスに存在すること (デフォルトの名前空間は bauplan です)
  3. ソーステーブルのスキーマを理解していること

DAG としてのパイプライン

bauplan パイプラインは、関数 (モデル) の DAG です。主なルール:

  1. モデル: データを変換する SQL または Python 関数
  2. ソーステーブル: 既存のレイクハウステーブル - DAG へのエントリポイント
  3. 入力: 各モデルは bauplan.Model() 参照を介して 複数のテーブル を受け取ることができます
  4. 出力: 各モデルは 正確に 1 つのテーブル を生成します:
    • SQL: 出力名 = ファイル名 (trips.sqltrips)
    • Python: 出力名 = 関数名 (def clean_trips()clean_trips)
  5. トポロジー: 入力参照によって暗黙的に定義されます - bauplan が実行順序を決定します

期待値: テーブルを入力として受け取り、ブール値 を返すデータ品質関数。

DAG の例

[lakehouse: taxi_fhvhv] ──→ [trips.sql] ──→ [clean_trips] ──→ [daily_summary]
                                                ↑
[lakehouse: taxi_zones] ────────────────────────┘

この例では:

  • taxi_fhvhvtaxi_zones はソーステーブルです (すでにレイクハウスに存在します)
  • trips.sqltaxi_fhvhv から読み取ります (SQL モデル、最初のノード)
  • clean_tripstripstaxi_zones を入力として受け取ります (Python モデル、複数の入力)
  • daily_summaryclean_trips を入力として受け取ります (Python モデル、単一の入力)

必要なユーザー入力

パイプラインを作成する前に、ユーザーから以下の情報を必ず収集する必要があります。

  1. パイプラインの目的 (必須): DAG はどのような変換を実行する必要がありますか? ビジネスロジックまたは目標は何ですか?
  2. ソーステーブル (必須): レイクハウスのどのテーブルを入力として使用する必要がありますか? bauplan table get で存在を確認してください
  3. 出力テーブル (必須): パイプラインの最後にどのテーブルを具体化する必要がありますか? これらは、ダウンストリームのコンシューマーに見える最終的な出力です
  4. 具体化戦略 (オプション): 出力テーブルは REPLACE (デフォルト) または APPEND を使用する必要がありますか?
  5. 厳密モード (オプション): パイプラインは厳密モードで実行する必要がありますか? はいの場合、すべての CLI コマンドは --strict フラグを使用します。これにより、ドライラン中の出力列の不一致などの問題が発生した場合にすぐにエラーが発生し、即座にエラーを検出して修正できます。

ユーザーがこの情報を提供していない場合は、実装に進む前に確認してください。

厳密モード (--strict フラグ)

厳密モードが有効になっている場合は、すべての bauplan run コマンドに --strict を追加します:

# 厳密モードなし (デフォルト)
bauplan run --dry-run
bauplan run

# 厳密モードが有効
bauplan run --dry-run --strict
bauplan run --strict

厳密モードの利点:

  • 出力列の不一致が発生した場合にすぐにエラーが発生します
  • 期待値が満たされない場合にすぐにエラーが発生します
  • パイプラインの完了前に宣言エラーを修正できます
  • パイプラインの開発を繰り返す場合に推奨されます

プロジェクト構造

bauplan プロジェクトは、以下を含むフォルダです:

my-project/
  bauplan_project.yml    # 必須: プロジェクト設定
  model.sql              # オプション: 単一の SQL モデル、ファイルごとに 1 つ
  models.py              # オプション: Python モデル (1 つのファイルに複数のモデルを含めることも、複数のファイルに分割することもできます)
  expectations.py        # オプション: データ品質テスト (存在する場合)

bauplan_project.yml

すべてのプロジェクトは個別のフォルダであり、この設定ファイルが必要です:

project:
  id: <unique-uuid>       # 一意の UUID を生成します
  name: <project_name>    # プロジェクトのわかりやすい名前

SQL モデルと Python モデルの使い分け

重要: SQL モデルは、パイプライングラフの 最初のノード のみに限定する必要があります。

  • SQL モデル: レイクハウスのソーステーブル (パイプライングラフの外にあるテーブル) から直接読み取るノードにのみ使用します
  • Python モデル: パイプラインの他のすべての変換に推奨されます

これにより、一貫性が確保され、変換、出力スキーマの検証、およびドキュメントの制御が向上します。

SQL モデル (最初のノードのみ)

SQL モデルは .sql ファイルであり、以下を満たします:

  • ファイル名 が出力テーブル名になります
  • FROM 句 が入力テーブルを定義します
  • オプション: 具体化戦略をコメントとして追加します

SQL モデルは、既存のレイクハウステーブルから読み取る場合にのみ使用します:

-- trips.sql
-- 最初のノード: レイクハウスの taxi_fhvhv テーブルから読み取ります
SELECT
    pickup_datetime,
    PULocationID,
    trip_miles
FROM taxi_fhvhv
WHERE pickup_datetime >= '2022-12-01'

出力テーブル: trips (ファイル名から) 入力テーブル: taxi_fhvhv (FROM 句から、レイクハウスに存在します)

Python モデル (推奨)

Python モデルはデコレータを使用して変換を定義します。レイクハウスから読み取る最初のノードを除く、すべてのパイプラインノードに使用する必要があります。

主要なデコレータ

  • @bauplan.model() - 関数をモデルとして登録します
  • @bauplan.model(columns=[...]) - 検証のために予想される出力列を指定します (オプションですが推奨)
  • @bauplan.model(materialization_strategy='REPLACE') - 出力をレイクハウスに永続化します
  • @bauplan.python('3.11', pip={'pandas': '1.5.3'}) - Python のバージョンとパッケージを指定します

ベストプラクティス: 出力列の検証

重要: 可能な限り、@bauplan.model()columns パラメータを指定して、予想される出力スキーマを定義してください。これにより、モデルの出力の自動検証が可能になります。

まず、ソーステーブルのスキーマを確認して、入力列を理解します。次に、変換に基づいて出力列を指定します

(原文はここで切り詰められています)

📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Creating a New Bauplan Data Pipeline

This skill guides you through creating a new bauplan data pipeline project from scratch, including the project configuration and SQL/Python transformation models.

CRITICAL: Branch Safety

NEVER run pipelines on main branch. Always use a development branch.

Branch naming convention: <username>.<branch_name> (e.g., john.feature-pipeline). Get your username with bauplan info. See Workflow Checklist for exact commands.

Prerequisites

Before creating the pipeline, verify that:

  1. You have a development branch (not main)
  2. Source tables exist in the bauplan lakehouse (the default namespace is bauplan)
  3. You understand the schema of the source tables

Pipeline as a DAG

A bauplan pipeline is a DAG of functions (models). Key rules:

  1. Models: SQL or Python functions that transform data
  2. Source Tables: Existing lakehouse tables - entry points to your DAG
  3. Inputs: Each model can take multiple tables via bauplan.Model() references
  4. Outputs: Each model produces exactly one table:
    • SQL: output name = filename (trips.sqltrips)
    • Python: output name = function name (def clean_trips()clean_trips)
  5. Topology: Implicitly defined by input references - bauplan determines execution order

Expectations: Data quality functions that take tables as input and return a boolean.

Example DAG

[lakehouse: taxi_fhvhv] ──→ [trips.sql] ──→ [clean_trips] ──→ [daily_summary]
                                                ↑
[lakehouse: taxi_zones] ────────────────────────┘

In this example:

  • taxi_fhvhv and taxi_zones are source tables (already in lakehouse)
  • trips.sql reads from taxi_fhvhv (SQL model, first node)
  • clean_trips takes trips and taxi_zones as inputs (Python model, multiple inputs)
  • daily_summary takes clean_trips as input (Python model, single input)

Required User Input

Before writing a pipeline, you MUST gather the following information from the user:

  1. Pipeline purpose (required): What transformations should the DAG perform? What is the business logic or goal?
  2. Source tables (required): Which tables from the lakehouse should be used as inputs? Verify they exist with bauplan table get
  3. Output tables (required): Which tables should be materialized at the end of the pipeline? These are the final outputs visible to downstream consumers
  4. Materialization strategy (optional): Should output tables use REPLACE (default) or APPEND?
  5. Strict mode (optional): Should the pipeline run in strict mode? If yes, all CLI commands will use --strict flag, which fails on issues like output column mismatches during dry-run, allowing immediate error detection and correction.

If the user hasn't provided this information, ask before proceeding with implementation.

Strict Mode (--strict flag)

When strict mode is enabled, append --strict to all bauplan run commands:

# Without strict mode (default)
bauplan run --dry-run
bauplan run

# With strict mode enabled
bauplan run --dry-run --strict
bauplan run --strict

Benefits of strict mode:

  • Fails immediately on output column mismatches
  • Fails immediately if an expectation fails
  • Allows you to rectify declaration errors before pipeline completion
  • Recommended when iterating on pipeline development

Project Structure

A bauplan project is a folder containing:

my-project/
  bauplan_project.yml    # Required: project configuration
  model.sql              # Optional: a single SQL model, one per file
  models.py              # Optional: Python models (one file can have >1 models, or be split into multiple files)
  expectations.py        # Optional: data quality tests (if any)

bauplan_project.yml

Every project is a separate folder which requires this configuration file:

project:
  id: <unique-uuid>       # Generate a unique UUID
  name: <project_name>    # Descriptive name for the project

When to Use SQL vs Python Models

IMPORTANT: SQL models should be LIMITED to first nodes in the pipeline graph only.

  • SQL models: Use ONLY for nodes that read directly from source tables in the lakehouse (tables outside your pipeline graph)
  • Python models: Preferred for ALL other transformations in the pipeline

This ensures consistency and allows for better control over transformations, output schema validation, and documentation.

SQL Models (First Nodes Only)

SQL models are .sql files where:

  • The filename becomes the output table name
  • The FROM clause defines input tables
  • Optional: Add materialization strategy as a comment

Use SQL models only when reading from existing lakehouse tables:

-- trips.sql
-- First node: reads from taxi_fhvhv table in the lakehouse
SELECT
    pickup_datetime,
    PULocationID,
    trip_miles
FROM taxi_fhvhv
WHERE pickup_datetime >= '2022-12-01'

Output table: trips (from filename) Input table: taxi_fhvhv (from FROM clause, exists in lakehouse)

Python Models (Preferred)

Python models use decorators to define transformations. They should be used for all pipeline nodes except first nodes reading from the lakehouse.

Key Decorators

  • @bauplan.model() - Registers function as a model
  • @bauplan.model(columns=[...]) - Specify expected output columns for validation (Optional but recommended)
  • @bauplan.model(materialization_strategy='REPLACE') - Persist output to lakehouse
  • @bauplan.python('3.11', pip={'pandas': '1.5.3'}) - Specify Python version and packages

Best Practice: Output Columns Validation

IMPORTANT: whenever possible, specify the columns parameter in @bauplan.model() to define the expected output schema. This enables automatic validation of your model's output.

First, check the schema of your source tables to understand input columns. Then specify the output columns based on your transformation:

# If input has columns: [id, name, age, city]
# And transformation drops 'city' column
# Then output columns should be: [id, name, age]

@bauplan.model(columns=['id', 'name', 'age'])

Best Practice: Docstrings with Output Schema

IMPORTANT: Every Python model should have a docstring describing the transformation and showing the output table structure as an ASCII table (if the table is too wide, show only key columns, if values are too large, truncate them in the cells).

@bauplan.model(columns=['id', 'name', 'age'])
@bauplan.python('3.11')
def clean_users(data=bauplan.Model('raw_users')):
    """
    Cleans user data by removing invalid entries and dropping the city column.

    | id  | name    | age |
    |-----|---------|-----|
    | 1   | Alice   | 30  |
    | 2   | Bob     | 25  |
    """
    # transformation logic
    return data.drop_columns(['city'])

I/O Pushdown with columns and filter

IMPORTANT: whenever possible, use columns and filter parameters in bauplan.Model() to restrict the data read. This enables I/O pushdown, dramatically reducing the amount of data transferred and improving performance. Do not read columns you don't need.

bauplan.Model(
    'table_name',
    columns=['col1', 'col2', 'col3'],   # Only read these columns
    filter="date >= '2022-01-01'"       # Pre-filter at storage level
)

Whenever possible, specify:

  • columns: List only the columns your model actually needs
  • filter: SQL-like filter expression to restrict rows at the storage level, if appropriate

Basic Python Model

import bauplan

@bauplan.model(
    columns=['pickup_datetime', 'PULocationID', 'trip_miles'],
    materialization_strategy='REPLACE'
)
@bauplan.python('3.11', pip={'polars': '1.15.0'})
def clean_trips(
    # Use columns and filter for I/O pushdown
    data=bauplan.Model(
        'trips',
        columns=['pickup_datetime', 'PULocationID', 'trip_miles'],
        filter="trip_miles > 0"
    )
):
    """
    Filters trips to include only those with positive mileage.

    | pickup_datetime     | PULocationID | trip_miles |
    |---------------------|--------------|------------|
    | 2022-12-01 08:00:00 | 123          | 5.2        |
    """
    import polars as pl

    df = pl.from_arrow(data)
    df = df.filter(pl.col('trip_miles') > 0.0)

    return df.to_arrow()

Python Model with Multiple Inputs

Models can take multiple tables as input - just add more bauplan.Model() parameters:

def model_with_joins(
    table_a=bauplan.Model('source_a', columns=['id', 'value']),
    table_b=bauplan.Model('source_b', columns=['id', 'name'])
):
    # Join, transform, return Arrow table
    return table_a.join(table_b, 'id', 'id')

See examples.md for complete multi-input examples with Polars.

Workflow Checklist

Copy this checklist and track your progress:

Pipeline Creation Progress:
- [ ] Step 1: Get username → bauplan info
- [ ] Step 2: Checkout main → bauplan branch checkout main
- [ ] Step 3: Create dev branch → bauplan branch create <username>.<branch_name>
- [ ] Step 4: Checkout dev branch → bauplan branch checkout <username>.<branch_name>
- [ ] Step 5: Verify source tables → bauplan table get <namespace>.<table_name>, Optional for data preview: bauplan query "SELECT * FROM <namespace>.<table_name> LIMIT 3"
- [ ] Step 6: Create project folder with bauplan_project.yml
- [ ] Step 7: Write SQL model(s) / Python model(s) for transformations respecting the guidelines
- [ ] Step 8: Verify materialization decorators (see Materialization Checklist below)
- [ ] Step 9: Dry run → bauplan run --dry-run [--strict if strict mode]
- [ ] Step 10: Run pipeline → bauplan run [--strict if strict mode]

CRITICAL: Never run on main branch. Steps 2-4 ensure you're on a development branch.

Materialization Checklist

After writing models, verify that each model has the correct materialization_strategy based on user requirements:

Model Type No Materialization (intermediate) Materialized Output
Python @bauplan.model() (no strategy) @bauplan.model(materialization_strategy='REPLACE') or 'APPEND'
SQL No comment needed Add comment: -- bauplan: materialization_strategy=REPLACE or APPEND

Verify for each model:

  • [ ] Intermediate tables (not final outputs): NO materialization_strategy specified
  • [ ] Final output tables requested by user: materialization_strategy='REPLACE' (default) or 'APPEND'
  • [ ] If user specified APPEND for any table: confirm materialization_strategy='APPEND' is set

Example Python decorator for materialized output:

@bauplan.model(materialization_strategy='REPLACE', columns=['col1', 'col2'])

Example SQL comment for materialized output:

-- bauplan: materialization_strategy=REPLACE
SELECT * FROM source_table

Advanced Examples

See examples.md for:

  • APPEND materialization strategy
  • DuckDB queries in Python models
  • Data quality expectations
  • Multi-stage pipelines