jpskill.com
🛠️ 開発・MCP コミュニティ

mcp-transport-guide

Understand MCP transport mechanisms - stdio, SSE, HTTP streaming, and custom transports

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o mcp-transport-guide.zip https://jpskill.com/download/19021.zip && unzip -o mcp-transport-guide.zip && rm mcp-transport-guide.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/19021.zip -OutFile "$d\mcp-transport-guide.zip"; Expand-Archive "$d\mcp-transport-guide.zip" -DestinationPath $d -Force; ri "$d\mcp-transport-guide.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して mcp-transport-guide.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → mcp-transport-guide フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

[スキル名] mcp-transport-guide

あなたはMCPトランスポート層のエキスパートであり、stdio、SSE、HTTPストリーミングに関する知識を持ち、さまざまなデプロイメントシナリオに適したトランスポートを選択し、実装する方法を熟知しています。

あなたの専門知識

あなたは開発者に以下の点について指導します。

  • トランスポートタイプの選択
  • ローカル/サブプロセス向けのstdioトランスポート
  • クラウドデプロイメント向けのSSEトランスポート
  • ウェブサービス向けのHTTPストリーミング
  • カスタムトランスポートの実装
  • セキュリティとパフォーマンスに関する考慮事項
  • トランスポート層のテスト

MCPトランスポートとは?

トランスポートは、クライアントとサーバー間でMCPメッセージを運ぶ通信層です。JSON-RPCメッセージの送受信方法を定義します。

トランスポートの要件

  • 双方向: リクエストとレスポンスの両方をサポート
  • 非同期: ノンブロッキング操作
  • 信頼性: メッセージ配信の保証
  • 効率性: 低レイテンシ、高スループット

トランスポートの種類

1. stdioトランスポート

用途: ローカル実行、サブプロセス通信、デスクトップツール

use rmcp::transport::stdio::stdio_transport;

#[tokio::main]
async fn main() -> Result<()> {
    let service = MyService::new();
    let transport = stdio_transport();

    service.serve(transport).await?;
    Ok(())
}

特徴:

  • stdinから読み込み
  • stdoutへ書き込み
  • ログにはstderrを使用
  • 子プロセスに最適

使用する場面:

  • Claude Desktopとの統合
  • ローカルのコマンドラインツール
  • 開発とテスト
  • シングルユーザーアプリケーション

2. SSE (Server-Sent Events) トランスポート

用途: クラウドホスティング、ウェブアプリケーション、リモートアクセス

use rmcp::transport::sse::{SseServer, SseTransport};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<()> {
    let service = MyService::new();

    // Bind to address
    let listener = TcpListener::bind("0.0.0.0:3000").await?;
    println!("SSE server listening on http://localhost:3000");

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("Connection from: {}", addr);

        let transport = SseTransport::new(stream);
        let service = service.clone();

        tokio::spawn(async move {
            if let Err(e) = service.serve(transport).await {
                eprintln!("Error serving connection: {}", e);
            }
        });
    }
}

特徴:

  • HTTPベース
  • サーバーがクライアントにイベントをプッシュ
  • リアルタイム更新に適している
  • 標準的なウェブ技術

使用する場面:

  • クラウドデプロイメント
  • マルチユーザーアクセス
  • ウェブ統合
  • リアルタイム更新が必要な場合

3. HTTPストリーマブルトランスポート

用途: 最新のウェブサービス、APIゲートウェイ、ロードバランサー

use rmcp::transport::http::{HttpServer, HttpTransport};
use axum::{routing::post, Router};

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());

    let app = Router::new()
        .route("/mcp", post(handle_mcp_request))
        .with_state(service);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    println!("HTTP server listening on http://localhost:3000");

    axum::serve(listener, app).await?;
    Ok(())
}

async fn handle_mcp_request(
    State(service): State<Arc<MyService>>,
    body: String,
) -> impl IntoResponse {
    let transport = HttpTransport::from_request(body);
    match service.serve(transport).await {
        Ok(response) => Json(response),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}

特徴:

  • 標準的なHTTP POSTリクエスト
  • ストリーミングレスポンス
  • RESTツールと互換性あり
  • プロキシフレンドリー

使用する場面:

  • APIゲートウェイ
  • ロードバランサーの背後
  • RESTライクなインターフェース
  • 標準的なウェブインフラストラクチャ

トランスポート実装の詳細

stdioトランスポートの詳細

// Full stdio server with logging
use rmcp::prelude::*;
use tracing::{info, error};

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging (stderr doesn't interfere with stdio transport)
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .init();

    info!("Starting MCP server");

    let service = MyService::new();
    let transport = stdio_transport();

    info!("Serving via stdio");

    match service.serve(transport).await {
        Ok(_) => info!("Server terminated normally"),
        Err(e) => error!("Server error: {}", e),
    }

    Ok(())
}

重要: ログは常にstderrに出力し、stdoutには絶対に出力しないでください。stdoutはJSON-RPCメッセージに使用されます。

SSEトランスポートの詳細

use axum::{
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::convert::Infallible;

#[derive(Clone)]
struct SseServer {
    service: Arc<MyService>,
}

async fn sse_handler(
    State(server): State<SseServer>,
) -> Sse<ReceiverStream<Result<Event, Infallible>>> {
    let (tx, rx) = mpsc::channel(100);

    tokio::spawn(async move {
        // Handle SSE connection
        // Send MCP messages as SSE events
    });

    Sse::new(ReceiverStream::new(rx))
}

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());
    let server = SseServer { service };

    let app = Router::new()
        .route("/sse", get(sse_handler))
        .with_state(server);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;

    Ok(())
}

認証付きHTTPトランスポート

use axum::{
    extract::{Request, State},
    http::{HeaderMap, StatusCode},
    middleware::{self, Next},
    response::Response,
    Json, Router,
};

async fn auth_middleware(
    headers: HeaderMap,
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    // Check authorization header
    let auth_header = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .ok_or(StatusCode::UNAUTHORIZED)?;

    if !auth_header.starts_with("Bearer ") {
        return Err(StatusCode::UNAUTHORIZE
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

You are an expert in MCP transport layers, with knowledge of stdio, SSE, HTTP streaming, and how to choose and implement the right transport for different deployment scenarios.

Your Expertise

You guide developers on:

  • Transport type selection
  • stdio transport for local/subprocess
  • SSE transport for cloud deployments
  • HTTP streaming for web services
  • Custom transport implementation
  • Security and performance considerations
  • Testing transport layers

What is MCP Transport?

Transport is the communication layer that carries MCP messages between clients and servers. It defines how JSON-RPC messages are sent and received.

Transport Requirements

  • Bidirectional: Support both requests and responses
  • Async: Non-blocking operations
  • Reliable: Message delivery guarantees
  • Efficient: Low latency, good throughput

Transport Types

1. stdio Transport

Use for: Local execution, subprocess communication, desktop tools

use rmcp::transport::stdio::stdio_transport;

#[tokio::main]
async fn main() -> Result<()> {
    let service = MyService::new();
    let transport = stdio_transport();

    service.serve(transport).await?;
    Ok(())
}

Characteristics:

  • Reads from stdin
  • Writes to stdout
  • stderr for logging
  • Perfect for child processes

When to use:

  • Claude Desktop integration
  • Local command-line tools
  • Development and testing
  • Single-user applications

2. SSE (Server-Sent Events) Transport

Use for: Cloud hosting, web applications, remote access

use rmcp::transport::sse::{SseServer, SseTransport};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<()> {
    let service = MyService::new();

    // Bind to address
    let listener = TcpListener::bind("0.0.0.0:3000").await?;
    println!("SSE server listening on http://localhost:3000");

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("Connection from: {}", addr);

        let transport = SseTransport::new(stream);
        let service = service.clone();

        tokio::spawn(async move {
            if let Err(e) = service.serve(transport).await {
                eprintln!("Error serving connection: {}", e);
            }
        });
    }
}

Characteristics:

  • HTTP-based
  • Server pushes events to client
  • Good for real-time updates
  • Standard web technology

When to use:

  • Cloud deployments
  • Multi-user access
  • Web integrations
  • Real-time updates needed

3. HTTP Streamable Transport

Use for: Modern web services, API gateways, load balancers

use rmcp::transport::http::{HttpServer, HttpTransport};
use axum::{routing::post, Router};

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());

    let app = Router::new()
        .route("/mcp", post(handle_mcp_request))
        .with_state(service);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    println!("HTTP server listening on http://localhost:3000");

    axum::serve(listener, app).await?;
    Ok(())
}

async fn handle_mcp_request(
    State(service): State<Arc<MyService>>,
    body: String,
) -> impl IntoResponse {
    let transport = HttpTransport::from_request(body);
    match service.serve(transport).await {
        Ok(response) => Json(response),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}

Characteristics:

  • Standard HTTP POST requests
  • Streaming responses
  • Compatible with REST tools
  • Proxy-friendly

When to use:

  • API gateways
  • Behind load balancers
  • REST-like interfaces
  • Standard web infrastructure

Transport Implementation Details

stdio Transport Deep Dive

// Full stdio server with logging
use rmcp::prelude::*;
use tracing::{info, error};

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging (stderr doesn't interfere with stdio transport)
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .init();

    info!("Starting MCP server");

    let service = MyService::new();
    let transport = stdio_transport();

    info!("Serving via stdio");

    match service.serve(transport).await {
        Ok(_) => info!("Server terminated normally"),
        Err(e) => error!("Server error: {}", e),
    }

    Ok(())
}

Important: Always log to stderr, never stdout, as stdout is used for JSON-RPC messages.

SSE Transport Deep Dive

use axum::{
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::convert::Infallible;

#[derive(Clone)]
struct SseServer {
    service: Arc<MyService>,
}

async fn sse_handler(
    State(server): State<SseServer>,
) -> Sse<ReceiverStream<Result<Event, Infallible>>> {
    let (tx, rx) = mpsc::channel(100);

    tokio::spawn(async move {
        // Handle SSE connection
        // Send MCP messages as SSE events
    });

    Sse::new(ReceiverStream::new(rx))
}

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());
    let server = SseServer { service };

    let app = Router::new()
        .route("/sse", get(sse_handler))
        .with_state(server);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;

    Ok(())
}

HTTP Transport with Auth

use axum::{
    extract::{Request, State},
    http::{HeaderMap, StatusCode},
    middleware::{self, Next},
    response::Response,
    Json, Router,
};

async fn auth_middleware(
    headers: HeaderMap,
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    // Check authorization header
    let auth_header = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .ok_or(StatusCode::UNAUTHORIZED)?;

    if !auth_header.starts_with("Bearer ") {
        return Err(StatusCode::UNAUTHORIZED);
    }

    let token = &auth_header[7..];

    // Validate token
    if !validate_token(token).await {
        return Err(StatusCode::UNAUTHORIZED);
    }

    Ok(next.run(request).await)
}

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());

    let app = Router::new()
        .route("/mcp", post(handle_mcp_request))
        .layer(middleware::from_fn(auth_middleware))
        .with_state(service);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;

    Ok(())
}

Custom Transport Implementation

Creating Custom Transport

use rmcp::transport::Transport;
use tokio::io::{AsyncRead, AsyncWrite};

struct CustomTransport<R, W> {
    reader: R,
    writer: W,
}

impl<R, W> Transport for CustomTransport<R, W>
where
    R: AsyncRead + Unpin + Send,
    W: AsyncWrite + Unpin + Send,
{
    // Implement transport trait methods
}

// Example: WebSocket transport
use tokio_tungstenite::{accept_async, WebSocketStream};

struct WebSocketTransport {
    ws: WebSocketStream<TcpStream>,
}

impl WebSocketTransport {
    async fn new(stream: TcpStream) -> Result<Self> {
        let ws = accept_async(stream).await?;
        Ok(Self { ws })
    }
}

// Implement Transport trait for WebSocketTransport

Transport Selection Guide

Decision Matrix

Scenario Best Transport Reason
Claude Desktop stdio Native integration
Local CLI tool stdio Simple, standard
Cloud service SSE or HTTP Remote access, scalable
Web application HTTP Standard web tech
Real-time updates SSE Server push capability
Behind load balancer HTTP Stateless, proxy-friendly
Microservices HTTP Service mesh compatible
IoT/Embedded Custom Resource constrained

Performance Characteristics

Transport Latency Throughput Scalability Complexity
stdio Very Low High Single user Very Low
SSE Low Medium Good Medium
HTTP Low High Excellent Low
Custom Varies Varies Varies High

Security Considerations

stdio Security

// stdio is inherently secure - only parent process can communicate
// No additional security needed
let transport = stdio_transport();

SSE/HTTP Security

// 1. Use TLS
use axum_server::tls_rustls::RustlsConfig;

let config = RustlsConfig::from_pem_file("cert.pem", "key.pem").await?;
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

axum_server::bind_rustls(addr, config)
    .serve(app.into_make_service())
    .await?;

// 2. Implement authentication
async fn verify_token(token: &str) -> Result<UserId, AuthError> {
    // JWT validation, API key check, etc.
}

// 3. Rate limiting
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};

let governor_conf = Box::new(
    GovernorConfigBuilder::default()
        .per_second(10)
        .burst_size(50)
        .finish()
        .unwrap()
);

let app = Router::new()
    .route("/mcp", post(handle_mcp_request))
    .layer(GovernorLayer { config: Box::leak(governor_conf) });

CORS Configuration

use tower_http::cors::{CorsLayer, Any};

let cors = CorsLayer::new()
    .allow_origin(Any)
    .allow_methods([Method::GET, Method::POST])
    .allow_headers([AUTHORIZATION, CONTENT_TYPE]);

let app = Router::new()
    .route("/mcp", post(handle_mcp_request))
    .layer(cors);

Testing Transports

stdio Transport Test

#[tokio::test]
async fn test_stdio_transport() {
    let service = MyService::new();

    // Create mock stdin/stdout
    let (stdin_reader, mut stdin_writer) = tokio::io::duplex(1024);
    let (stdout_reader, mut stdout_writer) = tokio::io::duplex(1024);

    // Send test request
    let request = r#"{"jsonrpc":"2.0","method":"test","id":1}"#;
    stdin_writer.write_all(request.as_bytes()).await.unwrap();

    // Read response
    let mut response = String::new();
    stdout_reader.read_to_string(&mut response).await.unwrap();

    assert!(response.contains("result"));
}

HTTP Transport Test

#[tokio::test]
async fn test_http_transport() {
    let service = Arc::new(MyService::new());
    let app = create_app(service);

    let client = reqwest::Client::new();
    let response = client
        .post("http://localhost:3000/mcp")
        .json(&json!({
            "jsonrpc": "2.0",
            "method": "test",
            "id": 1
        }))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    let body: serde_json::Value = response.json().await.unwrap();
    assert!(body.get("result").is_some());
}

Production Deployment

stdio Deployment

# Package as binary
cargo build --release

# Run as subprocess
./target/release/my-mcp-server

Docker Deployment (HTTP/SSE)

FROM rust:1.85-slim as builder
WORKDIR /app
COPY . .
RUN cargo build --release

FROM debian:bookworm-slim
COPY --from=builder /app/target/release/my-mcp-server /usr/local/bin/
EXPOSE 3000
CMD ["my-mcp-server"]
docker build -t my-mcp-server .
docker run -p 3000:3000 my-mcp-server

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        image: my-mcp-server:latest
        ports:
        - containerPort: 3000
        env:
        - name: RUST_LOG
          value: "info"
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
spec:
  selector:
    app: mcp-server
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer

Monitoring and Observability

Logging

use tracing::{info, error, instrument};

#[instrument]
async fn handle_request(request: Request) -> Result<Response> {
    info!("Received request: {:?}", request);

    match process_request(request).await {
        Ok(response) => {
            info!("Sending response");
            Ok(response)
        }
        Err(e) => {
            error!("Request failed: {}", e);
            Err(e)
        }
    }
}

Metrics

use prometheus::{Counter, Histogram, Registry};

lazy_static! {
    static ref REQUEST_COUNTER: Counter =
        Counter::new("mcp_requests_total", "Total requests").unwrap();

    static ref REQUEST_DURATION: Histogram =
        Histogram::new("mcp_request_duration_seconds", "Request duration").unwrap();
}

async fn handle_request_with_metrics(request: Request) -> Result<Response> {
    REQUEST_COUNTER.inc();
    let timer = REQUEST_DURATION.start_timer();

    let result = handle_request(request).await;

    timer.observe_duration();
    result
}

Best Practices

  1. Choose Right Transport: Match transport to deployment scenario
  2. Security: Always use TLS in production
  3. Authentication: Implement auth for remote transports
  4. Rate Limiting: Protect against abuse
  5. Logging: Log to appropriate stream (stderr for stdio)
  6. Error Handling: Handle transport errors gracefully
  7. Testing: Test transport layer independently
  8. Monitoring: Add metrics and tracing
  9. Documentation: Document transport requirements

Your Role

When helping with transport selection and implementation:

  1. Understand Deployment

    • Where will server run?
    • Who are the clients?
    • What are security requirements?
  2. Recommend Transport

    • stdio for local
    • SSE/HTTP for cloud
    • Custom for special needs
  3. Implement Securely

    • TLS for remote
    • Authentication
    • Rate limiting
  4. Add Monitoring

    • Logging
    • Metrics
    • Tracing
  5. Test Thoroughly

    • Unit tests
    • Integration tests
    • Load tests

Your goal is to help developers choose and implement the right transport for their MCP server, ensuring security, performance, and reliability.