Coverage for fastapi / sse.py: 100%
42 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from typing import Annotated, Any 1abcd
3from annotated_doc import Doc 1abcd
4from pydantic import AfterValidator, BaseModel, Field, model_validator 1abcd
5from starlette.responses import StreamingResponse 1abcd
7# Canonical SSE event schema matching the OpenAPI 3.2 spec
8# (Section 4.14.4 "Special Considerations for Server-Sent Events")
9_SSE_EVENT_SCHEMA: dict[str, Any] = { 1abcd
10 "type": "object",
11 "properties": {
12 "data": {"type": "string"},
13 "event": {"type": "string"},
14 "id": {"type": "string"},
15 "retry": {"type": "integer", "minimum": 0},
16 },
17}
class EventSourceResponse(StreamingResponse):
    """`StreamingResponse` subclass whose media type is `text/event-stream`.

    Pass it as `response_class=EventSourceResponse` to a *path operation*
    that uses `yield` in order to get Server Sent Events (SSE) responses.

    It is not tied to a particular HTTP method: `GET`, `POST`, etc. all work,
    so it is usable with protocols such as MCP that stream SSE over `POST`.

    This class is mostly a marker that sets the right `Content-Type`; the
    encoding of the yielded items is done by the FastAPI routing layer.
    """

    media_type = "text/event-stream"
36def _check_id_no_null(v: str | None) -> str | None: 1abcd
37 if v is not None and "\0" in v: 1]efklm^?_ghnop`ijqrs
38 raise ValueError("SSE 'id' must not contain null characters") 1]^_`
39 return v 1efklm?@ghnopijqrs
class ServerSentEvent(BaseModel):
    """One Server-Sent Event, described field by field.

    Yield instances of this model from a *path operation function* declared
    with `response_class=EventSourceResponse`; each one is encoded into the
    [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
    (`text/event-stream`).

    Yielding a plain object instead (dict, Pydantic model, etc.) is also
    possible: it is JSON-encoded automatically and sent as the `data:` field.

    Every `data` value is JSON-serialized, **plain strings included**, so
    `data="hello"` appears on the wire as `data: "hello"` (quotes and all).
    Use `raw_data` to send a string without JSON encoding. `data` and
    `raw_data` cannot both be set on the same event.
    """

    data: Annotated[
        Any,
        Doc(
            """
            The event payload.

            Can be any JSON-serializable value: a Pydantic model, dict, list,
            string, number, etc. It is **always** serialized to JSON: strings
            are quoted (`"hello"` becomes `data: "hello"` on the wire).

            Mutually exclusive with `raw_data`.
            """
        ),
    ] = None
    raw_data: Annotated[
        str | None,
        Doc(
            """
            Raw string to send as the `data:` field **without** JSON encoding.

            Use this when you need to send pre-formatted text, HTML fragments,
            CSV lines, or any non-JSON payload. The string is placed directly
            into the `data:` field as-is.

            Mutually exclusive with `data`.
            """
        ),
    ] = None
    event: Annotated[
        str | None,
        Doc(
            """
            Optional event type name.

            Maps to `addEventListener(event, ...)` on the browser. When omitted,
            the browser dispatches on the generic `message` event.
            """
        ),
    ] = None
    id: Annotated[
        str | None,
        AfterValidator(_check_id_no_null),
        Doc(
            """
            Optional event ID.

            The browser sends this value back as the `Last-Event-ID` header on
            automatic reconnection. **Must not contain null (`\\0`) characters.**
            """
        ),
    ] = None
    retry: Annotated[
        int | None,
        Field(ge=0),
        Doc(
            """
            Optional reconnection time in **milliseconds**.

            Tells the browser how long to wait before reconnecting after the
            connection is lost. Must be a non-negative integer.
            """
        ),
    ] = None
    comment: Annotated[
        str | None,
        Doc(
            """
            Optional comment line(s).

            Comment lines start with `:` in the SSE wire format and are ignored by
            `EventSource` clients. Useful for keep-alive pings to prevent
            proxy/load-balancer timeouts.
            """
        ),
    ] = None

    @model_validator(mode="after")
    def _check_data_exclusive(self) -> "ServerSentEvent":
        """Reject events that set both `data` and `raw_data`."""
        # At most one of the two payload fields may be populated.
        if self.data is None or self.raw_data is None:
            return self
        raise ValueError(
            "Cannot set both 'data' and 'raw_data' on the same "
            "ServerSentEvent. Use 'data' for JSON-serialized payloads "
            "or 'raw_data' for pre-formatted strings."
        )
def _split_sse_lines(text: str) -> list[str]:
    """Split `text` on the line terminators an SSE parser recognises.

    Only CRLF, CR and LF end a line in an event stream. `str.splitlines()`
    additionally breaks on VT, FF, FS/GS/RS, NEL, U+2028 and U+2029, which
    would turn those characters into extra field lines and make the client
    receive them as `\\n`, silently altering the payload.

    As with `str.splitlines()`, an empty string yields no lines and a
    terminator at the very end does not produce an extra empty line.
    """
    parts = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    if parts[-1] == "":
        parts.pop()
    return parts


def _strip_line_breaks(value: str) -> str:
    """Drop CR and LF so `value` cannot spill out of its single field line."""
    return value.replace("\r", "").replace("\n", "")


def format_sse_event(
    *,
    data_str: Annotated[
        str | None,
        Doc(
            """
            Pre-serialized data string to use as the `data:` field.
            """
        ),
    ] = None,
    event: Annotated[
        str | None,
        Doc(
            """
            Optional event type name (`event:` field).
            """
        ),
    ] = None,
    id: Annotated[
        str | None,
        Doc(
            """
            Optional event ID (`id:` field).
            """
        ),
    ] = None,
    retry: Annotated[
        int | None,
        Doc(
            """
            Optional reconnection time in milliseconds (`retry:` field).
            """
        ),
    ] = None,
    comment: Annotated[
        str | None,
        Doc(
            """
            Optional comment line(s) (`:` prefix).
            """
        ),
    ] = None,
) -> bytes:
    """Build SSE wire-format bytes from **pre-serialized** data.

    Fields are written in the order comment, `event:`, `data:`, `id:`,
    `retry:`; arguments left as `None` are skipped. Multi-line `comment` and
    `data_str` values are split on CRLF, CR or LF only and emitted as one
    line per segment. Any CR or LF inside `event` or `id` is removed, because
    those fields must fit on a single line and a stray line break would let
    the value inject additional fields or end the event early.

    Returns the UTF-8 encoded event. Whenever at least one field line is
    written, the result ends with a blank line (`\\n\\n`, the event
    terminator); when no field line is written at all it is a single `\\n`.
    """
    lines: list[str] = []

    if comment is not None:
        lines.extend(f": {line}" for line in _split_sse_lines(comment))

    if event is not None:
        lines.append(f"event: {_strip_line_breaks(event)}")

    if data_str is not None:
        lines.extend(f"data: {line}" for line in _split_sse_lines(data_str))

    if id is not None:
        lines.append(f"id: {_strip_line_breaks(id)}")

    if retry is not None:
        lines.append(f"retry: {retry}")

    # Two empty entries make the join end the last field line and then add
    # the blank line that terminates the event.
    lines.append("")
    lines.append("")
    return "\n".join(lines).encode("utf-8")
217# Keep-alive comment, per the SSE spec recommendation
218KEEPALIVE_COMMENT = b": ping\n\n" 1abcd
220# Seconds between keep-alive pings when a generator is idle.
221# Private but importable so tests can monkeypatch it.
222_PING_INTERVAL: float = 15.0 1abcd