Coverage for tests / test_stream_json_validation_error.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from collections.abc import AsyncIterable, Iterable 1adbc

2 

3import pytest 1adbc

4from fastapi import FastAPI 1adbc

5from fastapi.exceptions import ResponseValidationError 1adbc

6from fastapi.testclient import TestClient 1adbc

7from pydantic import BaseModel 1adbc

8 

9 

10class Item(BaseModel): 1adbc

11 name: str 1abc

12 price: float 1abc

13 

14 

15app = FastAPI() 1adbc

16 

17 

18@app.get("/items/stream-invalid") 1adbc

19async def stream_items_invalid() -> AsyncIterable[Item]: 1adbc

20 yield {"name": "valid", "price": 1.0} 1efgh

21 yield {"name": "invalid", "price": "not-a-number"} 1efgh

22 

23 

24@app.get("/items/stream-invalid-sync") 1adbc

25def stream_items_invalid_sync() -> Iterable[Item]: 1adbc

26 yield {"name": "valid", "price": 1.0} 1ijkl

27 yield {"name": "invalid", "price": "not-a-number"} 1ijkl

28 

29 

30client = TestClient(app) 1adbc

31 

32 

33def test_stream_json_validation_error_async(): 1adbc

34 with pytest.raises(ResponseValidationError): 1efgh

35 client.get("/items/stream-invalid") 1efgh

36 

37 

38def test_stream_json_validation_error_sync(): 1adbc

39 with pytest.raises(ResponseValidationError): 1ijkl

40 client.get("/items/stream-invalid-sync") 1ijkl