Coverage for docs_src / server_sent_events / tutorial002_py310.py: 100%

14 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from collections.abc import AsyncIterable 1afgbcde

2 

3from fastapi import FastAPI 1afgbcde

4from fastapi.sse import EventSourceResponse, ServerSentEvent 1afgbcde

5from pydantic import BaseModel 1afgbcde

6 

7app = FastAPI() 1afgbcde

8 

9 

10class Item(BaseModel): 1afgbcde

11 name: str 1abcde

12 price: float 1abcde

13 

14 

15items = [ 1afgbcde

16 Item(name="Plumbus", price=32.99), 

17 Item(name="Portal Gun", price=999.99), 

18 Item(name="Meeseeks Box", price=49.99), 

19] 

20 

21 

22@app.get("/items/stream", response_class=EventSourceResponse) 1afgbcde

23async def stream_items() -> AsyncIterable[ServerSentEvent]: 1afgbcde

24 yield ServerSentEvent(comment="stream of item updates") 1hijk

25 for i, item in enumerate(items): 1hijk

26 yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000) 1hijk