jpskill.com
🛠️ 開発・MCP コミュニティ

sqlalchemy

Pythonでデータベースを扱う際に、SQLAlchemyを使ってORMを構築したり、データベースのモデルを定義したり、非同期クエリを書いたり、Alembicでデータベースの移行を管理したりするのを支援するSkill。

📜 元の英語説明(参考)

Work with databases in Python using SQLAlchemy. Use when a user asks to set up a Python ORM, define database models, write async database queries, manage migrations with Alembic, or choose between SQLAlchemy and Django ORM.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Pythonでデータベースを扱う際に、SQLAlchemyを使ってORMを構築したり、データベースのモデルを定義したり、非同期クエリを書いたり、Alembicでデータベースの移行を管理したりするのを支援するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o sqlalchemy.zip https://jpskill.com/download/15407.zip && unzip -o sqlalchemy.zip && rm sqlalchemy.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15407.zip -OutFile "$d\sqlalchemy.zip"; Expand-Archive "$d\sqlalchemy.zip" -DestinationPath $d -Force; ri "$d\sqlalchemy.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して sqlalchemy.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → sqlalchemy フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

SQLAlchemy

概要

SQLAlchemy は、標準的な Python ORM および SQL ツールキットです。バージョン 2.0 では、非同期サポートを備えた最新の型フレンドリーな API が導入されています。モデルを Python クラスとして定義し、ビルダーパターンでクエリを記述し、Alembic マイグレーションでスキーマの変更を管理します。

手順

ステップ 1: 非同期セットアップ

# db.py — 非同期 SQLAlchemy 構成
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import String, ForeignKey, DateTime, func
from datetime import datetime

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/myapp"

engine = create_async_engine(DATABASE_URL, echo=False, pool_size=20)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

ステップ 2: モデルの定義

# models.py — 型ヒント付き SQLAlchemy 2.0 モデル
from db import Base
from sqlalchemy import String, ForeignKey, DateTime, Integer, Text, Boolean, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime

class User(Base):
    __tablename__ = "users"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    role: Mapped[str] = mapped_column(String(20), default="member")
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    # Relationships
    projects: Mapped[list["Project"]] = relationship(back_populates="owner", cascade="all, delete")

    def __repr__(self) -> str:
        return f"<User {self.email}>"

class Project(Base):
    __tablename__ = "projects"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    description: Mapped[str | None] = mapped_column(Text)
    status: Mapped[str] = mapped_column(String(20), default="active")
    owner_id: Mapped[str] = mapped_column(ForeignKey("users.id"))
    task_count: Mapped[int] = mapped_column(Integer, default=0)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    owner: Mapped["User"] = relationship(back_populates="projects")
    tasks: Mapped[list["Task"]] = relationship(back_populates="project", cascade="all, delete")

class Task(Base):
    __tablename__ = "tasks"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    status: Mapped[str] = mapped_column(String(20), default="todo")
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id"))
    assignee_id: Mapped[str | None] = mapped_column(ForeignKey("users.id"))

    project: Mapped["Project"] = relationship(back_populates="tasks")

ステップ 3: クエリ

# queries.py — 非同期クエリの例
from sqlalchemy import select, func, and_
from sqlalchemy.orm import selectinload

async def get_user_projects(db: AsyncSession, user_id: str):
    """タスク数とともにユーザーのプロジェクトを取得します。"""
    result = await db.execute(
        select(Project)
        .where(Project.owner_id == user_id, Project.status == "active")
        .options(selectinload(Project.tasks))   # N+1 を避けるための eager load
        .order_by(Project.created_at.desc())
    )
    return result.scalars().all()

async def get_project_stats(db: AsyncSession, project_id: str):
    """プロジェクトのタスク統計を集計します。"""
    result = await db.execute(
        select(
            Task.status,
            func.count(Task.id).label("count"),
        )
        .where(Task.project_id == project_id)
        .group_by(Task.status)
    )
    return {row.status: row.count for row in result.all()}

async def search_tasks(db: AsyncSession, query: str, project_id: str):
    """タスクのタイトルで全文検索を行います。"""
    result = await db.execute(
        select(Task)
        .where(
            and_(
                Task.project_id == project_id,
                Task.title.ilike(f"%{query}%"),
            )
        )
        .limit(20)
    )
    return result.scalars().all()

ステップ 4: Alembic マイグレーション

# Alembic の初期化
pip install alembic
alembic init alembic

# モデルの変更からマイグレーションを生成
alembic revision --autogenerate -m "add tasks table"

# マイグレーションの適用
alembic upgrade head

# 1 ステップロールバック
alembic downgrade -1

ガイドライン

  • Mapped 型ヒント (SQLAlchemy 2.0) を使用します。IDE のオートコンプリートと型安全性が提供されます。
  • リレーションシップには常に selectinload または joinedload を使用します。N+1 クエリの問題を防ぎます。
  • 非同期セッションには expire_on_commit=False を使用します。遅延ロードの例外を防ぎます。
  • Alembic の自動生成はほとんどのスキーマ変更を検出しますが、適用する前にマイグレーションを確認してください。
  • シンプルなプロジェクトの場合は、SQLModel (FastAPI の作成者のライブラリ) を検討してください。よりシンプルな API で、同じエンジンです。
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

SQLAlchemy

Overview

SQLAlchemy is the standard Python ORM and SQL toolkit. Version 2.0 introduces a modern, type-friendly API with async support. Define models as Python classes, write queries with the builder pattern, and manage schema changes with Alembic migrations.

Instructions

Step 1: Async Setup

# db.py — Async SQLAlchemy configuration
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import String, ForeignKey, DateTime, func
from datetime import datetime

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/myapp"

engine = create_async_engine(DATABASE_URL, echo=False, pool_size=20)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

Step 2: Define Models

# models.py — SQLAlchemy 2.0 models with type hints
from db import Base
from sqlalchemy import String, ForeignKey, DateTime, Integer, Text, Boolean, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime

class User(Base):
    __tablename__ = "users"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    role: Mapped[str] = mapped_column(String(20), default="member")
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    # Relationships
    projects: Mapped[list["Project"]] = relationship(back_populates="owner", cascade="all, delete")

    def __repr__(self) -> str:
        return f"<User {self.email}>"

class Project(Base):
    __tablename__ = "projects"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    description: Mapped[str | None] = mapped_column(Text)
    status: Mapped[str] = mapped_column(String(20), default="active")
    owner_id: Mapped[str] = mapped_column(ForeignKey("users.id"))
    task_count: Mapped[int] = mapped_column(Integer, default=0)
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    owner: Mapped["User"] = relationship(back_populates="projects")
    tasks: Mapped[list["Task"]] = relationship(back_populates="project", cascade="all, delete")

class Task(Base):
    __tablename__ = "tasks"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    status: Mapped[str] = mapped_column(String(20), default="todo")
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id"))
    assignee_id: Mapped[str | None] = mapped_column(ForeignKey("users.id"))

    project: Mapped["Project"] = relationship(back_populates="tasks")

Step 3: Queries

# queries.py — Async query examples
from sqlalchemy import select, func, and_
from sqlalchemy.orm import selectinload

async def get_user_projects(db: AsyncSession, user_id: str):
    """Fetch user's projects with task counts."""
    result = await db.execute(
        select(Project)
        .where(Project.owner_id == user_id, Project.status == "active")
        .options(selectinload(Project.tasks))   # eager load to avoid N+1
        .order_by(Project.created_at.desc())
    )
    return result.scalars().all()

async def get_project_stats(db: AsyncSession, project_id: str):
    """Aggregate task statistics for a project."""
    result = await db.execute(
        select(
            Task.status,
            func.count(Task.id).label("count"),
        )
        .where(Task.project_id == project_id)
        .group_by(Task.status)
    )
    return {row.status: row.count for row in result.all()}

async def search_tasks(db: AsyncSession, query: str, project_id: str):
    """Full-text search in task titles."""
    result = await db.execute(
        select(Task)
        .where(
            and_(
                Task.project_id == project_id,
                Task.title.ilike(f"%{query}%"),
            )
        )
        .limit(20)
    )
    return result.scalars().all()

Step 4: Alembic Migrations

# Initialize Alembic
pip install alembic
alembic init alembic

# Generate migration from model changes
alembic revision --autogenerate -m "add tasks table"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

Guidelines

  • Use Mapped type hints (SQLAlchemy 2.0) — they provide IDE autocompletion and type safety.
  • Always use selectinload or joinedload for relationships — prevents N+1 query problems.
  • Use expire_on_commit=False for async sessions — prevents lazy loading exceptions.
  • Alembic autogenerate detects most schema changes, but review migrations before applying.
  • For simple projects, consider SQLModel (FastAPI creator's library) — simpler API, same engine.