Coverage for tests / test_stream_json_validation_error.py: 100%
25 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from collections.abc import AsyncIterable, Iterable 1adbc
3import pytest 1adbc
4from fastapi import FastAPI 1adbc
5from fastapi.exceptions import ResponseValidationError 1adbc
6from fastapi.testclient import TestClient 1adbc
7from pydantic import BaseModel 1adbc
10class Item(BaseModel): 1adbc
11 name: str 1abc
12 price: float 1abc
15app = FastAPI() 1adbc
18@app.get("/items/stream-invalid") 1adbc
19async def stream_items_invalid() -> AsyncIterable[Item]: 1adbc
20 yield {"name": "valid", "price": 1.0} 1efgh
21 yield {"name": "invalid", "price": "not-a-number"} 1efgh
24@app.get("/items/stream-invalid-sync") 1adbc
25def stream_items_invalid_sync() -> Iterable[Item]: 1adbc
26 yield {"name": "valid", "price": 1.0} 1ijkl
27 yield {"name": "invalid", "price": "not-a-number"} 1ijkl
30client = TestClient(app) 1adbc
33def test_stream_json_validation_error_async(): 1adbc
34 with pytest.raises(ResponseValidationError): 1efgh
35 client.get("/items/stream-invalid") 1efgh
38def test_stream_json_validation_error_sync(): 1adbc
39 with pytest.raises(ResponseValidationError): 1ijkl
40 client.get("/items/stream-invalid-sync") 1ijkl