Coverage for docs_src / security / tutorial004_an_py310.py: 100%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from datetime import datetime, timedelta, timezone 1abhicdefg

2from typing import Annotated 1abhicdefg

3 

4import jwt 1abhicdefg

5from fastapi import Depends, FastAPI, HTTPException, status 1abhicdefg

6from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abhicdefg

7from jwt.exceptions import InvalidTokenError 1abhicdefg

8from pwdlib import PasswordHash 1abhicdefg

9from pydantic import BaseModel 1abhicdefg

10 

11# to get a string like this run: 

12# openssl rand -hex 32 

13SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abhicdefg

14ALGORITHM = "HS256" 1abhicdefg

15ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abhicdefg

16 

17 

18fake_users_db = { 1abhicdefg

19 "johndoe": { 

20 "username": "johndoe", 

21 "full_name": "John Doe", 

22 "email": "[email protected]", 

23 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc", 

24 "disabled": False, 

25 } 

26} 

27 

28 

29class Token(BaseModel): 1abhicdefg

30 access_token: str 1abcdefg

31 token_type: str 1abcdefg

32 

33 

34class TokenData(BaseModel): 1abhicdefg

35 username: str | None = None 1abhicdefg

36 

37 

38class User(BaseModel): 1abhicdefg

39 username: str 1abcdefg

40 email: str | None = None 1abhicdefg

41 full_name: str | None = None 1abhicdefg

42 disabled: bool | None = None 1abhicdefg

43 

44 

45class UserInDB(User): 1abhicdefg

46 hashed_password: str 1abcdefg

47 

48 

49password_hash = PasswordHash.recommended() 1abhicdefg

50 

51DUMMY_HASH = password_hash.hash("dummypassword") 1abhicdefg

52 

53oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1abhicdefg

54 

55app = FastAPI() 1abhicdefg

56 

57 

58def verify_password(plain_password, hashed_password): 1abhicdefg

59 return password_hash.verify(plain_password, hashed_password) 1txGjklYuHvyImnoZwzJpqr0

60 

61 

62def get_password_hash(password): 1abhicdefg

63 return password_hash.hash(password) 11l2U3o4r

64 

65 

66def get_user(db, username: str): 1abhicdefg

67 if username in db: 1txGjklABuHvyImnoCDwzJpqrEF

68 user_dict = db[username] 1txjklusvymnowzpqr

69 return UserInDB(**user_dict) 1txjklusvymnowzpqr

70 

71 

72def authenticate_user(fake_db, username: str, password: str): 1abhicdefg

73 user = get_user(fake_db, username) 1txGjkluHvyImnowzJpqr

74 if not user: 1txGjkluHvyImnowzJpqr

75 verify_password(password, DUMMY_HASH) 1GHIJ

76 return False 1GHIJ

77 if not verify_password(password, user.hashed_password): 1txjklusvymnowzpqr

78 return False 1xXyz

79 return user 1tjklusvmnowpqr

80 

81 

82def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abhicdefg

83 to_encode = data.copy() 1NtjklusOvmnoPwpqr

84 if expires_delta: 1NtjklusOvmnoPwpqr

85 expire = datetime.now(timezone.utc) + expires_delta 1tjklusvmnowpqr

86 else: 

87 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1N5OP

88 to_encode.update({"exp": expire}) 1NtjklusOvmnoPwpqr

89 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1NtjklusOvmnoPwpqr

90 return encoded_jwt 1NtjklusOvmnoPwpqr

91 

92 

93async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): 1abhicdefg

94 credentials_exception = HTTPException( 1QjklKABRsSmnoLCDTpqrMEF

95 status_code=status.HTTP_401_UNAUTHORIZED, 

96 detail="Could not validate credentials", 

97 headers={"WWW-Authenticate": "Bearer"}, 

98 ) 

99 try: 1QjklKABRsSmnoLCDTpqrMEF

100 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1QjklKABRsSmnoLCDTpqrMEF

101 username = payload.get("sub") 1jklKABsWmnoLCDpqrMEF

102 if username is None: 1jklKABsWmnoLCDpqrMEF

103 raise credentials_exception 1KWLM

104 token_data = TokenData(username=username) 1jklABsVmnoCDpqrEF

105 except InvalidTokenError: 1QKRSLTM

106 raise credentials_exception 1QRST

107 user = get_user(fake_users_db, username=token_data.username) 1jklABsVmnoCDpqrEF

108 if user is None: 1jklABsVmnoCDpqrEF

109 raise credentials_exception 1ABVCDEF

110 return user 1jklsUmnopqr

111 

112 

113async def get_current_active_user( 1abhicdefg

114 current_user: Annotated[User, Depends(get_current_user)], 

115): 

116 if current_user.disabled: 1jklsUmnopqr

117 raise HTTPException(status_code=400, detail="Inactive user") 1lUor

118 return current_user 1jksmnpq

119 

120 

121@app.post("/token") 1abhicdefg

122async def login_for_access_token( 1abhicdefg

123 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

124) -> Token: 

125 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1txGjkluHvyImnowzJpqr

126 if not user: 1txGjkluHvyImnowzJpqr

127 raise HTTPException( 1xGXHyIzJ

128 status_code=status.HTTP_401_UNAUTHORIZED, 

129 detail="Incorrect username or password", 

130 headers={"WWW-Authenticate": "Bearer"}, 

131 ) 

132 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1tjklusvmnowpqr

133 access_token = create_access_token( 1tjklusvmnowpqr

134 data={"sub": user.username}, expires_delta=access_token_expires 

135 ) 

136 return Token(access_token=access_token, token_type="bearer") 1tjklusvmnowpqr

137 

138 

139@app.get("/users/me/") 1abhicdefg

140async def read_users_me( 1abhicdefg

141 current_user: Annotated[User, Depends(get_current_active_user)], 

142) -> User: 

143 return current_user 1ksnq

144 

145 

146@app.get("/users/me/items/") 1abhicdefg

147async def read_own_items( 1abhicdefg

148 current_user: Annotated[User, Depends(get_current_active_user)], 

149): 

150 return [{"item_id": "Foo", "owner": current_user.username}] 1j6mp