Coverage for tests / test_tutorial / test_stream_json_lines / test_tutorial001.py: 100%

22 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1import importlib 1abcd

2import json 1abcd

3 

4import pytest 1abcd

5from fastapi.testclient import TestClient 1abcd

6from inline_snapshot import snapshot 1abcd

7 

8 

9@pytest.fixture( 1abcd

10 name="client", 

11 params=[ 

12 pytest.param("tutorial001_py310"), 

13 ], 

14) 

15def get_client(request: pytest.FixtureRequest): 1abcd

16 mod = importlib.import_module(f"docs_src.stream_json_lines.{request.param}") 1stuvwxyzABCDEFGHI

17 

18 client = TestClient(mod.app) 1stuvwxyzABCDEFGHI

19 return client 1stuvwxyzABCDEFGHI

20 

21 

22expected_items = [ 1abcd

23 {"name": "Plumbus", "description": "A multi-purpose household device."}, 

24 {"name": "Portal Gun", "description": "A portal opening device."}, 

25 {"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}, 

26] 

27 

28 

29@pytest.mark.parametrize( 1abcd

30 "path", 

31 [ 

32 "/items/stream", 

33 "/items/stream-no-async", 

34 "/items/stream-no-annotation", 

35 "/items/stream-no-async-no-annotation", 

36 ], 

37) 

38def test_stream_items(client: TestClient, path: str): 1abcd

39 response = client.get(path) 1efghijklmnopqr

40 assert response.status_code == 200, response.text 1efghijklmnopqr

41 assert response.headers["content-type"] == "application/jsonl" 1efghijklmnopqr

42 lines = [json.loads(line) for line in response.text.strip().splitlines()] 1efghijklmnopqr

43 assert lines == expected_items 1efghijklmnopqr

44 

45 

46def test_openapi_schema(client: TestClient): 1abcd

47 response = client.get("/openapi.json") 1JKLM

48 assert response.status_code == 200, response.text 1JKLM

49 assert response.json() == snapshot( 1JKLM

50 { 

51 "openapi": "3.1.0", 

52 "info": {"title": "FastAPI", "version": "0.1.0"}, 

53 "paths": { 

54 "/items/stream": { 

55 "get": { 

56 "responses": { 

57 "200": { 

58 "description": "Successful Response", 

59 "content": { 

60 "application/jsonl": { 

61 "itemSchema": { 

62 "$ref": "#/components/schemas/Item" 

63 }, 

64 } 

65 }, 

66 } 

67 }, 

68 "summary": "Stream Items", 

69 "operationId": "stream_items_items_stream_get", 

70 } 

71 }, 

72 "/items/stream-no-async": { 

73 "get": { 

74 "responses": { 

75 "200": { 

76 "description": "Successful Response", 

77 "content": { 

78 "application/jsonl": { 

79 "itemSchema": { 

80 "$ref": "#/components/schemas/Item" 

81 }, 

82 } 

83 }, 

84 } 

85 }, 

86 "summary": "Stream Items No Async", 

87 "operationId": "stream_items_no_async_items_stream_no_async_get", 

88 } 

89 }, 

90 "/items/stream-no-annotation": { 

91 "get": { 

92 "responses": { 

93 "200": { 

94 "description": "Successful Response", 

95 "content": { 

96 "application/jsonl": { 

97 "itemSchema": {}, 

98 } 

99 }, 

100 } 

101 }, 

102 "summary": "Stream Items No Annotation", 

103 "operationId": "stream_items_no_annotation_items_stream_no_annotation_get", 

104 } 

105 }, 

106 "/items/stream-no-async-no-annotation": { 

107 "get": { 

108 "responses": { 

109 "200": { 

110 "description": "Successful Response", 

111 "content": { 

112 "application/jsonl": { 

113 "itemSchema": {}, 

114 } 

115 }, 

116 } 

117 }, 

118 "summary": "Stream Items No Async No Annotation", 

119 "operationId": "stream_items_no_async_no_annotation_items_stream_no_async_no_annotation_get", 

120 } 

121 }, 

122 }, 

123 "components": { 

124 "schemas": { 

125 "Item": { 

126 "properties": { 

127 "name": {"type": "string", "title": "Name"}, 

128 "description": { 

129 "anyOf": [ 

130 {"type": "string"}, 

131 {"type": "null"}, 

132 ], 

133 "title": "Description", 

134 }, 

135 }, 

136 "type": "object", 

137 "required": ["name", "description"], 

138 "title": "Item", 

139 } 

140 } 

141 }, 

142 } 

143 )