Coverage for docs_src / server_sent_events / tutorial005_py310.py: 100%
13 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from collections.abc import AsyncIterable 1abghcdef
3from fastapi import FastAPI 1abghcdef
4from fastapi.sse import EventSourceResponse, ServerSentEvent 1abghcdef
5from pydantic import BaseModel 1abghcdef
7app = FastAPI() 1abghcdef
10class Prompt(BaseModel): 1abghcdef
11 text: str 1abcdef
14@app.post("/chat/stream", response_class=EventSourceResponse) 1abghcdef
15async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]: 1abghcdef
16 words = prompt.text.split() 1ijkl
17 for word in words: 1ijkl
18 yield ServerSentEvent(data=word, event="token") 1ijkl
19 yield ServerSentEvent(raw_data="[DONE]", event="done") 1ijkl