airflow
Apache Airflowは、ワークフローの作成、スケジュール管理、監視を自動化するプラットフォームで、DAGの記述、オペレーターの利用、接続設定、スケジュール設定、Docker Composeでのデプロイなどを習得するSkill。
📜 元の英語説明(参考)
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Learn to write DAGs, use operators, set up connections, configure scheduling, and deploy with Docker Compose.
🇯🇵 日本人クリエイター向け解説
Apache Airflowは、ワークフローの作成、スケジュール管理、監視を自動化するプラットフォームで、DAGの記述、オペレーターの利用、接続設定、スケジュール設定、Docker Composeでのデプロイなどを習得するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o airflow.zip https://jpskill.com/download/14621.zip && unzip -o airflow.zip && rm airflow.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14621.zip -OutFile "$d\airflow.zip"; Expand-Archive "$d\airflow.zip" -DestinationPath $d -Force; ri "$d\airflow.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
airflow.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
airflowフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
Airflow
Apache Airflow を使用すると、ワークフローを Python で有向非巡回グラフ (DAG) として定義できます。各 DAG は、依存関係によって接続されたタスクで構成され、Web UI を介してスケジュールおよび監視されます。
インストール
# docker-compose.yml: LocalExecutor を使用した Airflow (簡略化)
services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-data:/var/lib/postgresql/data
airflow-webserver:
image: apache/airflow:2.9.0
depends_on: [postgres]
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__WEBSERVER__SECRET_KEY: changeme
volumes:
- ./dags:/opt/airflow/dags
ports:
- "8080:8080"
command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"
airflow-scheduler:
image: apache/airflow:2.9.0
depends_on: [postgres]
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
command: airflow scheduler
volumes:
postgres-data:
# Airflow の起動
docker compose up -d
# UI は http://localhost:8080 (admin/admin) でアクセスできます
基本的な DAG
# dags/hello_world.py: PythonOperator を使用したシンプルな DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hello_world',
default_args=default_args,
description='A simple hello world DAG',
schedule='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['example'],
) as dag:
def extract(**kwargs):
import requests
data = requests.get('https://api.example.com/data').json()
kwargs['ti'].xcom_push(key='raw_data', value=data)
def transform(**kwargs):
data = kwargs['ti'].xcom_pull(key='raw_data', task_ids='extract')
transformed = [{'id': d['id'], 'value': d['amount'] * 100} for d in data]
kwargs['ti'].xcom_push(key='transformed', value=transformed)
extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')
extract_task >> transform_task >> load_task
TaskFlow API
# dags/taskflow_etl.py: デコレーターを使用した最新の TaskFlow API
from datetime import datetime
from airflow.decorators import dag, task
@dag(
schedule='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['etl'],
)
def taskflow_etl():
@task()
def extract():
return {'users': 100, 'revenue': 50000}
@task()
def transform(data: dict):
return {
'users': data['users'],
'avg_revenue': data['revenue'] / data['users'],
}
@task()
def load(summary: dict):
print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")
raw = extract()
transformed = transform(raw)
load(transformed)
taskflow_etl()
一般的な Operator
# dags/operators_demo.py: さまざまな operator の例
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor
# SQL 実行
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id='my_postgres',
sql="""
CREATE TABLE IF NOT EXISTS daily_stats (
date DATE PRIMARY KEY,
total_users INT,
revenue NUMERIC
);
""",
)
# HTTP リクエスト
fetch_api = SimpleHttpOperator(
task_id='fetch_api',
http_conn_id='my_api',
endpoint='/api/stats',
method='GET',
response_filter=lambda r: r.json(),
)
# ファイルの待機
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/incoming/report.csv',
poke_interval=60,
timeout=3600,
)
Connections と Variables
# connections.sh: CLI を介した connections の設定
airflow connections add 'my_postgres' \
--conn-type 'postgres' \
--conn-host 'localhost' \
--conn-schema 'mydb' \
--conn-login 'user' \
--conn-password 'pass' \
--conn-port 5432
# variables の設定
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --serialize-json
# DAG のトリガー
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}' 📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
Airflow
Apache Airflow lets you define workflows as Directed Acyclic Graphs (DAGs) in Python. Each DAG consists of tasks connected by dependencies, scheduled and monitored via a web UI.
Installation
# docker-compose.yml: Airflow with LocalExecutor (simplified)
services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-data:/var/lib/postgresql/data
airflow-webserver:
image: apache/airflow:2.9.0
depends_on: [postgres]
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__WEBSERVER__SECRET_KEY: changeme
volumes:
- ./dags:/opt/airflow/dags
ports:
- "8080:8080"
command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"
airflow-scheduler:
image: apache/airflow:2.9.0
depends_on: [postgres]
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
volumes:
- ./dags:/opt/airflow/dags
command: airflow scheduler
volumes:
postgres-data:
# Start Airflow
docker compose up -d
# UI at http://localhost:8080 (admin/admin)
Basic DAG
# dags/hello_world.py: Simple DAG with PythonOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hello_world',
default_args=default_args,
description='A simple hello world DAG',
schedule='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['example'],
) as dag:
def extract(**kwargs):
import requests
data = requests.get('https://api.example.com/data').json()
kwargs['ti'].xcom_push(key='raw_data', value=data)
def transform(**kwargs):
data = kwargs['ti'].xcom_pull(key='raw_data', task_ids='extract')
transformed = [{'id': d['id'], 'value': d['amount'] * 100} for d in data]
kwargs['ti'].xcom_push(key='transformed', value=transformed)
extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')
extract_task >> transform_task >> load_task
TaskFlow API
# dags/taskflow_etl.py: Modern TaskFlow API with decorators
from datetime import datetime
from airflow.decorators import dag, task
@dag(
schedule='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['etl'],
)
def taskflow_etl():
@task()
def extract():
return {'users': 100, 'revenue': 50000}
@task()
def transform(data: dict):
return {
'users': data['users'],
'avg_revenue': data['revenue'] / data['users'],
}
@task()
def load(summary: dict):
print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")
raw = extract()
transformed = transform(raw)
load(transformed)
taskflow_etl()
Common Operators
# dags/operators_demo.py: Various operator examples
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor
# SQL execution
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id='my_postgres',
sql="""
CREATE TABLE IF NOT EXISTS daily_stats (
date DATE PRIMARY KEY,
total_users INT,
revenue NUMERIC
);
""",
)
# HTTP request
fetch_api = SimpleHttpOperator(
task_id='fetch_api',
http_conn_id='my_api',
endpoint='/api/stats',
method='GET',
response_filter=lambda r: r.json(),
)
# Wait for file
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/incoming/report.csv',
poke_interval=60,
timeout=3600,
)
Connections and Variables
# connections.sh: Set up connections via CLI
airflow connections add 'my_postgres' \
--conn-type 'postgres' \
--conn-host 'localhost' \
--conn-schema 'mydb' \
--conn-login 'user' \
--conn-password 'pass' \
--conn-port 5432
# Set variables
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --serialize-json
# Trigger a DAG
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'