Coverage for docs_src / server_sent_events / tutorial005_py310.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from collections.abc import AsyncIterable 1abghcdef

2 

3from fastapi import FastAPI 1abghcdef

4from fastapi.sse import EventSourceResponse, ServerSentEvent 1abghcdef

5from pydantic import BaseModel 1abghcdef

6 

7app = FastAPI() 1abghcdef

8 

9 

10class Prompt(BaseModel): 1abghcdef

11 text: str 1abcdef

12 

13 

14@app.post("/chat/stream", response_class=EventSourceResponse) 1abghcdef

15async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]: 1abghcdef

16 words = prompt.text.split() 1ijkl

17 for word in words: 1ijkl

18 yield ServerSentEvent(data=word, event="token") 1ijkl

19 yield ServerSentEvent(raw_data="[DONE]", event="done") 1ijkl