Coverage for docs_src / security / tutorial003_an_py310.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from typing import Annotated 1abghcdef

2 

3from fastapi import Depends, FastAPI, HTTPException, status 1abghcdef

4from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abghcdef

5from pydantic import BaseModel 1abghcdef

6 

7fake_users_db = { 1abghcdef

8 "johndoe": { 

9 "username": "johndoe", 

10 "full_name": "John Doe", 

11 "email": "[email protected]", 

12 "hashed_password": "fakehashedsecret", 

13 "disabled": False, 

14 }, 

15 "alice": { 

16 "username": "alice", 

17 "full_name": "Alice Wonderson", 

18 "email": "[email protected]", 

19 "hashed_password": "fakehashedsecret2", 

20 "disabled": True, 

21 }, 

22} 

23 

24app = FastAPI() 1abghcdef

25 

26 

27def fake_hash_password(password: str): 1abghcdef

28 return "fakehashed" + password 1pqrstuvw

29 

30 

31oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1abghcdef

32 

33 

34class User(BaseModel): 1abghcdef

35 username: str 1abcdef

36 email: str | None = None 1abghcdef

37 full_name: str | None = None 1abghcdef

38 disabled: bool | None = None 1abghcdef

39 

40 

41class UserInDB(User): 1abghcdef

42 hashed_password: str 1abcdef

43 

44 

45def get_user(db, username: str): 1abghcdef

46 if username in db: 1mxijnykozl

47 user_dict = db[username] 1mijnkol

48 return UserInDB(**user_dict) 1mijnkol

49 

50 

51def fake_decode_token(token): 1abghcdef

52 # This doesn't provide any security at all 

53 # Check the next version 

54 user = get_user(fake_users_db, token) 1mxijnykozl

55 return user 1mxijnykozl

56 

57 

58async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): 1abghcdef

59 user = fake_decode_token(token) 1mxijnykozl

60 if not user: 1mxijnykozl

61 raise HTTPException( 1xDyz

62 status_code=status.HTTP_401_UNAUTHORIZED, 

63 detail="Not authenticated", 

64 headers={"WWW-Authenticate": "Bearer"}, 

65 ) 

66 return user 1mijnkol

67 

68 

69async def get_current_active_user( 1abghcdef

70 current_user: Annotated[User, Depends(get_current_user)], 

71): 

72 if current_user.disabled: 1mijnkol

73 raise HTTPException(status_code=400, detail="Inactive user") 1mEno

74 return current_user 1ijkl

75 

76 

77@app.post("/token") 1abghcdef

78async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): 1abghcdef

79 user_dict = fake_users_db.get(form_data.username) 1pqArstuBvwC

80 if not user_dict: 1pqArstuBvwC

81 raise HTTPException(status_code=400, detail="Incorrect username or password") 1AFBC

82 user = UserInDB(**user_dict) 1pqrstuvw

83 hashed_password = fake_hash_password(form_data.password) 1pqrstuvw

84 if not hashed_password == user.hashed_password: 1pqrstuvw

85 raise HTTPException(status_code=400, detail="Incorrect username or password") 1qsuw

86 

87 return {"access_token": user.username, "token_type": "bearer"} 1prtv

88 

89 

90@app.get("/users/me") 1abghcdef

91async def read_users_me( 1abghcdef

92 current_user: Annotated[User, Depends(get_current_active_user)], 

93): 

94 return current_user 1ijkl