Coverage for tests / test_tutorial / test_server_sent_events / test_tutorial002.py: 100%
29 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1import importlib 1efgh
3import pytest 1efgh
4from fastapi.testclient import TestClient 1efgh
5from inline_snapshot import snapshot 1efgh
@pytest.fixture(name="client", params=[pytest.param("tutorial002_py310")])
def get_client(request: pytest.FixtureRequest):
    """Return a ``TestClient`` bound to the parametrized tutorial module's app.

    The module named by ``request.param`` is imported from the
    ``docs_src.server_sent_events`` package, and its ``app`` attribute is
    wrapped in a ``TestClient``.
    """
    module_path = f"docs_src.server_sent_events.{request.param}"
    tutorial = importlib.import_module(module_path)
    return TestClient(tutorial.app)
def test_stream_items(client: TestClient):
    """Check status, content type, and the per-field lines of ``/items/stream``."""
    response = client.get("/items/stream")
    assert response.status_code == 200, response.text
    assert response.headers["content-type"] == "text/event-stream; charset=utf-8"

    lines = response.text.strip().split("\n")

    def with_prefix(prefix: str) -> list[str]:
        # Keep only the lines that start with the given field prefix.
        return [entry for entry in lines if entry.startswith(prefix)]

    # The stream opens with a comment-only event.
    assert lines[0] == ": stream of item updates"

    # The remaining lines carry the event:, data:, id: and retry: fields.
    events = with_prefix("event: ")
    assert len(events) == 3
    assert all(entry == "event: item_update" for entry in events)

    payloads = with_prefix("data: ")
    assert len(payloads) == 3

    identifiers = with_prefix("id: ")
    assert identifiers == ["id: 1", "id: 2", "id: 3"]

    retries = with_prefix("retry: ")
    assert len(retries) == 3
    assert all(entry == "retry: 5000" for entry in retries)
def test_openapi_schema(client: TestClient):
    """Compare the generated ``/openapi.json`` document against the inline snapshot."""
    response = client.get("/openapi.json")
    assert response.status_code == 200, response.text

    schema = response.json()
    # The literal stays inline inside snapshot(...) so inline-snapshot can
    # still locate and rewrite it.
    assert schema == snapshot(
        {
            "openapi": "3.1.0",
            "info": {"title": "FastAPI", "version": "0.1.0"},
            "paths": {
                "/items/stream": {
                    "get": {
                        "summary": "Stream Items",
                        "operationId": "stream_items_items_stream_get",
                        "responses": {
                            "200": {
                                "description": "Successful Response",
                                "content": {
                                    "text/event-stream": {
                                        "itemSchema": {
                                            "type": "object",
                                            "properties": {
                                                "data": {"type": "string"},
                                                "event": {"type": "string"},
                                                "id": {"type": "string"},
                                                "retry": {"type": "integer", "minimum": 0},
                                            },
                                        }
                                    }
                                },
                            }
                        },
                    }
                }
            },
        }
    )