Coverage for tests / test_stream_cancellation.py: 100%
36 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1"""
2Test that async streaming endpoints can be cancelled without hanging.
4Ref: https://github.com/fastapi/fastapi/issues/14680
5"""
7from collections.abc import AsyncIterable 1opqr
9import anyio 1opqr
10import pytest 1opqr
11from fastapi import FastAPI 1opqr
12from fastapi.responses import StreamingResponse 1opqr
14pytestmark = [ 1opqr
15 pytest.mark.anyio,
16 pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning"),
17]
20app = FastAPI() 1opqr
23@app.get("/stream-raw", response_class=StreamingResponse) 1opqr
24async def stream_raw() -> AsyncIterable[str]: 1opqr
25 """Async generator with no internal await - would hang without checkpoint."""
26 i = 0 1abcdefgh
27 while True: 1abcdefgh
28 yield f"item {i}\n" 1abcdefgh
29 i += 1 1abcdefgh
32@app.get("/stream-jsonl") 1opqr
33async def stream_jsonl() -> AsyncIterable[int]: 1opqr
34 """JSONL async generator with no internal await."""
35 i = 0 1ijstklmn
36 while True: 1ijstklmn
37 yield i 1ijstklmn
38 i += 1 1ijstklmn
41async def _run_asgi_and_cancel(app: FastAPI, path: str, timeout: float) -> bool: 1opqr
42 """Call the ASGI app for *path* and cancel after *timeout* seconds.
44 Returns `True` if the cancellation was delivered (i.e. it did not hang).
45 """
46 chunks: list[bytes] = [] 1ijabcdklefmngh
48 async def receive(): # type: ignore[no-untyped-def] 1ijabcdklefmngh
49 # Simulate a client that never disconnects, rely on cancellation
50 await anyio.sleep(float("inf")) 1ijabcdklefmngh
51 return {"type": "http.disconnect"} # pragma: no cover
53 async def send(message: dict) -> None: # type: ignore[type-arg] 1ijabcdklefmngh
54 if message["type"] == "http.response.body": 1ijabcdklefmngh
55 chunks.append(message.get("body", b"")) 1ijabcdklefmngh
57 scope = { 1ijabcdklefmngh
58 "type": "http",
59 "asgi": {"version": "3.0", "spec_version": "2.0"},
60 "http_version": "1.1",
61 "method": "GET",
62 "path": path,
63 "query_string": b"",
64 "root_path": "",
65 "headers": [],
66 "server": ("test", 80),
67 }
69 with anyio.move_on_after(timeout) as cancel_scope: 1ijabcdklefmngh
70 await app(scope, receive, send) # type: ignore[arg-type] 1ijabcdklefmngh
72 # If we got here within the timeout the generator was cancellable.
73 # cancel_scope.cancelled_caught is True when move_on_after fired.
74 return cancel_scope.cancelled_caught or len(chunks) > 0 1ijabcdklefmngh
77async def test_raw_stream_cancellation() -> None: 1opqr
78 """Raw streaming endpoint should be cancellable within a reasonable time."""
79 cancelled = await _run_asgi_and_cancel(app, "/stream-raw", timeout=3.0) 1abcdefgh
80 # The key assertion: we reached this line at all (didn't hang).
81 # cancelled will be True because the infinite generator was interrupted.
82 assert cancelled 1abcdefgh
85async def test_jsonl_stream_cancellation() -> None: 1opqr
86 """JSONL streaming endpoint should be cancellable within a reasonable time."""
87 cancelled = await _run_asgi_and_cancel(app, "/stream-jsonl", timeout=3.0) 1ijstklmn
88 assert cancelled 1ijstklmn