Coverage for docs_src / server_sent_events / tutorial002_py310.py: 100%
14 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from collections.abc import AsyncIterable 1afgbcde
3from fastapi import FastAPI 1afgbcde
4from fastapi.sse import EventSourceResponse, ServerSentEvent 1afgbcde
5from pydantic import BaseModel 1afgbcde
7app = FastAPI() 1afgbcde
# Data model for one entry of the module-level `items` list; instances are
# passed as the `data` payload of the events produced by `stream_items`.
# NOTE: documented with comments instead of a docstring on purpose, since
# pydantic would copy a class docstring into the generated JSON schema.
class Item(BaseModel):
    # Human-readable item name.
    name: str
    # Item price as a float (currency/units are not specified in this file).
    price: float
# Fixed in-memory sample data served by the streaming endpoint. Order matters:
# the endpoint derives each event id from the item's position in this list.
items = [
    Item(name=label, price=cost)
    for label, cost in (
        ("Plumbus", 32.99),
        ("Portal Gun", 999.99),
        ("Meeseeks Box", 49.99),
    )
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    # Async generator behind GET /items/stream.
    # NOTE: described with comments rather than a docstring on purpose, since
    # FastAPI would publish a docstring as the operation's OpenAPI description.

    # Open the stream with a comment-only event (no data payload).
    yield ServerSentEvent(comment="stream of item updates")

    # Emit one "item_update" event per entry of `items`, in list order.
    # Event ids are the 1-based list positions rendered as strings, and
    # every event carries the same retry value of 5000.
    for position, entry in enumerate(items, start=1):
        yield ServerSentEvent(
            data=entry,
            event="item_update",
            id=str(position),
            retry=5000,
        )