jpskill.com
🛠️ 開発・MCP コミュニティ

airflow

Apache Airflowは、ワークフローの作成、スケジュール管理、監視を自動化するプラットフォームで、DAGの記述、オペレーターの利用、接続設定、スケジュール設定、Docker Composeでのデプロイなどを習得するSkill。

📜 元の英語説明(参考)

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Learn to write DAGs, use operators, set up connections, configure scheduling, and deploy with Docker Compose.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Apache Airflowは、ワークフローの作成、スケジュール管理、監視を自動化するプラットフォームで、DAGの記述、オペレーターの利用、接続設定、スケジュール設定、Docker Composeでのデプロイなどを習得するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o airflow.zip https://jpskill.com/download/14621.zip && unzip -o airflow.zip && rm airflow.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14621.zip -OutFile "$d\airflow.zip"; Expand-Archive "$d\airflow.zip" -DestinationPath $d -Force; ri "$d\airflow.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して airflow.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → airflow フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Airflow

Apache Airflow を使用すると、ワークフローを Python で有向非巡回グラフ (DAG) として定義できます。各 DAG は、依存関係によって接続されたタスクで構成され、Web UI を介してスケジュールおよび監視されます。

インストール

# docker-compose.yml: LocalExecutor を使用した Airflow (簡略化)
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__WEBSERVER__SECRET_KEY: changeme
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: airflow scheduler

volumes:
  postgres-data:
# Airflow の起動
docker compose up -d
# UI は http://localhost:8080 (admin/admin) でアクセスできます

基本的な DAG

# dags/hello_world.py: PythonOperator を使用したシンプルな DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hello_world',
    default_args=default_args,
    description='A simple hello world DAG',
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    def extract(**kwargs):
        import requests
        data = requests.get('https://api.example.com/data').json()
        kwargs['ti'].xcom_push(key='raw_data', value=data)

    def transform(**kwargs):
        data = kwargs['ti'].xcom_pull(key='raw_data', task_ids='extract')
        transformed = [{'id': d['id'], 'value': d['amount'] * 100} for d in data]
        kwargs['ti'].xcom_push(key='transformed', value=transformed)

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')

    extract_task >> transform_task >> load_task

TaskFlow API

# dags/taskflow_etl.py: デコレーターを使用した最新の TaskFlow API
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl'],
)
def taskflow_etl():

    @task()
    def extract():
        return {'users': 100, 'revenue': 50000}

    @task()
    def transform(data: dict):
        return {
            'users': data['users'],
            'avg_revenue': data['revenue'] / data['users'],
        }

    @task()
    def load(summary: dict):
        print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

taskflow_etl()

一般的な Operator

# dags/operators_demo.py: さまざまな operator の例
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor

# SQL 実行
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS daily_stats (
            date DATE PRIMARY KEY,
            total_users INT,
            revenue NUMERIC
        );
    """,
)

# HTTP リクエスト
fetch_api = SimpleHttpOperator(
    task_id='fetch_api',
    http_conn_id='my_api',
    endpoint='/api/stats',
    method='GET',
    response_filter=lambda r: r.json(),
)

# ファイルの待機
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,
    timeout=3600,
)

Connections と Variables

# connections.sh: CLI を介した connections の設定
airflow connections add 'my_postgres' \
  --conn-type 'postgres' \
  --conn-host 'localhost' \
  --conn-schema 'mydb' \
  --conn-login 'user' \
  --conn-password 'pass' \
  --conn-port 5432

# variables の設定
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --serialize-json

# DAG のトリガー
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Airflow

Apache Airflow lets you define workflows as Directed Acyclic Graphs (DAGs) in Python. Each DAG consists of tasks connected by dependencies, scheduled and monitored via a web UI.

Installation

# docker-compose.yml: Airflow with LocalExecutor (simplified)
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__WEBSERVER__SECRET_KEY: changeme
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: airflow scheduler

volumes:
  postgres-data:
# Start Airflow
docker compose up -d
# UI at http://localhost:8080 (admin/admin)

Basic DAG

# dags/hello_world.py: Simple DAG with PythonOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hello_world',
    default_args=default_args,
    description='A simple hello world DAG',
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    def extract(**kwargs):
        import requests
        data = requests.get('https://api.example.com/data').json()
        kwargs['ti'].xcom_push(key='raw_data', value=data)

    def transform(**kwargs):
        data = kwargs['ti'].xcom_pull(key='raw_data', task_ids='extract')
        transformed = [{'id': d['id'], 'value': d['amount'] * 100} for d in data]
        kwargs['ti'].xcom_push(key='transformed', value=transformed)

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')

    extract_task >> transform_task >> load_task

TaskFlow API

# dags/taskflow_etl.py: Modern TaskFlow API with decorators
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl'],
)
def taskflow_etl():

    @task()
    def extract():
        return {'users': 100, 'revenue': 50000}

    @task()
    def transform(data: dict):
        return {
            'users': data['users'],
            'avg_revenue': data['revenue'] / data['users'],
        }

    @task()
    def load(summary: dict):
        print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

taskflow_etl()

Common Operators

# dags/operators_demo.py: Various operator examples
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor

# SQL execution
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS daily_stats (
            date DATE PRIMARY KEY,
            total_users INT,
            revenue NUMERIC
        );
    """,
)

# HTTP request
fetch_api = SimpleHttpOperator(
    task_id='fetch_api',
    http_conn_id='my_api',
    endpoint='/api/stats',
    method='GET',
    response_filter=lambda r: r.json(),
)

# Wait for file
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,
    timeout=3600,
)

Connections and Variables

# connections.sh: Set up connections via CLI
airflow connections add 'my_postgres' \
  --conn-type 'postgres' \
  --conn-host 'localhost' \
  --conn-schema 'mydb' \
  --conn-login 'user' \
  --conn-password 'pass' \
  --conn-port 5432

# Set variables
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --serialize-json

# Trigger a DAG
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'