Coverage for docs_src / server_sent_events / tutorial004_py310.py: 100%
17 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from collections.abc import AsyncIterable 1abhicdefg
2from typing import Annotated 1abhicdefg
4from fastapi import FastAPI, Header 1abhicdefg
5from fastapi.sse import EventSourceResponse, ServerSentEvent 1abhicdefg
6from pydantic import BaseModel 1abhicdefg
8app = FastAPI() 1abhicdefg
11class Item(BaseModel): 1abhicdefg
12 name: str 1abcdefg
13 price: float 1abcdefg
16items = [ 1abhicdefg
17 Item(name="Plumbus", price=32.99),
18 Item(name="Portal Gun", price=999.99),
19 Item(name="Meeseeks Box", price=49.99),
20]
23@app.get("/items/stream", response_class=EventSourceResponse) 1abhicdefg
24async def stream_items( 1abhicdefg
25 last_event_id: Annotated[int | None, Header()] = None,
26) -> AsyncIterable[ServerSentEvent]:
27 start = last_event_id + 1 if last_event_id is not None else 0 1jkqlrmnsopt
28 for i, item in enumerate(items): 1jkqlrmnsopt
29 if i < start: 1jkqlrmnsopt
30 continue 1jklmnop
31 yield ServerSentEvent(data=item, id=str(i)) 1jkqlrmnsopt