Coverage for fastapi / sse.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from typing import Annotated, Any 1abcd

2 

3from annotated_doc import Doc 1abcd

4from pydantic import AfterValidator, BaseModel, Field, model_validator 1abcd

5from starlette.responses import StreamingResponse 1abcd

6 

7# Canonical SSE event schema matching the OpenAPI 3.2 spec 

8# (Section 4.14.4 "Special Considerations for Server-Sent Events") 

9_SSE_EVENT_SCHEMA: dict[str, Any] = { 1abcd

10 "type": "object", 

11 "properties": { 

12 "data": {"type": "string"}, 

13 "event": {"type": "string"}, 

14 "id": {"type": "string"}, 

15 "retry": {"type": "integer", "minimum": 0}, 

16 }, 

17} 

18 

19 

class EventSourceResponse(StreamingResponse):
    """A `StreamingResponse` whose media type is `text/event-stream`.

    Pass it as `response_class=EventSourceResponse` on a *path operation*
    that uses `yield` to turn the response into a Server Sent Events (SSE)
    stream.

    It is not tied to a particular HTTP method: **any method** (`GET`,
    `POST`, etc.) works, so protocols such as MCP that stream SSE over
    `POST` are supported.

    This class is mostly a marker that also sets the right `Content-Type`;
    the encoding of events is done by the FastAPI routing layer.
    """

    media_type = "text/event-stream"

34 

35 

36def _check_id_no_null(v: str | None) -> str | None: 1abcd

37 if v is not None and "\0" in v: 1]efklm^?_ghnop`ijqrs

38 raise ValueError("SSE 'id' must not contain null characters") 1]^_`

39 return v 1efklm?@ghnopijqrs

40 

41 

class ServerSentEvent(BaseModel):
    """A single Server-Sent Event.

    Yield instances of this model from a *path operation function* declared
    with `response_class=EventSourceResponse`; each one is encoded into the
    [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
    (`text/event-stream`).

    Yielding a plain object instead (dict, Pydantic model, etc.) is also
    possible: it is JSON-encoded automatically and sent as the `data:` field.

    Every `data` value is JSON-serialized, **plain strings included**, so
    `data="hello"` goes on the wire as `data: "hello"` (quotes included).
    Use `raw_data` to send a string without JSON encoding.
    """

    data: Annotated[
        Any,
        Doc(
            """
            The event payload.

            Can be any JSON-serializable value: a Pydantic model, dict, list,
            string, number, etc. It is **always** serialized to JSON: strings
            are quoted (`"hello"` becomes `data: "hello"` on the wire).

            Mutually exclusive with `raw_data`.
            """
        ),
    ] = None
    raw_data: Annotated[
        str | None,
        Doc(
            """
            Raw string to send as the `data:` field **without** JSON encoding.

            Use this when you need to send pre-formatted text, HTML fragments,
            CSV lines, or any non-JSON payload. The string is placed directly
            into the `data:` field as-is.

            Mutually exclusive with `data`.
            """
        ),
    ] = None
    event: Annotated[
        str | None,
        Doc(
            """
            Optional event type name.

            Maps to `addEventListener(event, ...)` on the browser. When omitted,
            the browser dispatches on the generic `message` event.
            """
        ),
    ] = None
    id: Annotated[
        str | None,
        AfterValidator(_check_id_no_null),
        Doc(
            """
            Optional event ID.

            The browser sends this value back as the `Last-Event-ID` header on
            automatic reconnection. **Must not contain null (`\\0`) characters.**
            """
        ),
    ] = None
    retry: Annotated[
        int | None,
        Field(ge=0),
        Doc(
            """
            Optional reconnection time in **milliseconds**.

            Tells the browser how long to wait before reconnecting after the
            connection is lost. Must be a non-negative integer.
            """
        ),
    ] = None
    comment: Annotated[
        str | None,
        Doc(
            """
            Optional comment line(s).

            Comment lines start with `:` in the SSE wire format and are ignored by
            `EventSource` clients. Useful for keep-alive pings to prevent
            proxy/load-balancer timeouts.
            """
        ),
    ] = None

    @model_validator(mode="after")
    def _check_data_exclusive(self) -> "ServerSentEvent":
        # `data` and `raw_data` both end up in the `data:` field, so at most
        # one of them may be set on a given event.
        if self.data is None or self.raw_data is None:
            return self
        raise ValueError(
            "Cannot set both 'data' and 'raw_data' on the same "
            "ServerSentEvent. Use 'data' for JSON-serialized payloads "
            "or 'raw_data' for pre-formatted strings."
        )

144 

145 

def _split_sse_lines(text: str) -> list[str]:
    """Split `text` on the SSE line terminators only: CRLF, CR, or LF.

    Unlike `str.splitlines()`, this does not split on other separators such
    as U+2028 or U+0085 (which are ordinary characters in an event stream),
    returns `[""]` for an empty string, and keeps a trailing empty line.
    """
    return text.replace("\r\n", "\n").replace("\r", "\n").split("\n")


def _strip_sse_line_breaks(text: str) -> str:
    """Remove CR and LF from `text` so it fits on a single SSE field line."""
    return text.replace("\r", "").replace("\n", "")


def format_sse_event(
    *,
    data_str: Annotated[
        str | None,
        Doc(
            """
            Pre-serialized data string to use as the `data:` field.
            """
        ),
    ] = None,
    event: Annotated[
        str | None,
        Doc(
            """
            Optional event type name (`event:` field).
            """
        ),
    ] = None,
    id: Annotated[
        str | None,
        Doc(
            """
            Optional event ID (`id:` field).
            """
        ),
    ] = None,
    retry: Annotated[
        int | None,
        Doc(
            """
            Optional reconnection time in milliseconds (`retry:` field).
            """
        ),
    ] = None,
    comment: Annotated[
        str | None,
        Doc(
            """
            Optional comment line(s) (`:` prefix).
            """
        ),
    ] = None,
) -> bytes:
    """Build SSE wire-format bytes from **pre-serialized** data.

    Fields are written in the order comment, `event:`, `data:`, `id:`,
    `retry:`; arguments that are `None` are skipped. Each field line is
    terminated with LF and the event is closed by a blank line, so the
    result ends with two LF characters whenever at least one field is
    present. With no field at all the result is a single LF.

    `data_str` and `comment` are split on CRLF, CR, and LF only, producing
    one `data:` / `:` line per piece. An empty `data_str` therefore still
    yields one `data:` line, and other Unicode separators stay inside their
    line untouched.

    `event` and `id` are single-line fields: any CR or LF inside them is
    removed so the value cannot start a new field or end the event early.

    Returns the UTF-8 encoded event.
    """
    lines: list[str] = []

    if comment is not None:
        lines.extend(f": {line}" for line in _split_sse_lines(comment))

    if event is not None:
        lines.append(f"event: {_strip_sse_line_breaks(event)}")

    if data_str is not None:
        lines.extend(f"data: {line}" for line in _split_sse_lines(data_str))

    if id is not None:
        lines.append(f"id: {_strip_sse_line_breaks(id)}")

    if retry is not None:
        lines.append(f"retry: {retry}")

    # Two empty entries: the first terminates the last field line, the
    # second produces the blank line that ends the event.
    lines.append("")
    lines.append("")
    return "\n".join(lines).encode("utf-8")

215 

216 

217# Keep-alive comment, per the SSE spec recommendation 

218KEEPALIVE_COMMENT = b": ping\n\n" 1abcd

219 

220# Seconds between keep-alive pings when a generator is idle. 

221# Private but importable so tests can monkeypatch it. 

222_PING_INTERVAL: float = 15.0 1abcd