Coverage for docs_src / security / tutorial005_py310.py: 100%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from datetime import datetime, timedelta, timezone 1abhicdefg

2 

3import jwt 1abhicdefg

4from fastapi import Depends, FastAPI, HTTPException, Security, status 1abhicdefg

5from fastapi.security import ( 1abhicdefg

6 OAuth2PasswordBearer, 

7 OAuth2PasswordRequestForm, 

8 SecurityScopes, 

9) 

10from jwt.exceptions import InvalidTokenError 1abhicdefg

11from pwdlib import PasswordHash 1abhicdefg

12from pydantic import BaseModel, ValidationError 1abhicdefg

13 

14# to get a string like this run: 

15# openssl rand -hex 32 

16SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abhicdefg

17ALGORITHM = "HS256" 1abhicdefg

18ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abhicdefg

19 

20 

21fake_users_db = { 1abhicdefg

22 "johndoe": { 

23 "username": "johndoe", 

24 "full_name": "John Doe", 

25 "email": "[email protected]", 

26 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc", 

27 "disabled": False, 

28 }, 

29 "alice": { 

30 "username": "alice", 

31 "full_name": "Alice Chains", 

32 "email": "[email protected]", 

33 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE", 

34 "disabled": True, 

35 }, 

36} 

37 

38 

39class Token(BaseModel): 1abhicdefg

40 access_token: str 1abcdefg

41 token_type: str 1abcdefg

42 

43 

44class TokenData(BaseModel): 1abhicdefg

45 username: str | None = None 1abhicdefg

46 scopes: list[str] = [] 1abhicdefg

47 

48 

49class User(BaseModel): 1abhicdefg

50 username: str 1abcdefg

51 email: str | None = None 1abhicdefg

52 full_name: str | None = None 1abhicdefg

53 disabled: bool | None = None 1abhicdefg

54 

55 

56class UserInDB(User): 1abhicdefg

57 hashed_password: str 1abcdefg

58 

59 

60password_hash = PasswordHash.recommended() 1abhicdefg

61 

62DUMMY_HASH = password_hash.hash("dummypassword") 1abhicdefg

63 

64oauth2_scheme = OAuth2PasswordBearer( 1abhicdefg

65 tokenUrl="token", 

66 scopes={"me": "Read information about the current user.", "items": "Read items."}, 

67) 

68 

69app = FastAPI() 1abhicdefg

70 

71 

72def verify_password(plain_password, hashed_password): 1abhicdefg

73 return password_hash.verify(plain_password, hashed_password) 1yLPjskpt5zMANQlumqv6BORnworx7

74 

75 

76def get_password_hash(password): 1abhicdefg

77 return password_hash.hash(password) 189!#

78 

79 

80def get_user(db, username: str): 1abhicdefg

81 if username in db: 1yLPjskptEFzMANQlumqvGHBORnworxIJ

82 user_dict = db[username] 1yLjskptzMANlumqvBOnworx

83 return UserInDB(**user_dict) 1yLjskptzMANlumqvBOnworx

84 

85 

86def authenticate_user(fake_db, username: str, password: str): 1abhicdefg

87 user = get_user(fake_db, username) 1yLPjskptzMANQlumqvBORnworx

88 if not user: 1yLPjskptzMANQlumqvBORnworx

89 verify_password(password, DUMMY_HASH) 1P1QR

90 return False 1P1QR

91 if not verify_password(password, user.hashed_password): 1yLjskptzMANlumqvBOnworx

92 return False 1LMNO

93 return user 1yjskptzDAlumqvBnworx

94 

95 

96def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abhicdefg

97 to_encode = data.copy() 1YyjskptzDZAlumqv0Bnworx

98 if expires_delta: 1YyjskptzDZAlumqv0Bnworx

99 expire = datetime.now(timezone.utc) + expires_delta 1yjskptzDAlumqvBnworx

100 else: 

101 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1Y$Z0

102 to_encode.update({"exp": expire}) 1YyjskptzDZAlumqv0Bnworx

103 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1YyjskptzDZAlumqv0Bnworx

104 return encoded_jwt 1YyjskptzDZAlumqv0Bnworx

105 

106 

107async def get_current_user( 1abhicdefg

108 security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) 

109): 

110 if security_scopes.scopes: 1VjskptSEFCKWlumqvTGHXnworxUIJ

111 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 1VjkptSEFCKWlmqvTGHXnorxUIJ

112 else: 

113 authenticate_value = "Bearer" 1s2uw

114 credentials_exception = HTTPException( 1VjskptSEFCKWlumqvTGHXnworxUIJ

115 status_code=status.HTTP_401_UNAUTHORIZED, 

116 detail="Could not validate credentials", 

117 headers={"WWW-Authenticate": authenticate_value}, 

118 ) 

119 try: 1VjskptSEFCKWlumqvTGHXnworxUIJ

120 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1VjskptSEFCKWlumqvTGHXnworxUIJ

121 username: str = payload.get("sub") 1jskptSEFCKlumqvTGHnworxUIJ

122 if username is None: 1jskptSEFCKlumqvTGHnworxUIJ

123 raise credentials_exception 1S%TU

124 scope: str = payload.get("scope", "") 1jskptEFCKlumqvGHnworxIJ

125 token_scopes = scope.split(" ") 1jskptEFCKlumqvGHnworxIJ

126 token_data = TokenData(scopes=token_scopes, username=username) 1jskptEFCKlumqvGHnworxIJ

127 except (InvalidTokenError, ValidationError): 1VS3WTXU

128 raise credentials_exception 1V3WX

129 user = get_user(fake_users_db, username=token_data.username) 1jskptEFCKlumqvGHnworxIJ

130 if user is None: 1jskptEFCKlumqvGHnworxIJ

131 raise credentials_exception 1EFKGHIJ

132 for scope in security_scopes.scopes: 1jskptCDlumqvnworx

133 if scope not in token_data.scopes: 1jkptCDlmqvnorx

134 raise HTTPException( 1t'vx

135 status_code=status.HTTP_401_UNAUTHORIZED, 

136 detail="Not enough permissions", 

137 headers={"WWW-Authenticate": authenticate_value}, 

138 ) 

139 return user 1jskpCDlumqnwor

140 

141 

142async def get_current_active_user( 1abhicdefg

143 current_user: User = Security(get_current_user, scopes=["me"]), 

144): 

145 if current_user.disabled: 1jkpCDlmqnor

146 raise HTTPException(status_code=400, detail="Inactive user") 1pDqr

147 return current_user 1jk4Clmno

148 

149 

150@app.post("/token") 1abhicdefg

151async def login_for_access_token( 1abhicdefg

152 form_data: OAuth2PasswordRequestForm = Depends(), 

153) -> Token: 

154 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1yLPjskptzMANQlumqvBORnworx

155 if not user: 1yLPjskptzMANQlumqvBORnworx

156 raise HTTPException(status_code=400, detail="Incorrect username or password") 1LPM1NQOR

157 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1yjskptzDAlumqvBnworx

158 access_token = create_access_token( 1yjskptzDAlumqvBnworx

159 data={"sub": user.username, "scope": " ".join(form_data.scopes)}, 

160 expires_delta=access_token_expires, 

161 ) 

162 return Token(access_token=access_token, token_type="bearer") 1yjskptzDAlumqvBnworx

163 

164 

165@app.get("/users/me/") 1abhicdefg

166async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User: 1abhicdefg

167 return current_user 1kCmo

168 

169 

170@app.get("/users/me/items/") 1abhicdefg

171async def read_own_items( 1abhicdefg

172 current_user: User = Security(get_current_active_user, scopes=["items"]), 

173): 

174 return [{"item_id": "Foo", "owner": current_user.username}] 1j4ln

175 

176 

177@app.get("/status/") 1abhicdefg

178async def read_system_status(current_user: User = Depends(get_current_user)): 1abhicdefg

179 return {"status": "ok"} 1s2uw