scaffolding-fastapi-dapr
Build production-grade FastAPI backends with SQLModel, Dapr integration, and JWT authentication. Use when building REST APIs with Neon PostgreSQL, implementing event-driven microservices with Dapr pub/sub, scheduling jobs, or creating CRUD endpoints with JWT/JWKS verification. NOT when building simple scripts or non-microservice architectures.
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o scaffolding-fastapi-dapr.zip https://jpskill.com/download/17315.zip && unzip -o scaffolding-fastapi-dapr.zip && rm scaffolding-fastapi-dapr.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17315.zip -OutFile "$d\scaffolding-fastapi-dapr.zip"; Expand-Archive "$d\scaffolding-fastapi-dapr.zip" -DestinationPath $d -Force; ri "$d\scaffolding-fastapi-dapr.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
scaffolding-fastapi-dapr.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
scaffolding-fastapi-daprフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 6
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
[Skill 名] scaffolding-fastapi-dapr
FastAPI + Dapr バックエンド
SQLModel、Dapr 統合、および JWT 認証を使用して、本番環境グレードの FastAPI バックエンドを構築します。
クイックスタート
# プロジェクトのセットアップ
uv init backend && cd backend
uv add fastapi sqlmodel pydantic httpx python-jose uvicorn
# 開発
uv run uvicorn main:app --reload --port 8000
# Dapr sidecar を使用
dapr run --app-id myapp --app-port 8000 -- uvicorn main:app
FastAPI コアパターン
1. SQLModel スキーマ (データベース + API)
from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional, Literal
class TaskBase(SQLModel):
title: str = Field(max_length=200, index=True)
status: Literal["pending", "in_progress", "completed"] = "pending"
class Task(TaskBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
created_at: datetime = Field(default_factory=datetime.now)
class TaskCreate(TaskBase):
pass
class TaskRead(TaskBase):
id: int
created_at: datetime
2. 非同期データベースのセットアップ
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
import os
DATABASE_URL = os.getenv("DATABASE_URL").replace("postgresql://", "postgresql+asyncpg://")
engine = create_async_engine(DATABASE_URL)
async def get_session() -> AsyncSession:
async with AsyncSession(engine) as session:
yield session
3. CRUD エンドポイント
from fastapi import FastAPI, Depends, HTTPException
from sqlmodel import select
app = FastAPI()
@app.post("/tasks", response_model=TaskRead, status_code=201)
async def create_task(task: TaskCreate, session: AsyncSession = Depends(get_session)):
db_task = Task.model_validate(task)
session.add(db_task)
await session.commit()
await session.refresh(db_task)
return db_task
@app.get("/tasks/{task_id}", response_model=TaskRead)
async def get_task(task_id: int, session: AsyncSession = Depends(get_session)):
task = await session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Not found")
return task
@app.patch("/tasks/{task_id}", response_model=TaskRead)
async def update_task(task_id: int, update: TaskUpdate, session: AsyncSession = Depends(get_session)):
task = await session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Not found")
update_data = update.model_dump(exclude_unset=True)
task.sqlmodel_update(update_data)
session.add(task)
await session.commit()
await session.refresh(task)
return task
4. JWT/JWKS 認証
from jose import jwt
import httpx
JWKS_URL = f"{SSO_URL}/.well-known/jwks.json"
async def get_current_user(authorization: str = Header()):
token = authorization.replace("Bearer ", "")
async with httpx.AsyncClient() as client:
jwks = (await client.get(JWKS_URL)).json()
payload = jwt.decode(token, jwks, algorithms=["RS256"])
return payload
@app.get("/protected")
async def protected_route(user = Depends(get_current_user)):
return {"user": user["sub"]}
監査ログ、ページネーション、および OpenAPI 構成については、references/fastapi-patterns.md を参照してください。
Dapr 統合パターン
1. Pub/Sub サブスクリプション
from fastapi import APIRouter, Request
router = APIRouter(prefix="/dapr", tags=["Dapr"])
@router.get("/subscribe")
async def subscribe():
"""Dapr calls this to discover subscriptions."""
return [{
"pubsubname": "pubsub",
"topic": "task-created",
"route": "/dapr/task-created"
}]
@router.post("/task-created")
async def handle_task_created(request: Request, session: AsyncSession = Depends(get_session)):
# CloudEvent wrapper - data is nested
event = await request.json()
task_data = event.get("data", event) # Handle both wrapped and unwrapped
# Process event
task = Task.model_validate(task_data)
session.add(task)
await session.commit()
return {"status": "processed"}
2. イベントの発行
import httpx
DAPR_URL = "http://localhost:3500"
async def publish_event(topic: str, data: dict):
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={"Content-Type": "application/json"}
)
3. スケジュールされたジョブ
# Dapr Jobs API (alpha) 経由でジョブをスケジュールします
async def schedule_job(name: str, schedule: str, callback_url: str, data: dict):
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0-alpha1/jobs/{name}",
json={
"schedule": schedule, # "@every 5m" or "0 */5 * * * *"
"data": data,
},
headers={"dapr-app-callback-url": callback_url}
)
# ジョブコールバックエンドポイント
@app.post("/jobs/process")
async def process_job(request: Request):
job_data = await request.json()
# ジョブの実行を処理します
return {"status": "completed"}
状態管理と高度なパターンについては、references/dapr-patterns.md を参照してください。
本番環境パターン
構造化ロギング
import structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
log = structlog.get_logger()
log.info("task_created", task_id=task.id, user_id=user["sub"])
リポジトリ + サービスパターン
# リポジトリ: データアクセスのみ
class TaskRepository:
def __init__(self, session: AsyncSession):
self.session = session
asyn
(原文がここで切り詰められています) 📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
FastAPI + Dapr Backend
Build production-grade FastAPI backends with SQLModel, Dapr integration, and JWT authentication.
Quick Start
# Project setup
uv init backend && cd backend
uv add fastapi sqlmodel pydantic httpx python-jose uvicorn
# Development
uv run uvicorn main:app --reload --port 8000
# With Dapr sidecar
dapr run --app-id myapp --app-port 8000 -- uvicorn main:app
FastAPI Core Patterns
1. SQLModel Schema (Database + API)
from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional, Literal
class TaskBase(SQLModel):
title: str = Field(max_length=200, index=True)
status: Literal["pending", "in_progress", "completed"] = "pending"
class Task(TaskBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
created_at: datetime = Field(default_factory=datetime.now)
class TaskCreate(TaskBase):
pass
class TaskRead(TaskBase):
id: int
created_at: datetime
2. Async Database Setup
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
import os
DATABASE_URL = os.getenv("DATABASE_URL").replace("postgresql://", "postgresql+asyncpg://")
engine = create_async_engine(DATABASE_URL)
async def get_session() -> AsyncSession:
async with AsyncSession(engine) as session:
yield session
3. CRUD Endpoints
from fastapi import FastAPI, Depends, HTTPException
from sqlmodel import select
app = FastAPI()
@app.post("/tasks", response_model=TaskRead, status_code=201)
async def create_task(task: TaskCreate, session: AsyncSession = Depends(get_session)):
db_task = Task.model_validate(task)
session.add(db_task)
await session.commit()
await session.refresh(db_task)
return db_task
@app.get("/tasks/{task_id}", response_model=TaskRead)
async def get_task(task_id: int, session: AsyncSession = Depends(get_session)):
task = await session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Not found")
return task
@app.patch("/tasks/{task_id}", response_model=TaskRead)
async def update_task(task_id: int, update: TaskUpdate, session: AsyncSession = Depends(get_session)):
task = await session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Not found")
update_data = update.model_dump(exclude_unset=True)
task.sqlmodel_update(update_data)
session.add(task)
await session.commit()
await session.refresh(task)
return task
4. JWT/JWKS Authentication
from jose import jwt
import httpx
JWKS_URL = f"{SSO_URL}/.well-known/jwks.json"
async def get_current_user(authorization: str = Header()):
token = authorization.replace("Bearer ", "")
async with httpx.AsyncClient() as client:
jwks = (await client.get(JWKS_URL)).json()
payload = jwt.decode(token, jwks, algorithms=["RS256"])
return payload
@app.get("/protected")
async def protected_route(user = Depends(get_current_user)):
return {"user": user["sub"]}
See references/fastapi-patterns.md for audit logging, pagination, and OpenAPI configuration.
Dapr Integration Patterns
1. Pub/Sub Subscription
from fastapi import APIRouter, Request
router = APIRouter(prefix="/dapr", tags=["Dapr"])
@router.get("/subscribe")
async def subscribe():
"""Dapr calls this to discover subscriptions."""
return [{
"pubsubname": "pubsub",
"topic": "task-created",
"route": "/dapr/task-created"
}]
@router.post("/task-created")
async def handle_task_created(request: Request, session: AsyncSession = Depends(get_session)):
# CloudEvent wrapper - data is nested
event = await request.json()
task_data = event.get("data", event) # Handle both wrapped and unwrapped
# Process event
task = Task.model_validate(task_data)
session.add(task)
await session.commit()
return {"status": "processed"}
2. Publishing Events
import httpx
DAPR_URL = "http://localhost:3500"
async def publish_event(topic: str, data: dict):
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={"Content-Type": "application/json"}
)
3. Scheduled Jobs
# Schedule a job via Dapr Jobs API (alpha)
async def schedule_job(name: str, schedule: str, callback_url: str, data: dict):
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0-alpha1/jobs/{name}",
json={
"schedule": schedule, # "@every 5m" or "0 */5 * * * *"
"data": data,
},
headers={"dapr-app-callback-url": callback_url}
)
# Job callback endpoint
@app.post("/jobs/process")
async def process_job(request: Request):
job_data = await request.json()
# Handle job execution
return {"status": "completed"}
See references/dapr-patterns.md for state management and advanced patterns.
Production Patterns
Structured Logging
import structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
log = structlog.get_logger()
log.info("task_created", task_id=task.id, user_id=user["sub"])
Repository + Service Pattern
# Repository: data access only
class TaskRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, task: TaskCreate) -> Task:
db_task = Task.model_validate(task)
self.session.add(db_task)
await self.session.commit()
return db_task
# Service: business logic
class TaskService:
def __init__(self, repo: TaskRepository):
self.repo = repo
async def create_task(self, task: TaskCreate, user_id: str) -> Task:
# Business logic here
return await self.repo.create(task)
# Dependency injection
def get_task_service(session: AsyncSession = Depends(get_session)):
return TaskService(TaskRepository(session))
Async Testing
@pytest.fixture
async def client(session):
app.dependency_overrides[get_session] = lambda: session
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as ac:
yield ac
@pytest.mark.anyio
async def test_create_task(client: AsyncClient):
response = await client.post("/tasks", json={"title": "Test"})
assert response.status_code == 201
See references/production-testing.md for full patterns.
Project Structure
backend/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── database.py # Async engine + session
│ ├── models/ # SQLModel schemas
│ ├── routers/ # API routes
│ ├── repositories/ # Data access layer
│ ├── services/ # Business logic
│ └── dapr/ # Dapr handlers
├── tests/
│ ├── conftest.py # Fixtures
│ └── test_*.py # Test files
├── components/ # Dapr components (k8s)
│ ├── pubsub.yaml
│ └── statestore.yaml
└── pyproject.toml
Verification
Run: python3 scripts/verify.py
Expected: ✓ scaffolding-fastapi-dapr skill ready
If Verification Fails
- Check: references/ folder has both pattern files
- Stop and report if still failing
Related Skills
- configuring-better-auth - JWT/JWKS auth for API endpoints
- fetching-library-docs - FastAPI docs:
--library-id /fastapi/fastapi --topic dependencies
References
- references/fastapi-patterns.md - Complete FastAPI backend patterns
- references/dapr-patterns.md - Dapr pub/sub, state, and jobs
- references/sqlmodel-patterns.md - SQLModel database patterns and migrations
- references/production-testing.md - Structured logging, DI, testing, versioning
同梱ファイル
※ ZIPに含まれるファイル一覧。`SKILL.md` 本体に加え、参考資料・サンプル・スクリプトが入っている場合があります。
- 📄 SKILL.md (9,184 bytes)
- 📎 references/dapr-patterns.md (9,609 bytes)
- 📎 references/fastapi-patterns.md (18,432 bytes)
- 📎 references/production-testing.md (10,540 bytes)
- 📎 references/sqlmodel-patterns.md (6,010 bytes)
- 📎 scripts/verify.py (663 bytes)