Coverage for docs_src / server_sent_events / tutorial003_py310.py: 100%
9 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from collections.abc import AsyncIterable 1abcdefg
3from fastapi import FastAPI 1abcdefg
4from fastapi.sse import EventSourceResponse, ServerSentEvent 1abcdefg
6app = FastAPI() 1abcdefg
9@app.get("/logs/stream", response_class=EventSourceResponse) 1abcdefg
10async def stream_logs() -> AsyncIterable[ServerSentEvent]: 1abcdefg
11 logs = [ 1hijk
12 "2025-01-01 INFO Application started",
13 "2025-01-01 DEBUG Connected to database",
14 "2025-01-01 WARN High memory usage detected",
15 ]
16 for log_line in logs: 1hijk
17 yield ServerSentEvent(raw_data=log_line) 1hijk