apscheduler
Advanced Python Scheduler - Task scheduling and job queue system
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o apscheduler.zip https://jpskill.com/download/17562.zip && unzip -o apscheduler.zip && rm apscheduler.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17562.zip -OutFile "$d\apscheduler.zip"; Expand-Archive "$d\apscheduler.zip" -DestinationPath $d -Force; ri "$d\apscheduler.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
apscheduler.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
apschedulerフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
APScheduler
APScheduler は、Python アプリケーション向けの柔軟なタスクスケジューリングおよびジョブキューシステムです。cron-スタイル、間隔ベース、およびワンオフスケジューリングを含む、複数のスケジューリングメカニズムによる同期および非同期実行の両方をサポートしています。
クイックスタート
基本的な同期スケジューラ
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger
def tick():
print(f"Tick: {datetime.now()}")
# メモリデータストアでスケジューラを作成して開始します
with Scheduler() as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
scheduler.run_until_stopped()
FastAPI を使用した非同期スケジューラ
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
def cleanup_task():
print("Running cleanup task...")
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncScheduler()
async with scheduler:
await scheduler.add_schedule(
cleanup_task,
IntervalTrigger(hours=1),
id="cleanup"
)
await scheduler.start_in_background()
yield
app = FastAPI(lifespan=lifespan)
一般的なパターン
スケジューラ
インメモリ スケジューラ (開発):
from apscheduler import AsyncScheduler
async def main():
async with AsyncScheduler() as scheduler:
# 再起動時にジョブは失われます
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
永続スケジューラ (本番):
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
async def main():
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
async with AsyncScheduler(data_store) as scheduler:
# ジョブは再起動後も存続します
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
分散スケジューラ:
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
# スケジューラノード - スケジュールからジョブを作成します
async def scheduler_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.scheduler
) as scheduler:
await scheduler.add_schedule(task, trigger)
await scheduler.run_until_stopped()
# ワーカーノード - ジョブのみを実行します
async def worker_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.worker
) as scheduler:
await scheduler.run_until_stopped()
ジョブ
単純な関数ジョブ:
def send_daily_report():
generate_report()
email_report("admin@example.com")
scheduler.add_schedule(
send_daily_report,
CronTrigger(hour=9, minute=0) # 毎日午前9時
)
引数付きのジョブ:
def process_data(source: str, destination: str, batch_size: int):
# データ処理ロジック
pass
scheduler.add_schedule(
process_data,
IntervalTrigger(hours=1),
kwargs={
'source': 's3://incoming',
'destination': 's3://processed',
'batch_size': 1000
}
)
非同期ジョブ:
async def fetch_external_api():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
data = await resp.json()
await save_to_database(data)
scheduler.add_schedule(
fetch_external_api,
IntervalTrigger(minutes=5)
)
トリガー
間隔トリガー:
from apscheduler.triggers.interval import IntervalTrigger
# 30秒ごと
IntervalTrigger(seconds=30)
# 2時間15分ごと
IntervalTrigger(hours=2, minutes=15)
# 3日ごと
IntervalTrigger(days=3)
Cron トリガー:
from apscheduler.triggers.cron import CronTrigger
# 月曜日から金曜日の午前9時
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')
# 15分ごと
CronTrigger(minute='*/15')
# 月の最終日の午前0時
CronTrigger(day='last', hour=0, minute=0)
# crontab 構文を使用する
CronTrigger.from_crontab('0 9 * * 1-5') # 平日の午前9時
日付トリガー (ワンタイム):
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger
# 今から5分後
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)
# 特定の日時
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
カレンダー間隔:
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
# 毎月1日の午前9時
CalendarIntervalTrigger(months=1, hour=9, minute=0)
# 毎週月曜日の午前10時
CalendarIntervalTrigger(weeks=1, day_of_week='mon', hour=10, minute=0)
永続化
SQLite:
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
PostgreSQL:
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
Redis (イベントブローカー):
from apscheduler.eventbrokers.redis import RedisEventBroker
event_broker = RedisEventBroker.from_url("redis://localhost:6379")
ジョブ管理
ジョブの結果を取得:
async def main():
async with AsyncScheduler() as scheduler:
await scheduler.start_in_background()
# 結果保持付きでジョブを追加
job_id = await scheduler.add_job(
calculate_result,
args=(10, 20),
result_expiration_time=timedelta(hours=1)
)
(原文がここで切り詰められています) 📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
APScheduler
APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.
Quick Start
Basic Synchronous Scheduler
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger
def tick():
print(f"Tick: {datetime.now()}")
# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
scheduler.run_until_stopped()
Async Scheduler with FastAPI
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
def cleanup_task():
print("Running cleanup task...")
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncScheduler()
async with scheduler:
await scheduler.add_schedule(
cleanup_task,
IntervalTrigger(hours=1),
id="cleanup"
)
await scheduler.start_in_background()
yield
app = FastAPI(lifespan=lifespan)
Common Patterns
Schedulers
In-memory scheduler (development):
from apscheduler import AsyncScheduler
async def main():
async with AsyncScheduler() as scheduler:
# Jobs lost on restart
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
Persistent scheduler (production):
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
async def main():
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
async with AsyncScheduler(data_store) as scheduler:
# Jobs survive restarts
await scheduler.add_schedule(my_task, trigger)
await scheduler.run_until_stopped()
Distributed scheduler:
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
# Scheduler node - creates jobs from schedules
async def scheduler_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.scheduler
) as scheduler:
await scheduler.add_schedule(task, trigger)
await scheduler.run_until_stopped()
# Worker node - executes jobs only
async def worker_node():
async with AsyncScheduler(
data_store,
event_broker,
role=SchedulerRole.worker
) as scheduler:
await scheduler.run_until_stopped()
Jobs
Simple function jobs:
def send_daily_report():
generate_report()
email_report("admin@example.com")
scheduler.add_schedule(
send_daily_report,
CronTrigger(hour=9, minute=0) # 9 AM daily
)
Jobs with arguments:
def process_data(source: str, destination: str, batch_size: int):
# Data processing logic
pass
scheduler.add_schedule(
process_data,
IntervalTrigger(hours=1),
kwargs={
'source': 's3://incoming',
'destination': 's3://processed',
'batch_size': 1000
}
)
Async jobs:
async def fetch_external_api():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
data = await resp.json()
await save_to_database(data)
scheduler.add_schedule(
fetch_external_api,
IntervalTrigger(minutes=5)
)
Triggers
Interval trigger:
from apscheduler.triggers.interval import IntervalTrigger
# Every 30 seconds
IntervalTrigger(seconds=30)
# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)
# Every 3 days
IntervalTrigger(days=3)
Cron trigger:
from apscheduler.triggers.cron import CronTrigger
# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')
# Every 15 minutes
CronTrigger(minute='*/15')
# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)
# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5') # 9 AM weekdays
Date trigger (one-time):
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger
# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)
# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
Calendar interval:
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
# First day of every month at 9 AM
CalendarIntervalTrigger(months=1, hour=9, minute=0)
# Every Monday at 10 AM
CalendarIntervalTrigger(weeks=1, day_of_week='mon', hour=10, minute=0)
Persistence
SQLite:
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
PostgreSQL:
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
Redis (event broker):
from apscheduler.eventbrokers.redis import RedisEventBroker
event_broker = RedisEventBroker.from_url("redis://localhost:6379")
Job Management
Get job results:
async def main():
async with AsyncScheduler() as scheduler:
await scheduler.start_in_background()
# Add job with result retention
job_id = await scheduler.add_job(
calculate_result,
args=(10, 20),
result_expiration_time=timedelta(hours=1)
)
# Wait for result
result = await scheduler.get_job_result(job_id, wait=True)
print(f"Result: {result.return_value}")
Schedule management:
# Pause schedule
await scheduler.pause_schedule("my_schedule")
# Resume schedule
await scheduler.unpause_schedule("my_schedule")
# Remove schedule
await scheduler.remove_schedule("my_schedule")
# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")
Event handling:
from apscheduler import JobAdded, JobReleased
def on_job_completed(event: JobReleased):
if event.outcome == Outcome.success:
print(f"Job {event.job_id} completed successfully")
else:
print(f"Job {event.job_id} failed: {event.exception}")
scheduler.subscribe(on_job_completed, JobReleased)
Configuration
Task defaults:
from apscheduler import TaskDefaults
task_defaults = TaskDefaults(
job_executor='threadpool',
max_running_jobs=3,
misfire_grace_time=timedelta(minutes=5)
)
scheduler = AsyncScheduler(task_defaults=task_defaults)
Job execution options:
# Configure task behavior
await scheduler.configure_task(
my_function,
job_executor='processpool',
max_running_jobs=5,
misfire_grace_time=timedelta(minutes=10)
)
# Override per schedule
await scheduler.add_schedule(
my_function,
trigger,
job_executor='threadpool', # Override default
coalesce=CoalescePolicy.latest
)
Requirements
# Core package
uv add apscheduler
# Database backends
uv add "apscheduler[postgresql]" # PostgreSQL
uv add "apscheduler[mongodb]" # MongoDB
uv add "apscheduler[sqlite]" # SQLite
# Event brokers
uv add "apscheduler[redis]" # Redis
uv add "apscheduler[mqtt]" # MQTT
Dependencies by use case:
- Basic scheduling:
apscheduler - PostgreSQL persistence:
asyncpg,sqlalchemy - Redis distributed:
redis - MongoDB:
motor - SQLite:
aiosqlite
Best Practices
- Use persistent storage for production to survive restarts
- Set appropriate misfire_grace_time to handle system delays
- Use conflict policies when updating schedules
- Subscribe to events for monitoring and debugging
- Choose appropriate executors (threadpool for I/O, processpool for CPU)
- Implement error handling in job functions
- Use unique schedule IDs for management operations
- Set result expiration to prevent memory leaks