Coverage for tests / test_sse.py: 100%
193 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1import asyncio 1adbc
2import time 1adbc
3from collections.abc import AsyncIterable, Iterable 1adbc
5import fastapi.routing 1adbc
6import pytest 1adbc
7from fastapi import APIRouter, FastAPI 1adbc
8from fastapi.responses import EventSourceResponse 1adbc
9from fastapi.sse import ServerSentEvent 1adbc
10from fastapi.testclient import TestClient 1adbc
11from pydantic import BaseModel 1adbc
14class Item(BaseModel): 1adbc
15 name: str 1abc
16 description: str | None = None 1adbc
19items = [ 1adbc
20 Item(name="Plumbus", description="A multi-purpose household device."),
21 Item(name="Portal Gun", description="A portal opening device."),
22 Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
23]
26app = FastAPI() 1adbc
29@app.get("/items/stream", response_class=EventSourceResponse) 1adbc
30async def sse_items() -> AsyncIterable[Item]: 1adbc
31 for item in items: 1i0j1k2p3
32 yield item 1i0j1k2p3
35@app.get("/items/stream-sync", response_class=EventSourceResponse) 1adbc
36def sse_items_sync() -> Iterable[Item]: 1adbc
37 yield from items 1TUVW
40@app.get("/items/stream-no-annotation", response_class=EventSourceResponse) 1adbc
41async def sse_items_no_annotation(): 1adbc
42 for item in items: 1DCEF
43 yield item 1DCEF
46@app.get("/items/stream-sync-no-annotation", response_class=EventSourceResponse) 1adbc
47def sse_items_sync_no_annotation(): 1adbc
48 yield from items 1XGYZ
51@app.get("/items/stream-dict", response_class=EventSourceResponse) 1adbc
52async def sse_items_dict(): 1adbc
53 for item in items: 1HIJK
54 yield {"name": item.name, "description": item.description} 1HIJK
57@app.get("/items/stream-sse-event", response_class=EventSourceResponse) 1adbc
58async def sse_items_event(): 1adbc
59 yield ServerSentEvent(data="hello", event="greeting", id="1") 1efgh
60 yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2") 1efgh
61 yield ServerSentEvent(comment="just a comment") 1efgh
62 yield ServerSentEvent(data="retry-test", retry=5000) 1efgh
65@app.get("/items/stream-mixed", response_class=EventSourceResponse) 1adbc
66async def sse_items_mixed() -> AsyncIterable[Item]: 1adbc
67 yield items[0] 1yzAB
68 yield ServerSentEvent(data="custom-event", event="special") 1yzAB
69 yield items[1] 1yzAB
72@app.get("/items/stream-string", response_class=EventSourceResponse) 1adbc
73async def sse_items_string(): 1adbc
74 yield ServerSentEvent(data="plain text data") 14567
77@app.post("/items/stream-post", response_class=EventSourceResponse) 1adbc
78async def sse_items_post() -> AsyncIterable[Item]: 1adbc
79 for item in items: 1LMNO
80 yield item 1LMNO
83@app.get("/items/stream-raw", response_class=EventSourceResponse) 1adbc
84async def sse_items_raw(): 1adbc
85 yield ServerSentEvent(raw_data="plain text without quotes") 1lmno
86 yield ServerSentEvent(raw_data="<div>html fragment</div>", event="html") 1lmno
87 yield ServerSentEvent(raw_data="cpu,87.3,1709145600", event="csv") 1lmno
90router = APIRouter() 1adbc
93@router.get("/events", response_class=EventSourceResponse) 1adbc
94async def stream_events(): 1adbc
95 yield {"msg": "hello"} 1PQRS
96 yield {"msg": "world"} 1PQRS
99app.include_router(router, prefix="/api") 1adbc
102@pytest.fixture(name="client") 1adbc
103def client_fixture(): 1adbc
104 with TestClient(app) as c: 28 zb9 Ab! Bb# Cb$ Db% Eb' Fb( Gb) Hb* Ib+ Jb, KbC - G . / Lb: Mb; Nb= Ob? Pb@ Qb[ Rb] Sb^ Tb_ Ub` Vb{ Wb| Xb} Yb~ Zbab0bbb1bcb2bdb3beb4bfb5bgb6bhb7bib8b
105 yield c 28 9 ! # $ % ' ( ) * + , - . / : ; = ? @ [ ] ^ _ ` { | } ~ abbbcbdbebfbgbhbib
108def test_async_generator_with_model(client: TestClient): 1adbc
109 response = client.get("/items/stream") 1ijkp
110 assert response.status_code == 200 1ijkp
111 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1ijkp
112 assert response.headers["cache-control"] == "no-cache" 1ijkp
113 assert response.headers["x-accel-buffering"] == "no" 1ijkp
115 lines = response.text.strip().split("\n") 1ijkp
116 data_lines = [line for line in lines if line.startswith("data: ")] 1ijkp
117 assert len(data_lines) == 3 1ijkp
118 assert '"name":"Plumbus"' in data_lines[0] or '"name": "Plumbus"' in data_lines[0] 1ijkp
119 assert ( 1ijk
120 '"name":"Portal Gun"' in data_lines[1]
121 or '"name": "Portal Gun"' in data_lines[1]
122 )
123 assert ( 1ijk
124 '"name":"Meeseeks Box"' in data_lines[2]
125 or '"name": "Meeseeks Box"' in data_lines[2]
126 )
129def test_sync_generator_with_model(client: TestClient): 1adbc
130 response = client.get("/items/stream-sync") 1TUVW
131 assert response.status_code == 200 1TUVW
132 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1TUVW
134 data_lines = [ 1TUVW
135 line for line in response.text.strip().split("\n") if line.startswith("data: ")
136 ]
137 assert len(data_lines) == 3 1TUVW
140def test_async_generator_no_annotation(client: TestClient): 1adbc
141 response = client.get("/items/stream-no-annotation") 1DCEF
142 assert response.status_code == 200 1DCEF
143 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1DCEF
145 data_lines = [ 1DCEF
146 line for line in response.text.strip().split("\n") if line.startswith("data: ")
147 ]
148 assert len(data_lines) == 3 1DCEF
151def test_sync_generator_no_annotation(client: TestClient): 1adbc
152 response = client.get("/items/stream-sync-no-annotation") 1XGYZ
153 assert response.status_code == 200 1XGYZ
154 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1XGYZ
156 data_lines = [ 1XGYZ
157 line for line in response.text.strip().split("\n") if line.startswith("data: ")
158 ]
159 assert len(data_lines) == 3 1XGYZ
162def test_dict_items(client: TestClient): 1adbc
163 response = client.get("/items/stream-dict") 1HIJK
164 assert response.status_code == 200 1HIJK
165 data_lines = [ 1HIJK
166 line for line in response.text.strip().split("\n") if line.startswith("data: ")
167 ]
168 assert len(data_lines) == 3 1HIJK
169 assert '"name"' in data_lines[0] 1HIJK
172def test_post_method_sse(client: TestClient): 1adbc
173 """SSE should work with POST (needed for MCP compatibility)."""
174 response = client.post("/items/stream-post") 1LMNO
175 assert response.status_code == 200 1LMNO
176 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1LMNO
177 data_lines = [ 1LMNO
178 line for line in response.text.strip().split("\n") if line.startswith("data: ")
179 ]
180 assert len(data_lines) == 3 1LMNO
183def test_sse_events_with_fields(client: TestClient): 1adbc
184 response = client.get("/items/stream-sse-event") 1efgh
185 assert response.status_code == 200 1efgh
186 text = response.text 1efgh
188 assert "event: greeting\n" in text 1efgh
189 assert 'data: "hello"\n' in text 1efgh
190 assert "id: 1\n" in text 1efgh
192 assert "event: json-data\n" in text 1efgh
193 assert "id: 2\n" in text 1efgh
194 assert 'data: {"key": "value"}\n' in text 1efgh
196 assert ": just a comment\n" in text 1efgh
198 assert "retry: 5000\n" in text 1efgh
199 assert 'data: "retry-test"\n' in text 1efgh
202def test_mixed_plain_and_sse_events(client: TestClient): 1adbc
203 response = client.get("/items/stream-mixed") 1yzAB
204 assert response.status_code == 200 1yzAB
205 text = response.text 1yzAB
207 assert "event: special\n" in text 1yzAB
208 assert 'data: "custom-event"\n' in text 1yzAB
209 assert '"name"' in text 1yzAB
212def test_string_data_json_encoded(client: TestClient): 1adbc
213 """Strings are always JSON-encoded (quoted)."""
214 response = client.get("/items/stream-string") 14567
215 assert response.status_code == 200 14567
216 assert 'data: "plain text data"\n' in response.text 14567
219def test_server_sent_event_null_id_rejected(): 1adbc
220 with pytest.raises(ValueError, match="null"): 2jbkblbmb
221 ServerSentEvent(data="test", id="has\0null") 2jbkblbmb
224def test_server_sent_event_negative_retry_rejected(): 1adbc
225 with pytest.raises(ValueError): 2nbobpbqb
226 ServerSentEvent(data="test", retry=-1) 2nbobpbqb
229def test_server_sent_event_float_retry_rejected(): 1adbc
230 with pytest.raises(ValueError): 2rbsbtbub
231 ServerSentEvent(data="test", retry=1.5) # type: ignore[arg-type] 2rbsbtbub
234def test_raw_data_sent_without_json_encoding(client: TestClient): 1adbc
235 """raw_data is sent as-is, not JSON-encoded."""
236 response = client.get("/items/stream-raw") 1lmno
237 assert response.status_code == 200 1lmno
238 text = response.text 1lmno
240 # raw_data should appear without JSON quotes
241 assert "data: plain text without quotes\n" in text 1lmno
242 # Not JSON-quoted
243 assert 'data: "plain text without quotes"' not in text 1lmno
245 assert "event: html\n" in text 1lmno
246 assert "data: <div>html fragment</div>\n" in text 1lmno
248 assert "event: csv\n" in text 1lmno
249 assert "data: cpu,87.3,1709145600\n" in text 1lmno
252def test_data_and_raw_data_mutually_exclusive(): 1adbc
253 """Cannot set both data and raw_data."""
254 with pytest.raises(ValueError, match="Cannot set both"): 2vbwbxbyb
255 ServerSentEvent(data="json", raw_data="raw") 2vbwbxbyb
258def test_sse_on_router_included_in_app(client: TestClient): 1adbc
259 response = client.get("/api/events") 1PQRS
260 assert response.status_code == 200 1PQRS
261 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1PQRS
262 data_lines = [ 1PQRS
263 line for line in response.text.strip().split("\n") if line.startswith("data: ")
264 ]
265 assert len(data_lines) == 2 1PQRS
268# Keepalive ping tests
271keepalive_app = FastAPI() 1adbc
274@keepalive_app.get("/slow-async", response_class=EventSourceResponse) 1adbc
275async def slow_async_stream(): 1adbc
276 yield {"n": 1} 1qrst
277 # Sleep longer than the (monkeypatched) ping interval so a keepalive
278 # comment is emitted before the next item.
279 await asyncio.sleep(0.3) 1qrst
280 yield {"n": 2} 1qrst
283@keepalive_app.get("/slow-sync", response_class=EventSourceResponse) 1adbc
284def slow_sync_stream(): 1adbc
285 yield {"n": 1} 1uvwx
286 time.sleep(0.3) 1uvwx
287 yield {"n": 2} 1uvwx
290def test_keepalive_ping_async(monkeypatch: pytest.MonkeyPatch): 1adbc
291 monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05) 1qrst
292 with TestClient(keepalive_app) as c: 1qrst
293 response = c.get("/slow-async") 1qrst
294 assert response.status_code == 200 1qrst
295 text = response.text 1qrst
296 # The keepalive comment ": ping" should appear between the two data events
297 assert ": ping\n" in text 1qrst
298 data_lines = [line for line in text.split("\n") if line.startswith("data: ")] 1qrst
299 assert len(data_lines) == 2 1qrst
302def test_keepalive_ping_sync(monkeypatch: pytest.MonkeyPatch): 1adbc
303 monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05) 1uvwx
304 with TestClient(keepalive_app) as c: 1uvwx
305 response = c.get("/slow-sync") 1uvwx
306 assert response.status_code == 200 1uvwx
307 text = response.text 1uvwx
308 assert ": ping\n" in text 1uvwx
309 data_lines = [line for line in text.split("\n") if line.startswith("data: ")] 1uvwx
310 assert len(data_lines) == 2 1uvwx
313def test_no_keepalive_when_fast(client: TestClient): 1adbc
314 """No keepalive comment when items arrive quickly."""
315 response = client.get("/items/stream") 10123
316 assert response.status_code == 200 10123
317 # KEEPALIVE_COMMENT is ": ping\n\n".
318 assert ": ping\n" not in response.text 10123