Coverage for tests / test_tutorial / test_server_sent_events / test_tutorial002.py: 100%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1import importlib 1efgh

2 

3import pytest 1efgh

4from fastapi.testclient import TestClient 1efgh

5from inline_snapshot import snapshot 1efgh

6 

7 

8@pytest.fixture( 1efgh

9 name="client", 

10 params=[ 

11 pytest.param("tutorial002_py310"), 

12 ], 

13) 

14def get_client(request: pytest.FixtureRequest): 1efgh

15 mod = importlib.import_module(f"docs_src.server_sent_events.{request.param}") 1ijklmnop

16 client = TestClient(mod.app) 1ijklmnop

17 return client 1ijklmnop

18 

19 

20def test_stream_items(client: TestClient): 1efgh

21 response = client.get("/items/stream") 1abcd

22 assert response.status_code == 200, response.text 1abcd

23 assert response.headers["content-type"] == "text/event-stream; charset=utf-8" 1abcd

24 

25 lines = response.text.strip().split("\n") 1abcd

26 

27 # First event is a comment-only event 

28 assert lines[0] == ": stream of item updates" 1abcd

29 

30 # Remaining lines contain event:, data:, id:, retry: fields 

31 event_lines = [line for line in lines if line.startswith("event: ")] 1abcd

32 assert len(event_lines) == 3 1abcd

33 assert all(line == "event: item_update" for line in event_lines) 1abcd

34 

35 data_lines = [line for line in lines if line.startswith("data: ")] 1abcd

36 assert len(data_lines) == 3 1abcd

37 

38 id_lines = [line for line in lines if line.startswith("id: ")] 1abcd

39 assert id_lines == ["id: 1", "id: 2", "id: 3"] 1abcd

40 

41 retry_lines = [line for line in lines if line.startswith("retry: ")] 1abcd

42 assert len(retry_lines) == 3 1abcd

43 assert all(line == "retry: 5000" for line in retry_lines) 1abcd

44 

45 

46def test_openapi_schema(client: TestClient): 1efgh

47 response = client.get("/openapi.json") 1qrst

48 assert response.status_code == 200, response.text 1qrst

49 assert response.json() == snapshot( 1qrst

50 { 

51 "openapi": "3.1.0", 

52 "info": {"title": "FastAPI", "version": "0.1.0"}, 

53 "paths": { 

54 "/items/stream": { 

55 "get": { 

56 "summary": "Stream Items", 

57 "operationId": "stream_items_items_stream_get", 

58 "responses": { 

59 "200": { 

60 "description": "Successful Response", 

61 "content": { 

62 "text/event-stream": { 

63 "itemSchema": { 

64 "type": "object", 

65 "properties": { 

66 "data": {"type": "string"}, 

67 "event": {"type": "string"}, 

68 "id": {"type": "string"}, 

69 "retry": { 

70 "type": "integer", 

71 "minimum": 0, 

72 }, 

73 }, 

74 } 

75 } 

76 }, 

77 } 

78 }, 

79 } 

80 } 

81 }, 

82 } 

83 )