Coverage for tests / test_sse.py: 100%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1import asyncio 1adbc

2import time 1adbc

3from collections.abc import AsyncIterable, Iterable 1adbc

4 

5import fastapi.routing 1adbc

6import pytest 1adbc

7from fastapi import APIRouter, FastAPI 1adbc

8from fastapi.responses import EventSourceResponse 1adbc

9from fastapi.sse import ServerSentEvent 1adbc

10from fastapi.testclient import TestClient 1adbc

11from pydantic import BaseModel 1adbc

12 

13 

14class Item(BaseModel): 1adbc

15 name: str 1abc

16 description: str | None = None 1adbc

17 

18 

19items = [ 1adbc

20 Item(name="Plumbus", description="A multi-purpose household device."), 

21 Item(name="Portal Gun", description="A portal opening device."), 

22 Item(name="Meeseeks Box", description="A box that summons a Meeseeks."), 

23] 

24 

25 

26app = FastAPI() 1adbc

27 

28 

29@app.get("/items/stream", response_class=EventSourceResponse) 1adbc

30async def sse_items() -> AsyncIterable[Item]: 1adbc

31 for item in items: 1i0j1k2p3

32 yield item 1i0j1k2p3

33 

34 

35@app.get("/items/stream-sync", response_class=EventSourceResponse) 1adbc

36def sse_items_sync() -> Iterable[Item]: 1adbc

37 yield from items 1TUVW

38 

39 

40@app.get("/items/stream-no-annotation", response_class=EventSourceResponse) 1adbc

41async def sse_items_no_annotation(): 1adbc

42 for item in items: 1DCEF

43 yield item 1DCEF

44 

45 

46@app.get("/items/stream-sync-no-annotation", response_class=EventSourceResponse) 1adbc

47def sse_items_sync_no_annotation(): 1adbc

48 yield from items 1XGYZ

49 

50 

51@app.get("/items/stream-dict", response_class=EventSourceResponse) 1adbc

52async def sse_items_dict(): 1adbc

53 for item in items: 1HIJK

54 yield {"name": item.name, "description": item.description} 1HIJK

55 

56 

57@app.get("/items/stream-sse-event", response_class=EventSourceResponse) 1adbc

58async def sse_items_event(): 1adbc

59 yield ServerSentEvent(data="hello", event="greeting", id="1") 1efgh

60 yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2") 1efgh

61 yield ServerSentEvent(comment="just a comment") 1efgh

62 yield ServerSentEvent(data="retry-test", retry=5000) 1efgh

63 

64 

65@app.get("/items/stream-mixed", response_class=EventSourceResponse) 1adbc

66async def sse_items_mixed() -> AsyncIterable[Item]: 1adbc

67 yield items[0] 1yzAB

68 yield ServerSentEvent(data="custom-event", event="special") 1yzAB

69 yield items[1] 1yzAB

70 

71 

72@app.get("/items/stream-string", response_class=EventSourceResponse) 1adbc

73async def sse_items_string(): 1adbc

74 yield ServerSentEvent(data="plain text data") 14567

75 

76 

77@app.post("/items/stream-post", response_class=EventSourceResponse) 1adbc

78async def sse_items_post() -> AsyncIterable[Item]: 1adbc

79 for item in items: 1LMNO

80 yield item 1LMNO

81 

82 

83@app.get("/items/stream-raw", response_class=EventSourceResponse) 1adbc

84async def sse_items_raw(): 1adbc

85 yield ServerSentEvent(raw_data="plain text without quotes") 1lmno

86 yield ServerSentEvent(raw_data="<div>html fragment</div>", event="html") 1lmno

87 yield ServerSentEvent(raw_data="cpu,87.3,1709145600", event="csv") 1lmno

88 

89 

90router = APIRouter() 1adbc

91 

92 

93@router.get("/events", response_class=EventSourceResponse) 1adbc

94async def stream_events(): 1adbc

95 yield {"msg": "hello"} 1PQRS

96 yield {"msg": "world"} 1PQRS

97 

98 

99app.include_router(router, prefix="/api") 1adbc

100 

101 

102@pytest.fixture(name="client") 1adbc

103def client_fixture(): 1adbc

104 with TestClient(app) as c: 28 zb9 Ab! Bb# Cb$ Db% Eb' Fb( Gb) Hb* Ib+ Jb, KbC - G . / Lb: Mb; Nb= Ob? Pb@ Qb[ Rb] Sb^ Tb_ Ub` Vb{ Wb| Xb} Yb~ Zbab0bbb1bcb2bdb3beb4bfb5bgb6bhb7bib8b

105 yield c 28 9 ! # $ % ' ( ) * + , - . / : ; = ? @ [ ] ^ _ ` { | } ~ abbbcbdbebfbgbhbib

106 

107 

108def test_async_generator_with_model(client: TestClient): 1adbc

109 response = client.get("/items/stream") 1ijkp

110 assert response.status_code == 200 1ijkp

111 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1ijkp

112 assert response.headers["cache-control"] == "no-cache" 1ijkp

113 assert response.headers["x-accel-buffering"] == "no" 1ijkp

114 

115 lines = response.text.strip().split("\n") 1ijkp

116 data_lines = [line for line in lines if line.startswith("data: ")] 1ijkp

117 assert len(data_lines) == 3 1ijkp

118 assert '"name":"Plumbus"' in data_lines[0] or '"name": "Plumbus"' in data_lines[0] 1ijkp

119 assert ( 1ijk

120 '"name":"Portal Gun"' in data_lines[1] 

121 or '"name": "Portal Gun"' in data_lines[1] 

122 ) 

123 assert ( 1ijk

124 '"name":"Meeseeks Box"' in data_lines[2] 

125 or '"name": "Meeseeks Box"' in data_lines[2] 

126 ) 

127 

128 

129def test_sync_generator_with_model(client: TestClient): 1adbc

130 response = client.get("/items/stream-sync") 1TUVW

131 assert response.status_code == 200 1TUVW

132 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1TUVW

133 

134 data_lines = [ 1TUVW

135 line for line in response.text.strip().split("\n") if line.startswith("data: ") 

136 ] 

137 assert len(data_lines) == 3 1TUVW

138 

139 

140def test_async_generator_no_annotation(client: TestClient): 1adbc

141 response = client.get("/items/stream-no-annotation") 1DCEF

142 assert response.status_code == 200 1DCEF

143 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1DCEF

144 

145 data_lines = [ 1DCEF

146 line for line in response.text.strip().split("\n") if line.startswith("data: ") 

147 ] 

148 assert len(data_lines) == 3 1DCEF

149 

150 

151def test_sync_generator_no_annotation(client: TestClient): 1adbc

152 response = client.get("/items/stream-sync-no-annotation") 1XGYZ

153 assert response.status_code == 200 1XGYZ

154 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1XGYZ

155 

156 data_lines = [ 1XGYZ

157 line for line in response.text.strip().split("\n") if line.startswith("data: ") 

158 ] 

159 assert len(data_lines) == 3 1XGYZ

160 

161 

162def test_dict_items(client: TestClient): 1adbc

163 response = client.get("/items/stream-dict") 1HIJK

164 assert response.status_code == 200 1HIJK

165 data_lines = [ 1HIJK

166 line for line in response.text.strip().split("\n") if line.startswith("data: ") 

167 ] 

168 assert len(data_lines) == 3 1HIJK

169 assert '"name"' in data_lines[0] 1HIJK

170 

171 

172def test_post_method_sse(client: TestClient): 1adbc

173 """SSE should work with POST (needed for MCP compatibility).""" 

174 response = client.post("/items/stream-post") 1LMNO

175 assert response.status_code == 200 1LMNO

176 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1LMNO

177 data_lines = [ 1LMNO

178 line for line in response.text.strip().split("\n") if line.startswith("data: ") 

179 ] 

180 assert len(data_lines) == 3 1LMNO

181 

182 

183def test_sse_events_with_fields(client: TestClient): 1adbc

184 response = client.get("/items/stream-sse-event") 1efgh

185 assert response.status_code == 200 1efgh

186 text = response.text 1efgh

187 

188 assert "event: greeting\n" in text 1efgh

189 assert 'data: "hello"\n' in text 1efgh

190 assert "id: 1\n" in text 1efgh

191 

192 assert "event: json-data\n" in text 1efgh

193 assert "id: 2\n" in text 1efgh

194 assert 'data: {"key": "value"}\n' in text 1efgh

195 

196 assert ": just a comment\n" in text 1efgh

197 

198 assert "retry: 5000\n" in text 1efgh

199 assert 'data: "retry-test"\n' in text 1efgh

200 

201 

202def test_mixed_plain_and_sse_events(client: TestClient): 1adbc

203 response = client.get("/items/stream-mixed") 1yzAB

204 assert response.status_code == 200 1yzAB

205 text = response.text 1yzAB

206 

207 assert "event: special\n" in text 1yzAB

208 assert 'data: "custom-event"\n' in text 1yzAB

209 assert '"name"' in text 1yzAB

210 

211 

212def test_string_data_json_encoded(client: TestClient): 1adbc

213 """Strings are always JSON-encoded (quoted).""" 

214 response = client.get("/items/stream-string") 14567

215 assert response.status_code == 200 14567

216 assert 'data: "plain text data"\n' in response.text 14567

217 

218 

219def test_server_sent_event_null_id_rejected(): 1adbc

220 with pytest.raises(ValueError, match="null"): 2jbkblbmb

221 ServerSentEvent(data="test", id="has\0null") 2jbkblbmb

222 

223 

224def test_server_sent_event_negative_retry_rejected(): 1adbc

225 with pytest.raises(ValueError): 2nbobpbqb

226 ServerSentEvent(data="test", retry=-1) 2nbobpbqb

227 

228 

229def test_server_sent_event_float_retry_rejected(): 1adbc

230 with pytest.raises(ValueError): 2rbsbtbub

231 ServerSentEvent(data="test", retry=1.5) # type: ignore[arg-type] 2rbsbtbub

232 

233 

234def test_raw_data_sent_without_json_encoding(client: TestClient): 1adbc

235 """raw_data is sent as-is, not JSON-encoded.""" 

236 response = client.get("/items/stream-raw") 1lmno

237 assert response.status_code == 200 1lmno

238 text = response.text 1lmno

239 

240 # raw_data should appear without JSON quotes 

241 assert "data: plain text without quotes\n" in text 1lmno

242 # Not JSON-quoted 

243 assert 'data: "plain text without quotes"' not in text 1lmno

244 

245 assert "event: html\n" in text 1lmno

246 assert "data: <div>html fragment</div>\n" in text 1lmno

247 

248 assert "event: csv\n" in text 1lmno

249 assert "data: cpu,87.3,1709145600\n" in text 1lmno

250 

251 

252def test_data_and_raw_data_mutually_exclusive(): 1adbc

253 """Cannot set both data and raw_data.""" 

254 with pytest.raises(ValueError, match="Cannot set both"): 2vbwbxbyb

255 ServerSentEvent(data="json", raw_data="raw") 2vbwbxbyb

256 

257 

258def test_sse_on_router_included_in_app(client: TestClient): 1adbc

259 response = client.get("/api/events") 1PQRS

260 assert response.status_code == 200 1PQRS

261 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1PQRS

262 data_lines = [ 1PQRS

263 line for line in response.text.strip().split("\n") if line.startswith("data: ") 

264 ] 

265 assert len(data_lines) == 2 1PQRS

266 

267 

268# Keepalive ping tests 

269 

270 

271keepalive_app = FastAPI() 1adbc

272 

273 

274@keepalive_app.get("/slow-async", response_class=EventSourceResponse) 1adbc

275async def slow_async_stream(): 1adbc

276 yield {"n": 1} 1qrst

277 # Sleep longer than the (monkeypatched) ping interval so a keepalive 

278 # comment is emitted before the next item. 

279 await asyncio.sleep(0.3) 1qrst

280 yield {"n": 2} 1qrst

281 

282 

283@keepalive_app.get("/slow-sync", response_class=EventSourceResponse) 1adbc

284def slow_sync_stream(): 1adbc

285 yield {"n": 1} 1uvwx

286 time.sleep(0.3) 1uvwx

287 yield {"n": 2} 1uvwx

288 

289 

290def test_keepalive_ping_async(monkeypatch: pytest.MonkeyPatch): 1adbc

291 monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05) 1qrst

292 with TestClient(keepalive_app) as c: 1qrst

293 response = c.get("/slow-async") 1qrst

294 assert response.status_code == 200 1qrst

295 text = response.text 1qrst

296 # The keepalive comment ": ping" should appear between the two data events 

297 assert ": ping\n" in text 1qrst

298 data_lines = [line for line in text.split("\n") if line.startswith("data: ")] 1qrst

299 assert len(data_lines) == 2 1qrst

300 

301 

302def test_keepalive_ping_sync(monkeypatch: pytest.MonkeyPatch): 1adbc

303 monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05) 1uvwx

304 with TestClient(keepalive_app) as c: 1uvwx

305 response = c.get("/slow-sync") 1uvwx

306 assert response.status_code == 200 1uvwx

307 text = response.text 1uvwx

308 assert ": ping\n" in text 1uvwx

309 data_lines = [line for line in text.split("\n") if line.startswith("data: ")] 1uvwx

310 assert len(data_lines) == 2 1uvwx

311 

312 

313def test_no_keepalive_when_fast(client: TestClient): 1adbc

314 """No keepalive comment when items arrive quickly.""" 

315 response = client.get("/items/stream") 10123

316 assert response.status_code == 200 10123

317 # KEEPALIVE_COMMENT is ": ping\n\n". 

318 assert ": ping\n" not in response.text 10123