jpskill.com
🛠️ 開発・MCP コミュニティ

execution-engine-analysis

Analyze control flow, concurrency models, and event architectures in agent frameworks. Use when (1) understanding async vs sync execution patterns, (2) classifying execution topology (DAG/FSM/Linear), (3) mapping event emission and observability hooks, (4) evaluating scalability characteristics, or (5) comparing execution models across frameworks.

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o execution-engine-analysis.zip https://jpskill.com/download/18856.zip && unzip -o execution-engine-analysis.zip && rm execution-engine-analysis.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/18856.zip -OutFile "$d\execution-engine-analysis.zip"; Expand-Archive "$d\execution-engine-analysis.zip" -DestinationPath $d -Force; ri "$d\execution-engine-analysis.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して execution-engine-analysis.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → execution-engine-analysis フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

[スキル名] 実行エンジン分析

制御フローの基盤と並行処理モデルを分析します。

プロセス

  1. 非同期モデルの特定 — ネイティブ非同期、ラッパー付き同期、またはハイブリッド
  2. トポロジーの分類 — DAG、FSM、または線形チェーン
  3. イベントのカタログ化 — コールバック、リスナー、ジェネレーター
  4. 可観測性のマッピング — 前/後フック、インターセプトポイント

並行処理モデルの分類

ネイティブ非同期

# Signature: async/await throughout
async def run(self):
    result = await self.llm.agenerate(messages)
    return await self.process(result)

# Entry point uses asyncio
asyncio.run(agent.run())

指標: async defawaitasyncio.gatheraiohttp

ラッパー付き同期

# Signature: sync API wrapping async internals
def run(self):
    return asyncio.run(self._async_run())

# Or using thread pools
def run(self):
    with ThreadPoolExecutor() as pool:
        future = pool.submit(self._blocking_call)
        return future.result()

指標: 同期メソッド内の asyncio.run()ThreadPoolExecutorrun_in_executor

ハイブリッド

# Both sync and async APIs exposed
def invoke(self, input):
    return self._sync_invoke(input)

async def ainvoke(self, input):
    return await self._async_invoke(input)

指標: ペアになったメソッド (invoke/ainvoke)、sync_to_async デコレーター

実行トポロジー

DAG (有向非巡回グラフ)

# Signature: Nodes with dependencies
class Node:
    def __init__(self, deps: list[Node]): ...

graph.add_edge(node_a, node_b)
result = graph.execute()  # Topological order

指標: GraphNodeEdge クラス、networkx、トポロジカルソート

FSM (有限状態機械)

# Signature: Explicit states and transitions
class State(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    DONE = "done"

def transition(self, current: State, event: str) -> State:
    if current == State.THINKING and event == "action_chosen":
        return State.ACTING

指標: 状態列挙型、遷移テーブル、current_state、状態機械ライブラリ

線形チェーン

# Signature: Sequential step execution
def run(self):
    result = self.step1()
    result = self.step2(result)
    result = self.step3(result)
    return result

# Or pipeline pattern
chain = step1 | step2 | step3
result = chain.invoke(input)

指標: 連続呼び出し、パイプ演算子 (|)、ChainPipeline クラス

イベントアーキテクチャ

コールバック

class Callbacks:
    def on_llm_start(self, prompt): ...
    def on_llm_end(self, response): ...
    def on_tool_start(self, tool, input): ...
    def on_tool_end(self, output): ...
    def on_error(self, error): ...

柔軟性: 低 — 固定されたフックポイント 追跡可能性: 中 — 追跡しやすい

イベントリスナー/エミッター

emitter = EventEmitter()
emitter.on('llm:start', handler)
emitter.on('tool:*', wildcard_handler)
emitter.emit('llm:start', {'prompt': prompt})

柔軟性: 高 — 動的な登録 追跡可能性: 低 — 追跡が難しい

非同期ジェネレーター (ストリーミング)

async def run(self):
    async for chunk in self.llm.astream(prompt):
        yield {"type": "token", "content": chunk}
    yield {"type": "done"}

柔軟性: 中 — ストリーミングネイティブ 追跡可能性: 高 — データフローに従う

可観測性フックインベントリ

フックポイント 目的 インターセプトレベル
Pre-LLM プロンプトの変更 入力
Post-LLM 生の応答へのアクセス 出力
Pre-Tool ツール入力の検証 入力
Post-Tool ツール出力の変換 出力
Pre-Step 状態の監視 読み取り専用
Post-Step 次のステップの変更 制御フロー
On-Error 処理/変換 エラー

回答すべき質問

  • 実行前にツール入力をインターセプトできますか?
  • 生の LLM 応答にアクセスできますか(トークン数を含む)?
  • フックから制御フローを変更できますか?
  • フックは同期ですか、非同期ですか?

出力テンプレート

## Execution Engine Analysis: [Framework Name]

### Concurrency Model
- **Type**: [Native Async / Sync-with-Wrappers / Hybrid]
- **Entry Point**: `path/to/main.py:run()`
- **Thread Safety**: [Yes/No/Partial]

### Execution Topology
- **Model**: [DAG / FSM / Linear Chain]
- **Implementation**: [Description with code refs]
- **Parallelization**: [Supported/Not Supported]

### Event Architecture
- **Pattern**: [Callbacks / Listeners / Generators]
- **Registration**: [Static / Dynamic]
- **Streaming**: [Supported / Not Supported]

### Observability Inventory

| Hook | Location | Async | Modifiable |
|------|----------|-------|------------|
| on_llm_start | callbacks.py:L23 | Yes | Input only |
| on_tool_end | callbacks.py:L45 | Yes | Output |
| ... | ... | ... | ... |

### Scalability Assessment
- **Blocking Operations**: [List any]
- **Resource Limits**: [Token counters, rate limits]
- **Recommended Concurrency**: [Threads/Processes/AsyncIO]

統合

  • 前提条件: 実行ファイルを特定するための codebase-mapping
  • 入力元: 非同期の決定のための comparative-matrix
  • 関連: エージェント固有のフローのための control-loop-extraction
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Execution Engine Analysis

Analyzes the control flow substrate and concurrency model.

Process

  1. Identify async model — Native async, sync-with-wrappers, or hybrid
  2. Classify topology — DAG, FSM, or linear chain
  3. Catalog events — Callbacks, listeners, generators
  4. Map observability — Pre/post hooks, interception points

Concurrency Model Classification

Native Async

# Signature: async/await throughout
async def run(self):
    result = await self.llm.agenerate(messages)
    return await self.process(result)

# Entry point uses asyncio
asyncio.run(agent.run())

Indicators: async def, await, asyncio.gather, aiohttp

Sync with Wrappers

# Signature: sync API wrapping async internals
def run(self):
    return asyncio.run(self._async_run())

# Or using thread pools
def run(self):
    with ThreadPoolExecutor() as pool:
        future = pool.submit(self._blocking_call)
        return future.result()

Indicators: asyncio.run() inside sync methods, ThreadPoolExecutor, run_in_executor

Hybrid

# Both sync and async APIs exposed
def invoke(self, input):
    return self._sync_invoke(input)

async def ainvoke(self, input):
    return await self._async_invoke(input)

Indicators: Paired methods (invoke/ainvoke), sync_to_async decorators

Execution Topology

DAG (Directed Acyclic Graph)

# Signature: Nodes with dependencies
class Node:
    def __init__(self, deps: list[Node]): ...

graph.add_edge(node_a, node_b)
result = graph.execute()  # Topological order

Indicators: Graph, Node, Edge classes, networkx, topological sort

FSM (Finite State Machine)

# Signature: Explicit states and transitions
class State(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    DONE = "done"

def transition(self, current: State, event: str) -> State:
    if current == State.THINKING and event == "action_chosen":
        return State.ACTING

Indicators: State enums, transition tables, current_state, state machine libraries

Linear Chain

# Signature: Sequential step execution
def run(self):
    result = self.step1()
    result = self.step2(result)
    result = self.step3(result)
    return result

# Or pipeline pattern
chain = step1 | step2 | step3
result = chain.invoke(input)

Indicators: Sequential calls, pipe operators (|), Chain, Pipeline classes

Event Architecture

Callbacks

class Callbacks:
    def on_llm_start(self, prompt): ...
    def on_llm_end(self, response): ...
    def on_tool_start(self, tool, input): ...
    def on_tool_end(self, output): ...
    def on_error(self, error): ...

Flexibility: Low — fixed hook points Traceability: Medium — easy to follow

Event Listeners/Emitters

emitter = EventEmitter()
emitter.on('llm:start', handler)
emitter.on('tool:*', wildcard_handler)
emitter.emit('llm:start', {'prompt': prompt})

Flexibility: High — dynamic registration Traceability: Low — harder to trace

Async Generators (Streaming)

async def run(self):
    async for chunk in self.llm.astream(prompt):
        yield {"type": "token", "content": chunk}
    yield {"type": "done"}

Flexibility: Medium — streaming-native Traceability: High — follows data flow

Observability Hooks Inventory

Hook Point Purpose Interception Level
Pre-LLM Modify prompt Input
Post-LLM Access raw response Output
Pre-Tool Validate tool input Input
Post-Tool Transform tool output Output
Pre-Step Observe state Read-only
Post-Step Modify next step Control flow
On-Error Handle/transform Error

Questions to Answer

  • Can you intercept tool input before execution?
  • Is the raw LLM response accessible (with token counts)?
  • Can you modify control flow from hooks?
  • Are hooks sync or async?

Output Template

## Execution Engine Analysis: [Framework Name]

### Concurrency Model
- **Type**: [Native Async / Sync-with-Wrappers / Hybrid]
- **Entry Point**: `path/to/main.py:run()`
- **Thread Safety**: [Yes/No/Partial]

### Execution Topology
- **Model**: [DAG / FSM / Linear Chain]
- **Implementation**: [Description with code refs]
- **Parallelization**: [Supported/Not Supported]

### Event Architecture
- **Pattern**: [Callbacks / Listeners / Generators]
- **Registration**: [Static / Dynamic]
- **Streaming**: [Supported / Not Supported]

### Observability Inventory

| Hook | Location | Async | Modifiable |
|------|----------|-------|------------|
| on_llm_start | callbacks.py:L23 | Yes | Input only |
| on_tool_end | callbacks.py:L45 | Yes | Output |
| ... | ... | ... | ... |

### Scalability Assessment
- **Blocking Operations**: [List any]
- **Resource Limits**: [Token counters, rate limits]
- **Recommended Concurrency**: [Threads/Processes/AsyncIO]

Integration

  • Prerequisite: codebase-mapping to identify execution files
  • Feeds into: comparative-matrix for async decisions
  • Related: control-loop-extraction for agent-specific flow