Coverage for tests / test_stream_cancellation.py: 100%

36 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1""" 

2Test that async streaming endpoints can be cancelled without hanging. 

3 

4Ref: https://github.com/fastapi/fastapi/issues/14680 

5""" 

6 

7from collections.abc import AsyncIterable 1opqr

8 

9import anyio 1opqr

10import pytest 1opqr

11from fastapi import FastAPI 1opqr

12from fastapi.responses import StreamingResponse 1opqr

13 

# Module-wide pytest marks: run the async tests through the anyio plugin and
# ignore pytest's "unraisable exception" warning for this module.
pytestmark = [
    pytest.mark.anyio,
    pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning"),
]


# Application under test; the streaming routes below are registered on it.
app = FastAPI()

21 

22 

@app.get("/stream-raw", response_class=StreamingResponse)
async def stream_raw() -> AsyncIterable[str]:
    """Endless text stream whose body never awaits.

    Yields ``item 0``, ``item 1``, ... one line at a time. Because the
    generator itself contains no await, it would hang on cancellation
    without a checkpoint.
    """
    counter = 0
    while True:
        line = f"item {counter}\n"
        counter += 1
        yield line

30 

31 

@app.get("/stream-jsonl")
async def stream_jsonl() -> AsyncIterable[int]:
    """Endless JSONL stream of increasing integers whose body never awaits."""
    value = 0
    while True:
        current = value
        value += 1
        yield current

39 

40 

async def _run_asgi_and_cancel(app: FastAPI, path: str, timeout: float) -> bool:
    """Drive *app* with a GET request for *path*, giving up after *timeout* seconds.

    The app is invoked directly through the ASGI callable interface with a
    client that never disconnects, so the timeout's cancellation is the only
    thing that can end an endless stream.

    Returns `True` when the timeout cancellation was caught, or when at least
    one response body message was received (i.e. the call did not hang).
    """
    request_scope = {
        "type": "http",
        "asgi": {"version": "3.0", "spec_version": "2.0"},
        "http_version": "1.1",
        "method": "GET",
        "path": path,
        "query_string": b"",
        "root_path": "",
        "headers": [],
        "server": ("test", 80),
    }
    received_bodies: list[bytes] = []

    async def receive():  # type: ignore[no-untyped-def]
        # The simulated client never disconnects on its own; only the
        # surrounding cancellation can interrupt this sleep.
        await anyio.sleep(float("inf"))
        return {"type": "http.disconnect"}  # pragma: no cover

    async def send(message: dict) -> None:  # type: ignore[type-arg]
        # Record response body payloads only; other message types are ignored.
        if message["type"] != "http.response.body":
            return
        received_bodies.append(message.get("body", b""))

    with anyio.move_on_after(timeout) as cancel_scope:
        await app(request_scope, receive, send)  # type: ignore[arg-type]

    # Getting past the `with` block at all means the call did not hang.
    # `cancelled_caught` is True when the move_on_after deadline fired.
    return cancel_scope.cancelled_caught or bool(received_bodies)

75 

76 

async def test_raw_stream_cancellation() -> None:
    """The raw streaming route must not hang when the request is cancelled."""
    # Returning from this call at all is the real check: it means the endless
    # generator was interrupted and the request did not hang.
    was_cancelled = await _run_asgi_and_cancel(app, "/stream-raw", timeout=3.0)
    assert was_cancelled

83 

84 

async def test_jsonl_stream_cancellation() -> None:
    """The JSONL streaming route must not hang when the request is cancelled."""
    was_cancelled = await _run_asgi_and_cancel(app, "/stream-jsonl", timeout=3.0)
    assert was_cancelled

88 assert cancelled 1ijstklmn