jpskill.com
🛠️ 開発・MCP コミュニティ

celery

Pythonで非同期処理や定期的なジョブ実行、バックグラウンド処理などをCeleryを使って効率的に行うことができ、Webリクエストから重い処理を分離して、スムーズなユーザー体験を提供するSkill。

📜 元の英語説明(参考)

Run background tasks in Python with Celery. Use when a user asks to process tasks asynchronously, schedule periodic jobs, run background workers, build task queues in Python, or offload heavy processing from web requests.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Pythonで非同期処理や定期的なジョブ実行、バックグラウンド処理などをCeleryを使って効率的に行うことができ、Webリクエストから重い処理を分離して、スムーズなユーザー体験を提供するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o celery.zip https://jpskill.com/download/14725.zip && unzip -o celery.zip && rm celery.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14725.zip -OutFile "$d\celery.zip"; Expand-Archive "$d\celery.zip" -DestinationPath $d -Force; ri "$d\celery.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して celery.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → celery フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Celery

概要

Celery は、分散タスク処理のための標準的な Python ライブラリです。時間のかかる操作(メール送信、レポート生成、画像処理)を、ウェブのリクエストからバックグラウンドワーカーにオフロードします。タスクのリトライ、スケジューリング、レート制限、およびチェイニングをサポートします。

手順

ステップ 1: セットアップ

pip install celery[redis]
# celery_app.py — Celery アプリケーション設定
from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',       # メッセージブローカー
    backend='redis://localhost:6379/1',       # 結果ストレージ
)

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_acks_late=True,                     # 処理後に ack (より安全)
    worker_prefetch_multiplier=1,            # ワーカーごとに一度に 1 つのタスク
)

ステップ 2: タスクの定義

# tasks.py — バックグラウンドタスクの定義
from celery_app import app
from celery import shared_task
import time

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int):
    """新規ユーザーにウェルカムメールを送信します。

    Args:
        user_id: 新規登録されたユーザーのデータベース ID
    """
    try:
        user = get_user(user_id)
        send_email(
            to=user.email,
            subject='Welcome!',
            body=render_template('welcome.html', user=user),
        )
    except EmailServiceError as exc:
        # 指数バックオフでリトライ
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


@app.task(rate_limit='10/m')    # 1 分あたり最大 10
def process_image(image_path: str, output_path: str):
    """アップロードされた画像のリサイズと最適化を行います。"""
    img = Image.open(image_path)
    img.thumbnail((1200, 1200))
    img.save(output_path, optimize=True, quality=85)
    return output_path


@app.task
def generate_report(org_id: int, start_date: str, end_date: str):
    """分析レポートを生成します(数分かかる場合があります)。"""
    data = fetch_analytics(org_id, start_date, end_date)
    pdf_path = render_pdf_report(data)
    notify_user(org_id, pdf_path)
    return pdf_path

ステップ 3: タスクの呼び出し

# ウェブハンドラー内 (Django view, FastAPI endpoint, など)
from tasks import send_welcome_email, generate_report
from celery import chain, group

# 実行して忘れる
send_welcome_email.delay(user.id)

# 後で結果を取得する
result = generate_report.delay(org.id, '2025-01-01', '2025-01-31')
print(result.status)      # PENDING → STARTED → SUCCESS
print(result.get())        # 完了するまでブロック

# Chain: task1 の結果が task2 に供給される
chain(extract_data.s(url), transform_data.s(), load_data.s())()

# Group: タスクを並列実行する
group(process_image.s(path) for path in image_paths)()

ステップ 4: ワーカーの実行

celery -A celery_app worker --loglevel=info --concurrency=4
celery -A celery_app beat --loglevel=info    # 定期タスク用

ガイドライン

  • 信頼性のために、常に task_acks_late=True を使用してください — タスクはワーカーのクラッシュ後も存続します。
  • バックオフによる自動リトライには、bind=Trueself.retry() を使用してください。
  • Redis は最もシンプルなブローカーです。RabbitMQ は本番環境向けにより堅牢です。
  • Flower で監視します: celery -A celery_app flower (ポート 5555 のウェブダッシュボード)。
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Celery

Overview

Celery is the standard Python library for distributed task processing. Offload slow operations (email sending, report generation, image processing) from web requests to background workers. Supports task retries, scheduling, rate limiting, and chaining.

Instructions

Step 1: Setup

pip install celery[redis]
# celery_app.py — Celery application configuration
from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',       # message broker
    backend='redis://localhost:6379/1',       # result storage
)

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_acks_late=True,                     # ack after processing (safer)
    worker_prefetch_multiplier=1,            # one task at a time per worker
)

Step 2: Define Tasks

# tasks.py — Background task definitions
from celery_app import app
from celery import shared_task
import time

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int):
    """Send welcome email to new user.

    Args:
        user_id: Database ID of the newly registered user
    """
    try:
        user = get_user(user_id)
        send_email(
            to=user.email,
            subject='Welcome!',
            body=render_template('welcome.html', user=user),
        )
    except EmailServiceError as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


@app.task(rate_limit='10/m')    # max 10 per minute
def process_image(image_path: str, output_path: str):
    """Resize and optimize uploaded image."""
    img = Image.open(image_path)
    img.thumbnail((1200, 1200))
    img.save(output_path, optimize=True, quality=85)
    return output_path


@app.task
def generate_report(org_id: int, start_date: str, end_date: str):
    """Generate analytics report (may take several minutes)."""
    data = fetch_analytics(org_id, start_date, end_date)
    pdf_path = render_pdf_report(data)
    notify_user(org_id, pdf_path)
    return pdf_path

Step 3: Call Tasks

# In your web handler (Django view, FastAPI endpoint, etc.)
from tasks import send_welcome_email, generate_report
from celery import chain, group

# Fire and forget
send_welcome_email.delay(user.id)

# Get result later
result = generate_report.delay(org.id, '2025-01-01', '2025-01-31')
print(result.status)      # PENDING → STARTED → SUCCESS
print(result.get())        # blocks until done

# Chain: task1 result feeds into task2
chain(extract_data.s(url), transform_data.s(), load_data.s())()

# Group: run tasks in parallel
group(process_image.s(path) for path in image_paths)()

Step 4: Run Workers

celery -A celery_app worker --loglevel=info --concurrency=4
celery -A celery_app beat --loglevel=info    # for periodic tasks

Guidelines

  • Always use task_acks_late=True for reliability — tasks survive worker crashes.
  • Use bind=True and self.retry() for automatic retry with backoff.
  • Redis is the simplest broker; RabbitMQ is more robust for production.
  • Monitor with Flower: celery -A celery_app flower (web dashboard on port 5555).