Coverage for docs_src / security / tutorial004_py310.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-04-06 01:24 +0000

1from datetime import datetime, timedelta, timezone 1abhicdefg

2 

3import jwt 1abhicdefg

4from fastapi import Depends, FastAPI, HTTPException, status 1abhicdefg

5from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abhicdefg

6from jwt.exceptions import InvalidTokenError 1abhicdefg

7from pwdlib import PasswordHash 1abhicdefg

8from pydantic import BaseModel 1abhicdefg

9 

10# to get a string like this run: 

11# openssl rand -hex 32 

12SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abhicdefg

13ALGORITHM = "HS256" 1abhicdefg

14ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abhicdefg

15 

16 

17fake_users_db = { 1abhicdefg

18 "johndoe": { 

19 "username": "johndoe", 

20 "full_name": "John Doe", 

21 "email": "[email protected]", 

22 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc", 

23 "disabled": False, 

24 } 

25} 

26 

27 

28class Token(BaseModel): 1abhicdefg

29 access_token: str 1abcdefg

30 token_type: str 1abcdefg

31 

32 

33class TokenData(BaseModel): 1abhicdefg

34 username: str | None = None 1abhicdefg

35 

36 

37class User(BaseModel): 1abhicdefg

38 username: str 1abcdefg

39 email: str | None = None 1abhicdefg

40 full_name: str | None = None 1abhicdefg

41 disabled: bool | None = None 1abhicdefg

42 

43 

44class UserInDB(User): 1abhicdefg

45 hashed_password: str 1abcdefg

46 

47 

48password_hash = PasswordHash.recommended() 1abhicdefg

49 

50DUMMY_HASH = password_hash.hash("dummypassword") 1abhicdefg

51 

52oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1abhicdefg

53 

54app = FastAPI() 1abhicdefg

55 

56 

57def verify_password(plain_password, hashed_password): 1abhicdefg

58 return password_hash.verify(plain_password, hashed_password) 1txGjklYumvyHnopZwzIqrs0

59 

60 

61def get_password_hash(password): 1abhicdefg

62 return password_hash.hash(password) 11l2U3p4s

63 

64 

65def get_user(db, username: str): 1abhicdefg

66 if username in db: 1txGjklABumvyHnopCDwzIqrsEF

67 user_dict = db[username] 1txjklumvynopwzqrs

68 return UserInDB(**user_dict) 1txjklumvynopwzqrs

69 

70 

71def authenticate_user(fake_db, username: str, password: str): 1abhicdefg

72 user = get_user(fake_db, username) 1txGjklumvyHnopwzIqrs

73 if not user: 1txGjklumvyHnopwzIqrs

74 verify_password(password, DUMMY_HASH) 1GVHI

75 return False 1GVHI

76 if not verify_password(password, user.hashed_password): 1txjklumvynopwzqrs

77 return False 1xWyz

78 return user 1tjklumvnopwqrs

79 

80 

81def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abhicdefg

82 to_encode = data.copy() 1NtjklumOvnopPwqrs

83 if expires_delta: 1NtjklumOvnopPwqrs

84 expire = datetime.now(timezone.utc) + expires_delta 1tjklumvnopwqrs

85 else: 

86 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1N5OP

87 to_encode.update({"exp": expire}) 1NtjklumOvnopPwqrs

88 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1NtjklumOvnopPwqrs

89 return encoded_jwt 1NtjklumOvnopPwqrs

90 

91 

92async def get_current_user(token: str = Depends(oauth2_scheme)): 1abhicdefg

93 credentials_exception = HTTPException( 1QjklKABmJRnopLCDSqrsMEF

94 status_code=status.HTTP_401_UNAUTHORIZED, 

95 detail="Could not validate credentials", 

96 headers={"WWW-Authenticate": "Bearer"}, 

97 ) 

98 try: 1QjklKABmJRnopLCDSqrsMEF

99 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1QjklKABmJRnopLCDSqrsMEF

100 username = payload.get("sub") 1jklKABmJnopLCDqrsMEF

101 if username is None: 1jklKABmJnopLCDqrsMEF

102 raise credentials_exception 1K6LM

103 token_data = TokenData(username=username) 1jklABmJnopCDqrsEF

104 except InvalidTokenError: 1QKXRLSM

105 raise credentials_exception 1QXRS

106 user = get_user(fake_users_db, username=token_data.username) 1jklABmJnopCDqrsEF

107 if user is None: 1jklABmJnopCDqrsEF

108 raise credentials_exception 1AB7JCDEF

109 return user 1jklTmnopqrs

110 

111 

112async def get_current_active_user(current_user: User = Depends(get_current_user)): 1abhicdefg

113 if current_user.disabled: 1jklTmnopqrs

114 raise HTTPException(status_code=400, detail="Inactive user") 1lUps

115 return current_user 1jkTmnoqr

116 

117 

118@app.post("/token") 1abhicdefg

119async def login_for_access_token( 1abhicdefg

120 form_data: OAuth2PasswordRequestForm = Depends(), 

121) -> Token: 

122 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1txGjklumvyHnopwzIqrs

123 if not user: 1txGjklumvyHnopwzIqrs

124 raise HTTPException( 1xGWyHzI

125 status_code=status.HTTP_401_UNAUTHORIZED, 

126 detail="Incorrect username or password", 

127 headers={"WWW-Authenticate": "Bearer"}, 

128 ) 

129 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1tjklumvnopwqrs

130 access_token = create_access_token( 1tjklumvnopwqrs

131 data={"sub": user.username}, expires_delta=access_token_expires 

132 ) 

133 return Token(access_token=access_token, token_type="bearer") 1tjklumvnopwqrs

134 

135 

136@app.get("/users/me/") 1abhicdefg

137async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User: 1abhicdefg

138 return current_user 1kmor

139 

140 

141@app.get("/users/me/items/") 1abhicdefg

142async def read_own_items(current_user: User = Depends(get_current_active_user)): 1abhicdefg

143 return [{"item_id": "Foo", "owner": current_user.username}] 1jTnq