putcho01

Back

TABLE OF CONTENTS

通信プロトコル「SSE」を理解するBlur image

AI要約#

はじめに#

この記事は、SSE について調べ、実際に実装してみた際の情報をまとめています。

SSE とは#

SSE(Server-Sent Events)は W3C 標準 の技術 で、HTTP コネクションを維持したままサーバーがクライアントへ非同期にイベントをプッシュできる。ブラウザは EventSource API でネイティブに対応している。

項目内容
通信方向Server → Client(単方向)
プロトコル基盤HTTP / HTTPS
コンテンツタイプtext/event-stream
ブラウザ APIEventSource
標準化W3C / WHATWG Living Standard(全モダンブラウザ対応)

クライアントからサーバーへの追加送信は、通常の HTTP リクエストを別途行う形になる。

イベントストリームの形式#

SSE のデータはテキストベースで、\n\n(空行)でイベントを区切るシンプルな形式。

id: 42
event: priceUpdate
data: {"symbol":"BTC","price":8500000}
retry: 3000
 
data: シンプルなメッセージ(event フィールドなし)
 
: これはコメント行(クライアントには届かない)
bash

https://developer.mozilla.org/ja/docs/Web/API/Server-sent_events/Using_server-sent_events#%E3%82%A4%E3%83%99%E3%83%B3%E3%83%88%E3%82%B9%E3%83%88%E3%83%AA%E3%83%BC%E3%83%A0%E3%81%AE%E5%BD%A2%E5%BC%8F

フィールド一覧#

フィールド必須説明
id任意イベント ID。再接続時の Last-Event-ID ヘッダーに使用される
event任意イベント種別。省略時は "message" として扱われる
data実質必須ペイロード。複数行記述可(各行に data: を付与)
retry任意再接続間隔(ミリ秒)
: commentコロン始まりはコメント

特徴・仕組み#

メリット#

  • HTTP ベースで既存インフラをそのまま利用可能(プロキシ・CDN・ロードバランサーとの相性良好)
  • ブラウザに EventSource API が組み込み(追加ライブラリ不要)
  • 自動再接続機能を標準装備
  • Last-Event-ID によるイベント再送・耐障害性
  • 実装がシンプルでデバッグしやすい
  • CORS は通常の HTTP と同じ方法で対応可

制限・注意点#

  • サーバー → クライアントの一方向のみ(双方向通信には WebSocket が適切)
  • HTTP/1.1 ではドメインあたり接続数制限(6本)
  • バイナリデータは Base64 等でエンコードが必要
  • サーバー側でコネクションを長期維持するためリソース管理が必要

接続シーケンス#

sequenceDiagram
    participant C as Client (EventSource)
    participant S as Server
 
    C->>S: GET /events<br/>Accept: text/event-stream
    S-->>C: 200 OK<br/>Content-Type: text/event-stream
 
    loop 接続を維持したまま継続
        S-->>C: id: 1\ndata: event1\n\n
        S-->>C: id: 2\ndata: event2\n\n
        S-->>C: : keepalive(コメント)
    end
 
    note over C,S: ネットワーク障害などで切断
 
    C->>S: GET /events<br/>Last-Event-ID: 2(自動再接続)
    S-->>C: 200 OK(id: 2 以降のイベントから再開)
mermaid

HTTP/2 での挙動#

HTTP/2 では単一の TCP コネクションで複数の SSE ストリームを多重化できる。同じ接続で複数の要求を送信可能なため、HTTP/1.1 の接続数制限を実質解消できる。本番環境では HTTP/2 + TLS の構成を推奨。

他プロトコルとの比較#

項目SSEWebSocketLong PollingHTTP/2 Push
通信方向Server → Client双方向Server → ClientServer → Client
プロトコルHTTPws://(別プロトコル)HTTPHTTP/2
自動再接続✅ 標準装備✖️ 自前実装✖️ 自前実装✖️ 自前実装
ブラウザ実装EventSource APIWebSocket APIfetch / XHRPush API(限定的)
オーバーヘッド低(初回 HTTP headers のみ)高(毎回 HTTP headers)
プロキシ通過✅ 容易△ 要設定✅ 容易△ HTTP/2 対応必要
データ形式テキストのみテキスト・バイナリ任意任意
実装難易度低〜中中〜高
最適なユースケース通知・フィード・AI 出力チャット・ゲーム・同時編集旧環境・シンプルな通知リソースプッシュ(CSS/JS)

SSE が優れる場面#

  • サーバー → クライアントへの一方向プッシュで十分
  • 既存の HTTP インフラ(nginx, CDN)を活かしたい
  • 実装コストを最小限にしたい
  • 自動再接続・イベント ID の仕組みが欲しい
  • AI のストリーミングレスポンス

用途#

ユースケース説明
AI ストリーミング出力LLM のレスポンスを生成しながらリアルタイム表示
リアルタイム通知GitHub 通知・Jira コメントなど、サーバー起因イベントのプッシュ配信
ライブダッシュボード株価・IoT センサー値のリアルタイム更新
ライブフィードニュースフィード・SNS タイムライン・スポーツスコアの更新
長時間ジョブの進捗PDF 生成・動画変換・ML トレーニングなどの進捗表示

なぜ AI ストリーミングに SSE が選ばれるのか#

LLM はレスポンスを逐次生成(autoregressive) する。全文生成を待ってから返すと数十秒かかる場合があり、UX が著しく悪化する。SSE を使えば最初のトークンを即座に表示でき、ユーザーは応答が始まったことをすぐに認識できる。

観点詳細
生成の性質LLM はトークンを 1 つずつ順番に生成するため、ストリーミングと相性が良い
通信の方向ユーザーの入力(プロンプト)は 1 回の POST、応答は SSE で返す一方向で十分
インフラ適合既存の HTTP/HTTPS スタックをそのまま利用でき、CDN エッジでも扱いやすい
実装コストWebSocket より構成がシンプルで、CORS も通常の HTTP と同じ方法で対応できる

全体のアーキテクチャ#

sequenceDiagram
    participant U as ユーザー (Browser)
    participant API as API Server
    participant LLM as LLM Engine
 
    U->>API: POST /chat<br/>{"message": "こんにちは"}
    API->>LLM: プロンプト送信(推論開始)
 
    API-->>U: 200 OK<br/>Content-Type: text/event-stream
 
    loop トークンを逐次ストリーミング
        LLM-->>API: token: "こん"
        API-->>U: data: {"token":"こん","done":false}
 
        LLM-->>API: token: "にちは"
        API-->>U: data: {"token":"にちは","done":false}
 
        LLM-->>API: token: "!"
        API-->>U: data: {"token":"!","done":false}
    end
 
    LLM-->>API: 生成完了
    API-->>U: data: {"token":"","done":true}
mermaid

Anthropic API(Claude)の実際のストリームイベント#

Claude API は SSE で以下のようなイベントを順番に送信する

event: message_start
data: {"type":"message_start","message":{"id":"msg_01...","role":"assistant",...}}
 
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
 
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"こん"}}
 
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"にちは"}}
 
event: content_block_stop
data: {"type":"content_block_stop","index":0}
 
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":5}}
 
event: message_stop
data: {"type":"message_stop"}
bash

Go で Claude API のストリームを中継する実装例#

package main
 
import (
    "bufio"
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)
 
// ClaudeStreamHandler はクライアントへ Claude のストリームをそのまま中継する
func ClaudeStreamHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }
 
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")
 
    // リクエストボディからユーザーメッセージを取得
    var req struct {
        Message string `json:"message"`
    }
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request", http.StatusBadRequest)
        return
    }
 
    // Anthropic API へストリーミングリクエスト
    payload := map[string]any{
        "model":      "claude-sonnet-4.6",
        "max_tokens": 1024,
        "stream":     true,
        "messages": []map[string]string{
            {"role": "user", "content": req.Message},
        },
    }
    body, _ := json.Marshal(payload)
 
    apiReq, _ := http.NewRequestWithContext(r.Context(), "POST",
        "https://api.anthropic.com/v1/messages", bytes.NewReader(body))
    apiReq.Header.Set("x-api-key", "YOUR_API_KEY")
    apiReq.Header.Set("anthropic-version", "2023-06-01")
    apiReq.Header.Set("content-type", "application/json")
 
    resp, err := http.DefaultClient.Do(apiReq)
    if err != nil {
        fmt.Fprintf(w, "event: error\ndata: {\"message\":\"%v\"}\n\n", err)
        flusher.Flush()
        return
    }
    defer resp.Body.Close()
 
    // SSE をそのままクライアントへ中継
    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        line := scanner.Text()
 
        // content_block_delta からテキストだけ抽出して転送する場合
        if strings.HasPrefix(line, "data: ") {
            fmt.Fprintf(w, "%s\n\n", line)
            flusher.Flush()
        }
 
        // クライアント切断を検知して終了
        select {
        case <-r.Context().Done():
            return
        default:
        }
    }
}
go

ストリーミング受信の状態管理#

stateDiagram-v2
    [*] --> Idle
    Idle --> Connecting : POST /chat 送信
    Connecting --> Streaming : 200 OK 受信
    Streaming --> Streaming : content_block_delta 受信 トークンを UI に追加
    Streaming --> Done : message_stop 受信
    Streaming --> Error : ネットワーク切断 / エラー
    Error --> Connecting : 自動再接続(Last-Event-ID)
    Done --> Idle : 次のメッセージ待ち
mermaid

実装上の注意点#

項目内容
バックプレッシャーLLM の生成速度 > クライアントの消費速度になることはほぼないが、中継サーバーのチャンネルバッファには余裕を持たせる
タイムアウト設定長文生成は数分かかる場合がある。Go の http.Client.Timeout を十分長く設定する
エラーイベントの送信LLM API 側のエラーはカスタムイベントで通知し、クライアントが適切にハンドリングできるようにする
コスト管理ストリーミング中にユーザーがキャンセルした場合でも LLM 側の推論は続く。context.WithCancel でリクエストを明示的にキャンセルする
部分的なレスポンスの保存切断時に途中まで生成されたテキストを保存しておくと、再接続後にシームレスに再開できる

Go での実装#

基本的な SSE サーバー(標準ライブラリのみ)#

package main
 
import (
    "fmt"
    "math/rand"
    "net/http"
    "time"
)
 
func sseHandler(w http.ResponseWriter, r *http.Request) {
    // SSE に必要なヘッダーを設定
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    // CORS が必要な場合
    w.Header().Set("Access-Control-Allow-Origin", "*")
 
    // http.Flusher を取得(チャンク送信に必須)
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }
 
    // クライアント切断を検知
    ctx := r.Context()
    eventID := 1
 
    for {
        select {
        case <-ctx.Done(): // クライアントが切断した
            return
        default:
        }
 
        // イベントを送信
        price := 8000000 + rand.Intn(500000)
        fmt.Fprintf(w, "id: %d\n", eventID)
        fmt.Fprintf(w, "event: priceUpdate\n")
        fmt.Fprintf(w, "data: {\"price\":%d}\n", price)
        fmt.Fprintf(w, "\n") // イベント終端の空行
        flusher.Flush()      // バッファをフラッシュ(重要!)
 
        eventID++
        time.Sleep(1 * time.Second)
    }
}
 
func main() {
    http.HandleFunc("/events", sseHandler)
    fmt.Println("Server running on :8080")
    http.ListenAndServe(":8080", nil)
}
go

実装のポイント

  1. Content-Type: text/event-stream + Cache-Control: no-cache のヘッダーセット
  2. http.Flusher による即時フラッシュ(これがないとバッファリングされて届かない)
  3. r.Context().Done() によるクライアント切断の検知

ブロードキャストアーキテクチャ#

graph LR
    SRC["イベントソース\n(DB / Queue / Timer)"]
 
    subgraph Server["Go Server"]
        B["Broker\n(goroutine)"]
        H1["Handler\ngoroutine"]
        H2["Handler\ngoroutine"]
        H3["Handler\ngoroutine"]
    end
 
    C1["Client 1\nEventSource"]
    C2["Client 2\nEventSource"]
    C3["Client 3\nEventSource"]
 
    SRC -->|broadcast chan| B
    B -->|chan string| H1
    B -->|chan string| H2
    B -->|chan string| H3
    H1 -->|SSE stream| C1
    H2 -->|SSE stream| C2
    H3 -->|SSE stream| C3
mermaid

複数クライアントへのブロードキャスト#

package main
 
import (
    "fmt"
    "net/http"
    "sync"
)
 
// Broker はクライアントへのブロードキャストを管理する
type Broker struct {
    clients   map[chan string]struct{}
    mu        sync.RWMutex
    subscribe chan chan string
    unsub     chan chan string
    broadcast chan string
}
 
func NewBroker() *Broker {
    b := &Broker{
        clients:   make(map[chan string]struct{}),
        subscribe: make(chan chan string),
        unsub:     make(chan chan string),
        broadcast: make(chan string, 100),
    }
    go b.run()
    return b
}
 
func (b *Broker) run() {
    for {
        select {
        case client := <-b.subscribe:
            b.clients[client] = struct{}{}
 
        case client := <-b.unsub:
            delete(b.clients, client)
            close(client)
 
        case msg := <-b.broadcast:
            for client := range b.clients {
                select {
                case client <- msg: // ノンブロッキング送信
                default:            // 遅延クライアントはスキップ
                }
            }
        }
    }
}
 
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }
 
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
 
    client := make(chan string, 10)
    b.subscribe <- client
    defer func() { b.unsub <- client }()
 
    for {
        select {
        case <-r.Context().Done():
            return
        case msg := <-client:
            fmt.Fprintf(w, "data: %s\n\n", msg)
            flusher.Flush()
        }
    }
}
go

接続数の管理#

// セマフォで同時接続数を制限
type Server struct {
    sem chan struct{} // バッファサイズ = 最大同時接続数
}
 
func NewServer(maxConns int) *Server {
    return &Server{sem: make(chan struct{}, maxConns)}
}
 
func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
    select {
    case s.sem <- struct{}{}: // 接続枠を取得
        defer func() { <-s.sem }() // 終了時に解放
    default:
        http.Error(w, "Too many connections", http.StatusTooManyRequests)
        return
    }
    // ... SSE 処理
}
go

認証#

EventSource はリクエストヘッダーを自由にセットできないため、通常の Authorization: Bearer ... ヘッダー認証がそのままでは使えない。 https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource

// ❌ EventSource:ヘッダー設定不可・GET しか使えない
new EventSource('/events', { headers: { 'Authorization': 'Bearer token' } });
javascript

認証方法の比較#

方法向いている場面注意点
fetch + Bearer トークンAPI サーバーが別ドメイン自動再接続は自前実装が必要
Cookie 認証同一ドメインの Web アプリ・セッション管理が既にある構成Access-Control-Allow-Origin: * は不可。withCredentials: true が必要
ワンタイムトークン方式EventSource を使いたいが Bearer トークンも必要な場合事前に POST でトークンを発行し、クエリパラメータに含めて SSE 接続する。サーバー側での管理が必要

例: fetch + Bearer トークンの認証フロー#

sequenceDiagram
    participant C as Client (SPA)
    participant API as API Server
    participant LLM as LLM Engine
 
    C->>API: POST /auth { email, password }
    API-->>C: 200 OK { token: "eyJ..." }
 
    C->>API: POST /chat<br/>Authorization: Bearer eyJ...<br/>body: { message: "..." }
    API->>LLM: プロンプト送信
    API-->>C: 200 OK(SSE ストリーム開始)
 
    loop トークン逐次配信
        LLM-->>API: token
        API-->>C: data: {"token":"..."}
    end
 
    API-->>C: data: {"done":true}
mermaid

再接続について#

AI チャットでは自動再接続は基本的に不要。途中で切断した場合はエラーを表示してユーザーに再送信を促す設計で十分。LLM の生成途中からストリームを再開する仕組みはサーバー側の実装が複雑になるため、費用対効果が低い。

ライブラリ選定(Go)#

標準ライブラリr3labs/sse/v2manucorporat/sse
依存なしありあり
ブロードキャスト自前⚪︎自前
トピック管理自前⚪︎✖️
学習コスト低(Go の知識のみ)低〜中
選ぶ基準依存を増やしたくない・1対1 SSE で十分ブロードキャスト・トピック管理が必要gin 使用・シンプルな用途
profile

putcho01

サーバーサイドエンジニア。 GoとGCPをよく触っています。

通信プロトコル「SSE」を理解する
Author Nagattyo
Published at 2026年5月13日