Coverage for docs_src / server_sent_events / tutorial001_py310.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from collections.abc import AsyncIterable, Iterable 1abhicdefg

2 

3from fastapi import FastAPI 1abhicdefg

4from fastapi.sse import EventSourceResponse 1abhicdefg

5from pydantic import BaseModel 1abhicdefg

6 

7app = FastAPI() 1abhicdefg

8 

9 

10class Item(BaseModel): 1abhicdefg

11 name: str 1abcdefg

12 description: str | None 1abcdefg

13 

14 

15items = [ 1abhicdefg

16 Item(name="Plumbus", description="A multi-purpose household device."), 

17 Item(name="Portal Gun", description="A portal opening device."), 

18 Item(name="Meeseeks Box", description="A box that summons a Meeseeks."), 

19] 

20 

21 

22@app.get("/items/stream", response_class=EventSourceResponse) 1abhicdefg

23async def sse_items() -> AsyncIterable[Item]: 1abhicdefg

24 for item in items: 1jklm

25 yield item 1jklm

26 

27 

28@app.get("/items/stream-no-async", response_class=EventSourceResponse) 1abhicdefg

29def sse_items_no_async() -> Iterable[Item]: 1abhicdefg

30 for item in items: 1nopq

31 yield item 1nopq

32 

33 

34@app.get("/items/stream-no-annotation", response_class=EventSourceResponse) 1abhicdefg

35async def sse_items_no_annotation(): 1abhicdefg

36 for item in items: 1rstu

37 yield item 1rstu

38 

39 

40@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse) 1abhicdefg

41def sse_items_no_async_no_annotation(): 1abhicdefg

42 for item in items: 1vwxy

43 yield item 1vwxy