dagster
Dagsterは、ソフトウェア定義アセットという考え方に基づいたデータパイプラインのオーケストレーターであり、保守しやすいデータプラットフォームを構築するために、アセット、オペレーション、ジョブなどを定義するSkill。
📜 元の英語説明(参考)
Dagster is a data pipeline orchestrator built around the concept of software-defined assets. Learn to define assets, ops, jobs, schedules, sensors, and resources for building maintainable data platforms.
🇯🇵 日本人クリエイター向け解説
Dagsterは、ソフトウェア定義アセットという考え方に基づいたデータパイプラインのオーケストレーターであり、保守しやすいデータプラットフォームを構築するために、アセット、オペレーション、ジョブなどを定義するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o dagster.zip https://jpskill.com/download/14812.zip && unzip -o dagster.zip && rm dagster.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14812.zip -OutFile "$d\dagster.zip"; Expand-Archive "$d\dagster.zip" -DestinationPath $d -Force; ri "$d\dagster.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
dagster.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
dagsterフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
Dagster
Dagster は、ソフトウェア定義アセットを中心にデータパイプラインを構成します。ソフトウェア定義アセットとは、パイプラインが生成するデータアーティファクトの宣言のことです。アセットは、リネージを追跡し、インクリメンタルな計算を可能にし、Dagster UI と統合します。
インストール
# Dagster と UI をインストール
pip install dagster dagster-webserver
# 新しいプロジェクトを作成
dagster project scaffold --name my_pipeline
cd my_pipeline
pip install -e ".[dev]"
# 開発サーバーを起動
dagster dev
# UI は http://localhost:3000 で利用可能
ソフトウェア定義アセット
# my_pipeline/assets.py: データを生成するアセットを定義
from dagster import asset, AssetExecutionContext
import pandas as pd
@asset(group_name="raw")
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
"""API から生のユーザーデータを取得します。"""
import httpx
response = httpx.get("https://api.example.com/users")
df = pd.DataFrame(response.json())
context.log.info(f"Fetched {len(df)} users")
return df
@asset(group_name="raw")
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
"""API から生の注文データを取得します。"""
import httpx
response = httpx.get("https://api.example.com/orders")
return pd.DataFrame(response.json())
@asset(group_name="analytics", deps=[raw_users, raw_orders])
def revenue_by_user(raw_users: pd.DataFrame, raw_orders: pd.DataFrame) -> pd.DataFrame:
"""ユーザーごとの総収益を計算します。"""
merged = raw_orders.merge(raw_users, left_on="user_id", right_on="id")
result = (
merged.groupby(["user_id", "name"])
.agg(total_revenue=("amount", "sum"), order_count=("id_x", "count"))
.reset_index()
)
return result
リソース
# my_pipeline/resources.py: 外部システム用の構成可能なリソース
from dagster import resource, ConfigurableResource
import sqlalchemy
class DatabaseResource(ConfigurableResource):
connection_string: str
def query(self, sql: str) -> list:
engine = sqlalchemy.create_engine(self.connection_string)
with engine.connect() as conn:
result = conn.execute(sqlalchemy.text(sql))
return [dict(row._mapping) for row in result]
def execute(self, sql: str):
engine = sqlalchemy.create_engine(self.connection_string)
with engine.connect() as conn:
conn.execute(sqlalchemy.text(sql))
conn.commit()
リソースを持つアセット
# my_pipeline/db_assets.py: データベースリソースを使用するアセット
from dagster import asset, AssetExecutionContext
from .resources import DatabaseResource
@asset(group_name="warehouse")
def dim_users(context: AssetExecutionContext, database: DatabaseResource):
"""クレンジングされたユーザーディメンションテーブルをウェアハウスにロードします。"""
users = database.query("SELECT id, name, email, created_at FROM raw_users")
context.log.info(f"Loaded {len(users)} users into warehouse")
return users
定義
# my_pipeline/__init__.py: すべてをまとめて接続
from dagster import Definitions, load_assets_from_modules
from . import assets, db_assets
from .resources import DatabaseResource
all_assets = load_assets_from_modules([assets, db_assets])
defs = Definitions(
assets=all_assets,
resources={
"database": DatabaseResource(
connection_string="postgresql://user:pass@localhost:5432/analytics"
),
},
)
スケジュールとセンサー
# my_pipeline/schedules.py: 時間ベースおよびイベントベースのトリガー
from dagster import (
ScheduleDefinition,
define_asset_job,
sensor,
RunRequest,
SensorEvaluationContext,
AssetSelection,
)
# 特定のアセットを具体化するジョブ
analytics_job = define_asset_job(
name="analytics_job",
selection=AssetSelection.groups("analytics"),
)
# Cron スケジュール
daily_analytics = ScheduleDefinition(
job=analytics_job,
cron_schedule="0 6 * * *", # 毎日午前6時
)
# センサー — 外部イベントでトリガー
@sensor(job=analytics_job, minimum_interval_seconds=60)
def new_file_sensor(context: SensorEvaluationContext):
import os
files = os.listdir("/data/incoming")
new_files = [f for f in files if f.endswith(".csv")]
if new_files:
context.log.info(f"Found {len(new_files)} new files")
yield RunRequest(run_key=new_files[0])
パーティション化されたアセット
# my_pipeline/partitioned.py: インクリメンタル処理のための時間パーティション化されたアセット
from dagster import asset, DailyPartitionsDefinition
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context):
"""特定の日付パーティションのイベントを取得します。"""
date = context.partition_key # 例: "2026-02-19"
context.log.info(f"Processing events for {date}")
# この日付のデータのみを取得
return fetch_events(date)
CLI リファレンス
# cli.sh: 一般的な Dagster CLI コマンド
# 開発サーバー
dagster dev
# アセットを具体化
dagster asset materialize --select raw_users,raw_orders
# アセットをリスト表示
dagster asset list
# ジョブを実行
dagster job execute -j analytics_job
# 定義をチェック
dagster definitions validate 📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
Dagster
Dagster organizes data pipelines around software-defined assets — declarations of the data artifacts your pipeline produces. Assets track lineage, enable incremental computation, and integrate with the Dagster UI.
Installation
# Install Dagster and UI
pip install dagster dagster-webserver
# Create a new project
dagster project scaffold --name my_pipeline
cd my_pipeline
pip install -e ".[dev]"
# Start the dev server
dagster dev
# UI at http://localhost:3000
Software-Defined Assets
# my_pipeline/assets.py: Define assets that produce data
from dagster import asset, AssetExecutionContext
import pandas as pd
@asset(group_name="raw")
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
"""Fetch raw user data from API."""
import httpx
response = httpx.get("https://api.example.com/users")
df = pd.DataFrame(response.json())
context.log.info(f"Fetched {len(df)} users")
return df
@asset(group_name="raw")
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
"""Fetch raw order data from API."""
import httpx
response = httpx.get("https://api.example.com/orders")
return pd.DataFrame(response.json())
@asset(group_name="analytics", deps=[raw_users, raw_orders])
def revenue_by_user(raw_users: pd.DataFrame, raw_orders: pd.DataFrame) -> pd.DataFrame:
"""Calculate total revenue per user."""
merged = raw_orders.merge(raw_users, left_on="user_id", right_on="id")
result = (
merged.groupby(["user_id", "name"])
.agg(total_revenue=("amount", "sum"), order_count=("id_x", "count"))
.reset_index()
)
return result
Resources
# my_pipeline/resources.py: Configurable resources for external systems
from dagster import resource, ConfigurableResource
import sqlalchemy
class DatabaseResource(ConfigurableResource):
connection_string: str
def query(self, sql: str) -> list:
engine = sqlalchemy.create_engine(self.connection_string)
with engine.connect() as conn:
result = conn.execute(sqlalchemy.text(sql))
return [dict(row._mapping) for row in result]
def execute(self, sql: str):
engine = sqlalchemy.create_engine(self.connection_string)
with engine.connect() as conn:
conn.execute(sqlalchemy.text(sql))
conn.commit()
Assets with Resources
# my_pipeline/db_assets.py: Assets that use database resources
from dagster import asset, AssetExecutionContext
from .resources import DatabaseResource
@asset(group_name="warehouse")
def dim_users(context: AssetExecutionContext, database: DatabaseResource):
"""Load cleaned user dimension table into warehouse."""
users = database.query("SELECT id, name, email, created_at FROM raw_users")
context.log.info(f"Loaded {len(users)} users into warehouse")
return users
Definitions
# my_pipeline/__init__.py: Wire everything together
from dagster import Definitions, load_assets_from_modules
from . import assets, db_assets
from .resources import DatabaseResource
all_assets = load_assets_from_modules([assets, db_assets])
defs = Definitions(
assets=all_assets,
resources={
"database": DatabaseResource(
connection_string="postgresql://user:pass@localhost:5432/analytics"
),
},
)
Schedules and Sensors
# my_pipeline/schedules.py: Time-based and event-based triggers
from dagster import (
ScheduleDefinition,
define_asset_job,
sensor,
RunRequest,
SensorEvaluationContext,
AssetSelection,
)
# Job that materializes specific assets
analytics_job = define_asset_job(
name="analytics_job",
selection=AssetSelection.groups("analytics"),
)
# Cron schedule
daily_analytics = ScheduleDefinition(
job=analytics_job,
cron_schedule="0 6 * * *", # 6 AM daily
)
# Sensor — trigger on external event
@sensor(job=analytics_job, minimum_interval_seconds=60)
def new_file_sensor(context: SensorEvaluationContext):
import os
files = os.listdir("/data/incoming")
new_files = [f for f in files if f.endswith(".csv")]
if new_files:
context.log.info(f"Found {len(new_files)} new files")
yield RunRequest(run_key=new_files[0])
Partitioned Assets
# my_pipeline/partitioned.py: Time-partitioned assets for incremental processing
from dagster import asset, DailyPartitionsDefinition
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context):
"""Fetch events for a specific date partition."""
date = context.partition_key # e.g., "2026-02-19"
context.log.info(f"Processing events for {date}")
# Fetch only this date's data
return fetch_events(date)
CLI Reference
# cli.sh: Common Dagster CLI commands
# Development server
dagster dev
# Materialize assets
dagster asset materialize --select raw_users,raw_orders
# List assets
dagster asset list
# Run a job
dagster job execute -j analytics_job
# Check definitions
dagster definitions validate