Coverage for docs_src / server_sent_events / tutorial004_py310.py: 100%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from collections.abc import AsyncIterable 1abhicdefg

2from typing import Annotated 1abhicdefg

3 

4from fastapi import FastAPI, Header 1abhicdefg

5from fastapi.sse import EventSourceResponse, ServerSentEvent 1abhicdefg

6from pydantic import BaseModel 1abhicdefg

7 

8app = FastAPI() 1abhicdefg

9 

10 

# Pydantic model describing one entry of the sample catalogue below.
# NOTE: documented with comments rather than a class docstring so the
# generated JSON schema for the model stays exactly as it was.
class Item(BaseModel):
    # Human-readable item name.
    name: str

    # Item price; stored as a plain float (currency/units are not specified).
    price: float

14 

15 

# Fixed, in-memory sample data streamed by the endpoint below.
# The list position of each entry is used as its event id, so the order
# here is significant.
items = [
    Item(name=label, price=cost)
    for label, cost in (
        ("Plumbus", 32.99),
        ("Portal Gun", 999.99),
        ("Meeseeks Box", 49.99),
    )
]

21 

22 

# Stream the entries of `items` as server-sent events, one event per item.
#
# `last_event_id` is read from a request header via `Header()` (presumably
# `Last-Event-ID`, given FastAPI's default underscore-to-hyphen mapping --
# confirm against the FastAPI docs). When it is supplied, streaming resumes
# with the item *after* that index; when it is absent, streaming starts from
# the first item. Each event carries the item as its data and the item's
# list index (as a string) as its id.
#
# NOTE: documented with comments rather than a docstring so the OpenAPI
# description generated for this route is unchanged.
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    # Work out the first index that still has to be sent.
    if last_event_id is None:
        first_index = 0
    else:
        first_index = last_event_id + 1

    for position, entry in enumerate(items):
        # Skip everything the client has already received.
        if position >= first_index:
            yield ServerSentEvent(data=entry, id=str(position))