jpskill.com
🛠️ 開発・MCP コミュニティ

django-celery-expert

DjangoとCeleryを活用し、非同期タスク処理に関する設計、設定、エラー処理、パフォーマンス最適化、定期タスク実装、本番環境監視まで、Django連携のベストプラクティスに沿った専門的なアドバイスを提供するSkill。

📜 元の英語説明(参考)

Expert Django and Celery guidance for asynchronous task processing. Use when designing background tasks, configuring workers, handling retries and errors, optimizing task performance, implementing periodic tasks, or setting up production monitoring. Follows Celery best practices with Django integration patterns.

🇯🇵 日本人クリエイター向け解説

一言でいうと

DjangoとCeleryを活用し、非同期タスク処理に関する設計、設定、エラー処理、パフォーマンス最適化、定期タスク実装、本番環境監視まで、Django連携のベストプラクティスに沿った専門的なアドバイスを提供するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o django-celery-expert.zip https://jpskill.com/download/10593.zip && unzip -o django-celery-expert.zip && rm django-celery-expert.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/10593.zip -OutFile "$d\django-celery-expert.zip"; Expand-Archive "$d\django-celery-expert.zip" -DestinationPath $d -Force; ri "$d\django-celery-expert.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して django-celery-expert.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → django-celery-expert フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Django Celery Expert

概要

このスキルは、非同期タスク処理に Celery を使用する Django アプリケーションに対して、専門的なガイダンスを提供します。タスク設計パターン、ワーカー構成、エラー処理、監視、および本番環境へのデプロイ戦略を網羅します。

主な機能:

  • タスクの設計と実装パターン
  • ワーカーの構成とスケーリング
  • エラー処理と再試行戦略
  • 定期的/スケジュールされたタスクの管理
  • 監視と可観測性
  • 本番環境へのデプロイのベストプラクティス

どのような時に使うか

以下のトリガーが発生した場合に、このスキルを呼び出してください。

タスク設計:

  • 「〜のための Celery タスクを作成する」
  • 「これをバックグラウンドジョブに移動する」
  • 「これを非同期的に処理する」
  • 「リクエストの外部でこれを処理する」

構成と設定:

  • 「Django 用に Celery を構成する」
  • 「タスクキューを設定する」
  • 「Celery ワーカーを構成する」
  • 「スケジューリングのために Celery Beat を設定する」

エラー処理:

  • 「タスクの失敗を処理する」
  • 「再試行ロジックを実装する」
  • 「タスクが失敗し続ける」
  • 「デッドレターキューを設定する」

パフォーマンスとスケーリング:

  • 「Celery ワーカーをスケールする」
  • 「タスクのスループットを最適化する」
  • 「タスクが遅すぎる」
  • 「大量のタスクを処理する」

監視:

  • 「Celery タスクを監視する」
  • 「Flower を設定する」
  • 「タスクの進捗状況を追跡する」
  • 「スタックしたタスクをデバッグする」

指示

Django Celery のリクエストを処理する際は、次のワークフローに従ってください。

1. リクエストの分析

タスクの種類を特定する:

  • 単純なバックグラウンドタスク (fire-and-forget)
  • 結果追跡を伴うタスク (完了をポーリングする必要がある)
  • チェーン/グループ化されたタスク (ワークフローオーケストレーション)
  • 定期的/スケジュールされたタスク (cron のような動作)
  • 長時間実行タスク (進捗状況の追跡が必要)

重要な質問:

  • 呼び出し元は結果を必要とするか?
  • 失敗は再試行されるべきか?
  • 冪等性が必要か?
  • 予想される実行時間は?
  • 実行の保証はどれほど重要か?

2. 関連する参照ドキュメントをロードする

タスクの種類に基づいて、適切なバンドルされたドキュメントを参照してください。

  • Django 固有のパターン -> references/django-integration.md
  • タスクの実装 -> references/task-design-patterns.md
  • 構成と設定 -> references/configuration-guide.md
  • エラー処理と再試行 -> references/error-handling.md
  • 定期的なタスク -> references/periodic-tasks.md
  • 監視とデバッグ -> references/monitoring-observability.md
  • 本番環境へのデプロイ -> references/production-deployment.md

3. ベストプラクティスに従って実装する

タスク設計の原則:

  • タスクを小さく、焦点を絞る
  • 可能であれば冪等性を考慮して設計する
  • 明示的なタスク名を使用する
  • self にアクセスするためにタスクをバインドする
  • シリアライズ可能な引数のみを渡す (オブジェクトではなく ID)

エラー処理:

  • 適切な再試行動作を構成する
  • 指数バックオフを使用する
  • 最大再試行回数を設定する
  • 特定の例外を適切に処理する
  • コンテキストとともに失敗をログに記録する

パフォーマンス:

  • 適切なシリアライザーを使用する (安全のためには JSON、Python オブジェクトには pickle)
  • プリフェッチ制限を構成する
  • 優先順位付けのためにタスクルーティングを使用する
  • 適切な場合はバッチ処理を行う
  • メモリ使用量を監視する

4. 実装の検証

ソリューションを提示する前に:

  • 再試行が有効になっている場合は、タスクが冪等であることを確認する
  • 引数のシリアライズを確認する
  • 適切なエラー処理を保証する
  • 監視/ロギングが適切に設定されていることを確認する
  • 失敗のシナリオを検討する

バンドルされたリソース

references/ - 必要に応じてコンテキストにロードされる包括的な Celery ドキュメント

  • references/django-integration.md

    • 安全なタスクキューイングのための transaction.on_commit()
    • 復旧タスクによる信頼できる情報源としてのデータベース
    • django-guid を使用したリクエスト-タスクの関連付け
    • Django Celery タスクのテスト
  • references/task-design-patterns.md

    • タスクシグネチャと呼び出しパターン
    • タスクプロパティのバインディングとアクセス
    • タスクの継承と基底クラス
    • ワークフローパターン (チェーン、グループ、コード)
    • 冪等性と正確に一度の配信
  • references/configuration-guide.md

    • Django-Celery 統合の設定
    • ブローカー構成 (Redis, RabbitMQ)
    • 結果バックエンドのオプション
    • ワーカー設定と並行性
    • タスクルーティングとキュー
  • references/error-handling.md

    • 再試行戦略とバックオフ
    • 例外処理パターン
    • デッドレターキュー
    • タスクの拒否と再キュー
    • タイムアウト処理
  • references/periodic-tasks.md

    • Celery Beat の構成
    • Crontab と間隔スケジュール
    • Django データベーススケジューラ
    • 動的なスケジュール管理
    • タイムゾーンの考慮事項
  • references/monitoring-observability.md

    • Flower の設定と使用法
    • Prometheus/Grafana の統合
    • タスクイベントの監視
    • ロギングのベストプラクティス
    • スタックしたタスクのデバッグ
  • references/production-deployment.md

    • ワーカーのデプロイパターン
    • プロセスの監視 (systemd, supervisor)
    • コンテナ化されたデプロイ
    • スケーリング戦略
    • ヘルスチェックとグレースフルシャットダウン

例 1: 基本的なバックグラウンドタスク

ユーザーリクエスト:

「ユーザー登録後、ウェルカムメールをバックグラウンドで送信する」

実装:

# tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
    from users.models import User

    try:
        user = User.objects.get(id=user_id)
        send_mail(
            subject="Welcome!",
            message=f"Hi {user.name}, welcome to our platform!",
            from_email="noreply@example.com",
            recipient_list=[user.email],
        )
    except User.DoesNotExist:
        pass  # User deleted, skip
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

# views.py
def register(request):
    user = User.objects.create(...)
    send_welcome_email.delay(user.id)  # Fire and forget
    return redirect('dashboard')

例 2: 進捗状況の追跡を伴うタスク

ユーザーリクエスト:

「進捗状況の更新を伴う大規模な CSV インポートを処理する」

実装:


@shared_task(bind=True

(原文がここで切り詰められています)
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Django Celery Expert

Overview

This skill provides expert guidance for Django applications using Celery for asynchronous task processing. It covers task design patterns, worker configuration, error handling, monitoring, and production deployment strategies.

Key Capabilities:

  • Task design and implementation patterns
  • Worker configuration and scaling
  • Error handling and retry strategies
  • Periodic/scheduled task management
  • Monitoring and observability
  • Production deployment best practices

When to Use

Invoke this skill when you encounter these triggers:

Task Design:

  • "Create a Celery task for..."
  • "Move this to a background job"
  • "Process this asynchronously"
  • "Handle this outside the request"

Configuration & Setup:

  • "Configure Celery for Django"
  • "Set up task queues"
  • "Configure Celery workers"
  • "Set up Celery Beat for scheduling"

Error Handling:

  • "Handle task failures"
  • "Implement retry logic"
  • "Task keeps failing"
  • "Set up dead letter queue"

Performance & Scaling:

  • "Scale Celery workers"
  • "Optimize task throughput"
  • "Tasks are too slow"
  • "Handle high task volume"

Monitoring:

  • "Monitor Celery tasks"
  • "Set up Flower"
  • "Track task progress"
  • "Debug stuck tasks"

Instructions

Follow this workflow when handling Django Celery requests:

1. Analyze the Request

Identify the task type:

  • Simple background task (fire-and-forget)
  • Task with result tracking (need to poll for completion)
  • Chained/grouped tasks (workflow orchestration)
  • Periodic/scheduled tasks (cron-like behavior)
  • Long-running tasks (need progress tracking)

Key questions:

  • Does the caller need the result?
  • Should failures be retried?
  • Is idempotency required?
  • What's the expected execution time?
  • How critical is guaranteed execution?

2. Load Relevant Reference Documentation

Based on the task type, reference the appropriate bundled documentation:

  • Django-specific patterns -> references/django-integration.md
  • Task implementation -> references/task-design-patterns.md
  • Configuration & setup -> references/configuration-guide.md
  • Error handling & retries -> references/error-handling.md
  • Periodic tasks -> references/periodic-tasks.md
  • Monitoring & debugging -> references/monitoring-observability.md
  • Production deployment -> references/production-deployment.md

3. Implement Following Best Practices

Task design principles:

  • Keep tasks small and focused
  • Design for idempotency when possible
  • Use explicit task names
  • Bind tasks for access to self
  • Pass serializable arguments only (IDs, not objects)

Error handling:

  • Configure appropriate retry behavior
  • Use exponential backoff
  • Set max retry limits
  • Handle specific exceptions appropriately
  • Log failures with context

Performance:

  • Use appropriate serializers (JSON for safety, pickle for Python objects)
  • Configure prefetch limits
  • Use task routing for prioritization
  • Batch operations when appropriate
  • Monitor memory usage

4. Validate Implementation

Before presenting the solution:

  • Verify task is idempotent if retries enabled
  • Check serialization of arguments
  • Ensure proper error handling
  • Verify monitoring/logging is in place
  • Consider failure scenarios

Bundled Resources

references/ - Comprehensive Celery documentation loaded into context as needed

  • references/django-integration.md

    • transaction.on_commit() for safe task queuing
    • Database as source of truth with recovery tasks
    • Request-task correlation with django-guid
    • Testing Django Celery tasks
  • references/task-design-patterns.md

    • Task signatures and calling patterns
    • Binding and accessing task properties
    • Task inheritance and base classes
    • Workflow patterns (chains, groups, chords)
    • Idempotency and exactly-once delivery
  • references/configuration-guide.md

    • Django-Celery integration setup
    • Broker configuration (Redis, RabbitMQ)
    • Result backend options
    • Worker settings and concurrency
    • Task routing and queues
  • references/error-handling.md

    • Retry strategies and backoff
    • Exception handling patterns
    • Dead letter queues
    • Task rejection and requeue
    • Timeout handling
  • references/periodic-tasks.md

    • Celery Beat configuration
    • Crontab and interval schedules
    • Django database scheduler
    • Dynamic schedule management
    • Timezone considerations
  • references/monitoring-observability.md

    • Flower setup and usage
    • Prometheus/Grafana integration
    • Task event monitoring
    • Logging best practices
    • Debugging stuck tasks
  • references/production-deployment.md

    • Worker deployment patterns
    • Process supervision (systemd, supervisor)
    • Containerized deployments
    • Scaling strategies
    • Health checks and graceful shutdown

Examples

Example 1: Basic Background Task

User Request:

"Send welcome emails in the background after user registration"

Implementation:

# tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
    from users.models import User

    try:
        user = User.objects.get(id=user_id)
        send_mail(
            subject="Welcome!",
            message=f"Hi {user.name}, welcome to our platform!",
            from_email="noreply@example.com",
            recipient_list=[user.email],
        )
    except User.DoesNotExist:
        pass  # User deleted, skip
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

# views.py
def register(request):
    user = User.objects.create(...)
    send_welcome_email.delay(user.id)  # Fire and forget
    return redirect('dashboard')

Example 2: Task with Progress Tracking

User Request:

"Process a large CSV import with progress updates"

Implementation:

@shared_task(bind=True)
def import_csv(self, file_path, total_rows):
    from myapp.models import Record

    with open(file_path) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            Record.objects.create(**row)
            if i % 100 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={'current': i, 'total': total_rows}
                )

    return {'status': 'complete', 'processed': total_rows}

# Check progress
result = import_csv.AsyncResult(task_id)
if result.state == 'PROGRESS':
    progress = result.info.get('current', 0) / result.info.get('total', 1)

Example 3: Workflow with Chains

User Request:

"Process an order: validate inventory, charge payment, then send confirmation"

Implementation:

from celery import chain

@shared_task
def validate_inventory(order_id):
    # Returns order_id if valid, raises if not
    order = Order.objects.get(id=order_id)
    if not order.items_in_stock():
        raise ValueError("Items out of stock")
    return order_id

@shared_task
def charge_payment(order_id):
    order = Order.objects.get(id=order_id)
    order.charge()
    return order_id

@shared_task
def send_confirmation(order_id):
    order = Order.objects.get(id=order_id)
    order.send_confirmation_email()

def process_order(order_id):
    workflow = chain(
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s()
    )
    workflow.delay()

Additional Notes

Common Pitfalls:

  • Passing Django model instances instead of IDs
  • Not handling task idempotency with retries
  • Missing timeout configuration for long tasks
  • Not monitoring task queue depth
  • Ignoring result backend cleanup

Django Integration:

  • Use django-celery-beat for database-backed schedules
  • Use django-celery-results for storing task results in Django
  • Configure CELERY_ settings in Django settings.py
  • Use @shared_task for reusable apps

Security:

  • Never pass sensitive data in task arguments
  • Use signed serializers if pickle is required
  • Restrict Flower access in production
  • Validate task arguments