Coverage for docs_src / stream_data / tutorial001_py310.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from collections.abc import AsyncIterable, Iterable 1abcdefghi

2 

3from fastapi import FastAPI 1abcdefghi

4from fastapi.responses import StreamingResponse 1abcdefghi

5 

6app = FastAPI() 1abcdefghi

7 

8 

9message = """ 1abcdefghi

10Rick: (stumbles in drunkenly, and turns on the lights) Morty! You gotta come on. You got--... you gotta come with me. 

11Morty: (rubs his eyes) What, Rick? What's going on? 

12Rick: I got a surprise for you, Morty. 

13Morty: It's the middle of the night. What are you talking about? 

14Rick: (spills alcohol on Morty's bed) Come on, I got a surprise for you. (drags Morty by the ankle) Come on, hurry up. (pulls Morty out of his bed and into the hall) 

15Morty: Ow! Ow! You're tugging me too hard! 

16Rick: We gotta go, gotta get outta here, come on. Got a surprise for you Morty. 

17""" 

18 

19 

20@app.get("/story/stream", response_class=StreamingResponse) 1abcdefghi

21async def stream_story() -> AsyncIterable[str]: 1abcdefghi

22 for line in message.splitlines(): 1jklm

23 yield line 1jklm

24 

25 

26@app.get("/story/stream-no-async", response_class=StreamingResponse) 1abcdefghi

27def stream_story_no_async() -> Iterable[str]: 1abcdefghi

28 for line in message.splitlines(): 1nopq

29 yield line 1nopq

30 

31 

32@app.get("/story/stream-no-annotation", response_class=StreamingResponse) 1abcdefghi

33async def stream_story_no_annotation(): 1abcdefghi

34 for line in message.splitlines(): 1rstu

35 yield line 1rstu

36 

37 

38@app.get("/story/stream-no-async-no-annotation", response_class=StreamingResponse) 1abcdefghi

39def stream_story_no_async_no_annotation(): 1abcdefghi

40 for line in message.splitlines(): 1vwxy

41 yield line 1vwxy

42 

43 

44@app.get("/story/stream-bytes", response_class=StreamingResponse) 1abcdefghi

45async def stream_story_bytes() -> AsyncIterable[bytes]: 1abcdefghi

46 for line in message.splitlines(): 1zABC

47 yield line.encode("utf-8") 1zABC

48 

49 

50@app.get("/story/stream-no-async-bytes", response_class=StreamingResponse) 1abcdefghi

51def stream_story_no_async_bytes() -> Iterable[bytes]: 1abcdefghi

52 for line in message.splitlines(): 1DEFG

53 yield line.encode("utf-8") 1DEFG

54 

55 

56@app.get("/story/stream-no-annotation-bytes", response_class=StreamingResponse) 1abcdefghi

57async def stream_story_no_annotation_bytes(): 1abcdefghi

58 for line in message.splitlines(): 1HIJK

59 yield line.encode("utf-8") 1HIJK

60 

61 

62@app.get("/story/stream-no-async-no-annotation-bytes", response_class=StreamingResponse) 1abcdefghi

63def stream_story_no_async_no_annotation_bytes(): 1abcdefghi

64 for line in message.splitlines(): 1LMNO

65 yield line.encode("utf-8") 1LMNO