jpskill.com
🛠️ 開発・MCP コミュニティ

streaming-api-patterns

Implement real-time data streaming with Server-Sent Events (SSE), WebSockets, and ReadableStream APIs. Master backpressure handling, reconnection strategies, and LLM streaming for 2025+ real-time applications.

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o streaming-api-patterns.zip https://jpskill.com/download/17255.zip && unzip -o streaming-api-patterns.zip && rm streaming-api-patterns.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17255.zip -OutFile "$d\streaming-api-patterns.zip"; Expand-Archive "$d\streaming-api-patterns.zip" -DestinationPath $d -Force; ri "$d\streaming-api-patterns.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して streaming-api-patterns.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → streaming-api-patterns フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Streaming API パターン

概要

現代のアプリケーションは、リアルタイムなデータ配信を必要とします。このスキルでは、サーバーからクライアントへのストリーミングのための Server-Sent Events (SSE)、双方向通信のための WebSockets、そしてバックプレッシャーの処理と効率的なデータフローのための Streams API を扱います。

このスキルを使用するべき場合:

  • LLM レスポンスのストリーミング (ChatGPT スタイルのインターフェース)
  • リアルタイムな通知とアップデート
  • ライブデータフィード (株価、分析)
  • チャットアプリケーション
  • 長時間実行タスクの進捗アップデート
  • 共同編集機能

コアテクノロジー

1. Server-Sent Events (SSE)

最適な用途: サーバーからクライアントへのストリーミング (LLM レスポンス、通知)

// Next.js Route Handler
export async function GET(req: Request) {
  const encoder = new TextEncoder()

  const stream = new ReadableStream({
    async start(controller) {
      // Send data
      controller.enqueue(encoder.encode('data: Hello\n\n'))

      // Keep connection alive
      const interval = setInterval(() => {
        controller.enqueue(encoder.encode(': keepalive\n\n'))
      }, 30000)

      // Cleanup
      req.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  })
}

// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
  console.log(event.data)
}

2. WebSockets

最適な用途: 双方向リアルタイム通信 (チャット、コラボレーション)

// WebSocket Server (Next.js with ws)
import { WebSocketServer } from 'ws'

const wss = new WebSocketServer({ port: 8080 })

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    // Broadcast to all clients
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data)
      }
    })
  })
})

// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))

3. ReadableStream API

最適な用途: バックプレッシャーを伴う大規模なデータストリームの処理

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield `data-${i}`
  }
}

const stream = new ReadableStream({
  async start(controller) {
    for await (const chunk of generateData()) {
      controller.enqueue(new TextEncoder().encode(chunk + '\n'))
    }
    controller.close()
  }
})

LLM ストリーミングパターン

// Server
import OpenAI from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content
          if (content) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`))
          }
        }
        controller.enqueue(encoder.encode('data: [DONE]\n\n'))
        controller.close()
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  )
}

// Client
async function streamChat(messages) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  })

  const reader = response.body.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    const chunk = decoder.decode(value)
    const lines = chunk.split('\n')

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return

        const json = JSON.parse(data)
        console.log(json.content) // Stream token
      }
    }
  }
}

再接続戦略

class ReconnectingEventSource {
  private eventSource: EventSource | null = null
  private reconnectDelay = 1000
  private maxReconnectDelay = 30000

  constructor(private url: string, private onMessage: (data: string) => void) {
    this.connect()
  }

  private connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onmessage = (event) => {
      this.reconnectDelay = 1000 // Reset on success
      this.onMessage(event.data)
    }

    this.eventSource.onerror = () => {
      this.eventSource?.close()

      // Exponential backoff
      setTimeout(() => this.connect(), this.reconnectDelay)
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
    }
  }

  close() {
    this.eventSource?.close()
  }
}

ベストプラクティス

SSE

  • ✅ 一方向のサーバーからクライアントへのストリーミングに使用します
  • ✅ 自動再接続を実装します
  • ✅ 30秒ごとにキープアライブメッセージを送信します
  • ✅ ブラウザの接続制限 (ドメインごとに6つ) を処理します
  • ✅ より良いパフォーマンスのために HTTP/2 を使用します

WebSockets

  • ✅ 双方向のリアルタイム通信に使用します
  • ✅ ハートビート/ping-pong を実装します
  • ✅ 指数バックオフで再接続を処理します
  • ✅ メッセージを検証およびサニタイズします
  • ✅ オフライン期間のメッセージキューイングを実装します

バックプレッシャー

  • ✅ 適切なフロー制御で ReadableStream を使用します
  • ✅ バッファサイズを監視します
  • ✅ コンシューマーが遅い場合は、プロダクションを一時停止します
  • ✅ 低速なコンシューマーに対するタイムアウトを実装します

パフォーマンス

  • ✅ データを圧縮します (gzip/brotli)
  • ✅ 小さなメッセージをバッチ処理します
  • ✅ 大量のデータにはバイナリ形式 (MessagePack, Protobuf) を使用します
  • ✅ クライアント側のバッファリングを実装します
  • ✅ 接続数とリソース使用量を監視します

リソース

  • [Server-Sent Events
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Streaming API Patterns

Overview

Modern applications require real-time data delivery. This skill covers Server-Sent Events (SSE) for server-to-client streaming, WebSockets for bidirectional communication, and the Streams API for handling backpressure and efficient data flow.

When to use this skill:

  • Streaming LLM responses (ChatGPT-style interfaces)
  • Real-time notifications and updates
  • Live data feeds (stock prices, analytics)
  • Chat applications
  • Progress updates for long-running tasks
  • Collaborative editing features

Core Technologies

1. Server-Sent Events (SSE)

Best for: Server-to-client streaming (LLM responses, notifications)

// Next.js Route Handler
export async function GET(req: Request) {
  const encoder = new TextEncoder()

  const stream = new ReadableStream({
    async start(controller) {
      // Send data
      controller.enqueue(encoder.encode('data: Hello\n\n'))

      // Keep connection alive
      const interval = setInterval(() => {
        controller.enqueue(encoder.encode(': keepalive\n\n'))
      }, 30000)

      // Cleanup
      req.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  })
}

// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
  console.log(event.data)
}

2. WebSockets

Best for: Bidirectional real-time communication (chat, collaboration)

// WebSocket Server (Next.js with ws)
import { WebSocketServer } from 'ws'

const wss = new WebSocketServer({ port: 8080 })

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    // Broadcast to all clients
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data)
      }
    })
  })
})

// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))

3. ReadableStream API

Best for: Processing large data streams with backpressure

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield `data-${i}`
  }
}

const stream = new ReadableStream({
  async start(controller) {
    for await (const chunk of generateData()) {
      controller.enqueue(new TextEncoder().encode(chunk + '\n'))
    }
    controller.close()
  }
})

LLM Streaming Pattern

// Server
import OpenAI from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content
          if (content) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`))
          }
        }
        controller.enqueue(encoder.encode('data: [DONE]\n\n'))
        controller.close()
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  )
}

// Client
async function streamChat(messages) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  })

  const reader = response.body.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    const chunk = decoder.decode(value)
    const lines = chunk.split('\n')

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return

        const json = JSON.parse(data)
        console.log(json.content) // Stream token
      }
    }
  }
}

Reconnection Strategy

class ReconnectingEventSource {
  private eventSource: EventSource | null = null
  private reconnectDelay = 1000
  private maxReconnectDelay = 30000

  constructor(private url: string, private onMessage: (data: string) => void) {
    this.connect()
  }

  private connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onmessage = (event) => {
      this.reconnectDelay = 1000 // Reset on success
      this.onMessage(event.data)
    }

    this.eventSource.onerror = () => {
      this.eventSource?.close()

      // Exponential backoff
      setTimeout(() => this.connect(), this.reconnectDelay)
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
    }
  }

  close() {
    this.eventSource?.close()
  }
}

Best Practices

SSE

  • ✅ Use for one-way server-to-client streaming
  • ✅ Implement automatic reconnection
  • ✅ Send keepalive messages every 30s
  • ✅ Handle browser connection limits (6 per domain)
  • ✅ Use HTTP/2 for better performance

WebSockets

  • ✅ Use for bidirectional real-time communication
  • ✅ Implement heartbeat/ping-pong
  • ✅ Handle reconnection with exponential backoff
  • ✅ Validate and sanitize messages
  • ✅ Implement message queuing for offline periods

Backpressure

  • ✅ Use ReadableStream with proper flow control
  • ✅ Monitor buffer sizes
  • ✅ Pause production when consumer is slow
  • ✅ Implement timeouts for slow consumers

Performance

  • ✅ Compress data (gzip/brotli)
  • ✅ Batch small messages
  • ✅ Use binary formats (MessagePack, Protobuf) for large data
  • ✅ Implement client-side buffering
  • ✅ Monitor connection count and resource usage

Resources