

AI要約#
はじめに#
この記事は、SSE について調べ、実際に実装してみた際の情報をまとめています。
SSE とは#
SSE(Server-Sent Events)は W3C 標準 の技術 ↗ で、HTTP コネクションを維持したままサーバーがクライアントへ非同期にイベントをプッシュできる。ブラウザは EventSource API でネイティブに対応している。
| 項目 | 内容 |
|---|---|
| 通信方向 | Server → Client(単方向) |
| プロトコル基盤 | HTTP / HTTPS |
| コンテンツタイプ | text/event-stream |
| ブラウザ API | EventSource |
| 標準化 | W3C / WHATWG Living Standard(全モダンブラウザ対応) |
クライアントからサーバーへの追加送信は、通常の HTTP リクエストを別途行う形になる。
イベントストリームの形式#
SSE のデータはテキストベースで、\n\n(空行)でイベントを区切るシンプルな形式。
id: 42
event: priceUpdate
data: {"symbol":"BTC","price":8500000}
retry: 3000
data: シンプルなメッセージ(event フィールドなし)
: これはコメント行(クライアントには届かない)bashフィールド一覧#
| フィールド | 必須 | 説明 |
|---|---|---|
id | 任意 | イベント ID。再接続時の Last-Event-ID ヘッダーに使用される |
event | 任意 | イベント種別。省略時は "message" として扱われる |
data | 実質必須 | ペイロード。複数行記述可(各行に data: を付与) |
retry | 任意 | 再接続間隔(ミリ秒) |
: comment | — | コロン始まりはコメント |
特徴・仕組み#
メリット#
- HTTP ベースで既存インフラをそのまま利用可能(プロキシ・CDN・ロードバランサーとの相性良好)
- ブラウザに EventSource API が組み込み(追加ライブラリ不要)
- 自動再接続機能を標準装備
- Last-Event-ID によるイベント再送・耐障害性
- 実装がシンプルでデバッグしやすい
- CORS は通常の HTTP と同じ方法で対応可
制限・注意点#
- サーバー → クライアントの一方向のみ(双方向通信には WebSocket が適切)
- HTTP/1.1 ではドメインあたり接続数制限(6本)
- バイナリデータは Base64 等でエンコードが必要
- サーバー側でコネクションを長期維持するためリソース管理が必要
接続シーケンス#
sequenceDiagram
participant C as Client (EventSource)
participant S as Server
C->>S: GET /events<br/>Accept: text/event-stream
S-->>C: 200 OK<br/>Content-Type: text/event-stream
loop 接続を維持したまま継続
S-->>C: id: 1\ndata: event1\n\n
S-->>C: id: 2\ndata: event2\n\n
S-->>C: : keepalive(コメント)
end
note over C,S: ネットワーク障害などで切断
C->>S: GET /events<br/>Last-Event-ID: 2(自動再接続)
S-->>C: 200 OK(id: 2 以降のイベントから再開)mermaidHTTP/2 での挙動#
HTTP/2 では単一の TCP コネクションで複数の SSE ストリームを多重化できる。同じ接続で複数の要求を送信可能なため、HTTP/1.1 の接続数制限を実質解消できる。本番環境では HTTP/2 + TLS の構成を推奨。
他プロトコルとの比較#
| 項目 | SSE | WebSocket | Long Polling | HTTP/2 Push |
|---|---|---|---|---|
| 通信方向 | Server → Client | 双方向 | Server → Client | Server → Client |
| プロトコル | HTTP | ws://(別プロトコル) | HTTP | HTTP/2 |
| 自動再接続 | ✅ 標準装備 | ✖️ 自前実装 | ✖️ 自前実装 | ✖️ 自前実装 |
| ブラウザ実装 | EventSource API | WebSocket API | fetch / XHR | Push API(限定的) |
| オーバーヘッド | 低(初回 HTTP headers のみ) | 低 | 高(毎回 HTTP headers) | 低 |
| プロキシ通過 | ✅ 容易 | △ 要設定 | ✅ 容易 | △ HTTP/2 対応必要 |
| データ形式 | テキストのみ | テキスト・バイナリ | 任意 | 任意 |
| 実装難易度 | 低 | 中 | 低〜中 | 中〜高 |
| 最適なユースケース | 通知・フィード・AI 出力 | チャット・ゲーム・同時編集 | 旧環境・シンプルな通知 | リソースプッシュ(CSS/JS) |
SSE が優れる場面#
- サーバー → クライアントへの一方向プッシュで十分
- 既存の HTTP インフラ(nginx, CDN)を活かしたい
- 実装コストを最小限にしたい
- 自動再接続・イベント ID の仕組みが欲しい
- AI のストリーミングレスポンス
用途#
| ユースケース | 説明 |
|---|---|
| AI ストリーミング出力 | LLM のレスポンスを生成しながらリアルタイム表示 |
| リアルタイム通知 | GitHub 通知・Jira コメントなど、サーバー起因イベントのプッシュ配信 |
| ライブダッシュボード | 株価・IoT センサー値のリアルタイム更新 |
| ライブフィード | ニュースフィード・SNS タイムライン・スポーツスコアの更新 |
| 長時間ジョブの進捗 | PDF 生成・動画変換・ML トレーニングなどの進捗表示 |
なぜ AI ストリーミングに SSE が選ばれるのか#
LLM はレスポンスを逐次生成(autoregressive) する。全文生成を待ってから返すと数十秒かかる場合があり、UX が著しく悪化する。SSE を使えば最初のトークンを即座に表示でき、ユーザーは応答が始まったことをすぐに認識できる。
| 観点 | 詳細 |
|---|---|
| 生成の性質 | LLM はトークンを 1 つずつ順番に生成するため、ストリーミングと相性が良い |
| 通信の方向 | ユーザーの入力(プロンプト)は 1 回の POST、応答は SSE で返す一方向で十分 |
| インフラ適合 | 既存の HTTP/HTTPS スタックをそのまま利用でき、CDN エッジでも扱いやすい |
| 実装コスト | WebSocket より構成がシンプルで、CORS も通常の HTTP と同じ方法で対応できる |
全体のアーキテクチャ#
sequenceDiagram
participant U as ユーザー (Browser)
participant API as API Server
participant LLM as LLM Engine
U->>API: POST /chat<br/>{"message": "こんにちは"}
API->>LLM: プロンプト送信(推論開始)
API-->>U: 200 OK<br/>Content-Type: text/event-stream
loop トークンを逐次ストリーミング
LLM-->>API: token: "こん"
API-->>U: data: {"token":"こん","done":false}
LLM-->>API: token: "にちは"
API-->>U: data: {"token":"にちは","done":false}
LLM-->>API: token: "!"
API-->>U: data: {"token":"!","done":false}
end
LLM-->>API: 生成完了
API-->>U: data: {"token":"","done":true}mermaidAnthropic API(Claude)の実際のストリームイベント#
Claude API は SSE で以下のようなイベントを順番に送信する
event: message_start
data: {"type":"message_start","message":{"id":"msg_01...","role":"assistant",...}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"こん"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"にちは"}}
event: content_block_stop
data: {"type":"content_block_stop","index":0}
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":5}}
event: message_stop
data: {"type":"message_stop"}bashGo で Claude API のストリームを中継する実装例#
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
)
// ClaudeStreamHandler はクライアントへ Claude のストリームをそのまま中継する
func ClaudeStreamHandler(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// リクエストボディからユーザーメッセージを取得
var req struct {
Message string `json:"message"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
// Anthropic API へストリーミングリクエスト
payload := map[string]any{
"model": "claude-sonnet-4.6",
"max_tokens": 1024,
"stream": true,
"messages": []map[string]string{
{"role": "user", "content": req.Message},
},
}
body, _ := json.Marshal(payload)
apiReq, _ := http.NewRequestWithContext(r.Context(), "POST",
"https://api.anthropic.com/v1/messages", bytes.NewReader(body))
apiReq.Header.Set("x-api-key", "YOUR_API_KEY")
apiReq.Header.Set("anthropic-version", "2023-06-01")
apiReq.Header.Set("content-type", "application/json")
resp, err := http.DefaultClient.Do(apiReq)
if err != nil {
fmt.Fprintf(w, "event: error\ndata: {\"message\":\"%v\"}\n\n", err)
flusher.Flush()
return
}
defer resp.Body.Close()
// SSE をそのままクライアントへ中継
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// content_block_delta からテキストだけ抽出して転送する場合
if strings.HasPrefix(line, "data: ") {
fmt.Fprintf(w, "%s\n\n", line)
flusher.Flush()
}
// クライアント切断を検知して終了
select {
case <-r.Context().Done():
return
default:
}
}
}goストリーミング受信の状態管理#
stateDiagram-v2
[*] --> Idle
Idle --> Connecting : POST /chat 送信
Connecting --> Streaming : 200 OK 受信
Streaming --> Streaming : content_block_delta 受信 トークンを UI に追加
Streaming --> Done : message_stop 受信
Streaming --> Error : ネットワーク切断 / エラー
Error --> Connecting : 自動再接続(Last-Event-ID)
Done --> Idle : 次のメッセージ待ちmermaid実装上の注意点#
| 項目 | 内容 |
|---|---|
| バックプレッシャー | LLM の生成速度 > クライアントの消費速度になることはほぼないが、中継サーバーのチャンネルバッファには余裕を持たせる |
| タイムアウト設定 | 長文生成は数分かかる場合がある。Go の http.Client.Timeout を十分長く設定する |
| エラーイベントの送信 | LLM API 側のエラーはカスタムイベントで通知し、クライアントが適切にハンドリングできるようにする |
| コスト管理 | ストリーミング中にユーザーがキャンセルした場合でも LLM 側の推論は続く。context.WithCancel でリクエストを明示的にキャンセルする |
| 部分的なレスポンスの保存 | 切断時に途中まで生成されたテキストを保存しておくと、再接続後にシームレスに再開できる |
Go での実装#
基本的な SSE サーバー(標準ライブラリのみ)#
package main
import (
"fmt"
"math/rand"
"net/http"
"time"
)
func sseHandler(w http.ResponseWriter, r *http.Request) {
// SSE に必要なヘッダーを設定
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// CORS が必要な場合
w.Header().Set("Access-Control-Allow-Origin", "*")
// http.Flusher を取得(チャンク送信に必須)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
// クライアント切断を検知
ctx := r.Context()
eventID := 1
for {
select {
case <-ctx.Done(): // クライアントが切断した
return
default:
}
// イベントを送信
price := 8000000 + rand.Intn(500000)
fmt.Fprintf(w, "id: %d\n", eventID)
fmt.Fprintf(w, "event: priceUpdate\n")
fmt.Fprintf(w, "data: {\"price\":%d}\n", price)
fmt.Fprintf(w, "\n") // イベント終端の空行
flusher.Flush() // バッファをフラッシュ(重要!)
eventID++
time.Sleep(1 * time.Second)
}
}
func main() {
http.HandleFunc("/events", sseHandler)
fmt.Println("Server running on :8080")
http.ListenAndServe(":8080", nil)
}go実装のポイント
Content-Type: text/event-stream+Cache-Control: no-cacheのヘッダーセットhttp.Flusherによる即時フラッシュ(これがないとバッファリングされて届かない)r.Context().Done()によるクライアント切断の検知
ブロードキャストアーキテクチャ#
graph LR
SRC["イベントソース\n(DB / Queue / Timer)"]
subgraph Server["Go Server"]
B["Broker\n(goroutine)"]
H1["Handler\ngoroutine"]
H2["Handler\ngoroutine"]
H3["Handler\ngoroutine"]
end
C1["Client 1\nEventSource"]
C2["Client 2\nEventSource"]
C3["Client 3\nEventSource"]
SRC -->|broadcast chan| B
B -->|chan string| H1
B -->|chan string| H2
B -->|chan string| H3
H1 -->|SSE stream| C1
H2 -->|SSE stream| C2
H3 -->|SSE stream| C3mermaid複数クライアントへのブロードキャスト#
package main
import (
"fmt"
"net/http"
"sync"
)
// Broker はクライアントへのブロードキャストを管理する
type Broker struct {
clients map[chan string]struct{}
mu sync.RWMutex
subscribe chan chan string
unsub chan chan string
broadcast chan string
}
func NewBroker() *Broker {
b := &Broker{
clients: make(map[chan string]struct{}),
subscribe: make(chan chan string),
unsub: make(chan chan string),
broadcast: make(chan string, 100),
}
go b.run()
return b
}
func (b *Broker) run() {
for {
select {
case client := <-b.subscribe:
b.clients[client] = struct{}{}
case client := <-b.unsub:
delete(b.clients, client)
close(client)
case msg := <-b.broadcast:
for client := range b.clients {
select {
case client <- msg: // ノンブロッキング送信
default: // 遅延クライアントはスキップ
}
}
}
}
}
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
client := make(chan string, 10)
b.subscribe <- client
defer func() { b.unsub <- client }()
for {
select {
case <-r.Context().Done():
return
case msg := <-client:
fmt.Fprintf(w, "data: %s\n\n", msg)
flusher.Flush()
}
}
}go接続数の管理#
// セマフォで同時接続数を制限
type Server struct {
sem chan struct{} // バッファサイズ = 最大同時接続数
}
func NewServer(maxConns int) *Server {
return &Server{sem: make(chan struct{}, maxConns)}
}
func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
select {
case s.sem <- struct{}{}: // 接続枠を取得
defer func() { <-s.sem }() // 終了時に解放
default:
http.Error(w, "Too many connections", http.StatusTooManyRequests)
return
}
// ... SSE 処理
}go認証#
EventSource はリクエストヘッダーを自由にセットできないため、通常の Authorization: Bearer ... ヘッダー認証がそのままでは使えない。
https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource ↗
// ❌ EventSource:ヘッダー設定不可・GET しか使えない
new EventSource('/events', { headers: { 'Authorization': 'Bearer token' } });javascript認証方法の比較#
| 方法 | 向いている場面 | 注意点 |
|---|---|---|
fetch + Bearer トークン | API サーバーが別ドメイン | 自動再接続は自前実装が必要 |
| Cookie 認証 | 同一ドメインの Web アプリ・セッション管理が既にある構成 | Access-Control-Allow-Origin: * は不可。withCredentials: true が必要 |
| ワンタイムトークン方式 | EventSource を使いたいが Bearer トークンも必要な場合 | 事前に POST でトークンを発行し、クエリパラメータに含めて SSE 接続する。サーバー側での管理が必要 |
例: fetch + Bearer トークンの認証フロー#
sequenceDiagram
participant C as Client (SPA)
participant API as API Server
participant LLM as LLM Engine
C->>API: POST /auth { email, password }
API-->>C: 200 OK { token: "eyJ..." }
C->>API: POST /chat<br/>Authorization: Bearer eyJ...<br/>body: { message: "..." }
API->>LLM: プロンプト送信
API-->>C: 200 OK(SSE ストリーム開始)
loop トークン逐次配信
LLM-->>API: token
API-->>C: data: {"token":"..."}
end
API-->>C: data: {"done":true}mermaid再接続について#
AI チャットでは自動再接続は基本的に不要。途中で切断した場合はエラーを表示してユーザーに再送信を促す設計で十分。LLM の生成途中からストリームを再開する仕組みはサーバー側の実装が複雑になるため、費用対効果が低い。
ライブラリ選定(Go)#
| 標準ライブラリ | r3labs/sse/v2 | manucorporat/sse | |
|---|---|---|---|
| 依存 | なし | あり | あり |
| ブロードキャスト | 自前 | ⚪︎ | 自前 |
| トピック管理 | 自前 | ⚪︎ | ✖️ |
| 学習コスト | 低(Go の知識のみ) | 低〜中 | 低 |
| 選ぶ基準 | 依存を増やしたくない・1対1 SSE で十分 | ブロードキャスト・トピック管理が必要 | gin 使用・シンプルな用途 |