Coverage for docs_src / security / tutorial004_py310.py: 100%
83 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from datetime import datetime, timedelta, timezone 1abhicdefg
3import jwt 1abhicdefg
4from fastapi import Depends, FastAPI, HTTPException, status 1abhicdefg
5from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abhicdefg
6from jwt.exceptions import InvalidTokenError 1abhicdefg
7from pwdlib import PasswordHash 1abhicdefg
8from pydantic import BaseModel 1abhicdefg
# Signing configuration for the access tokens issued by this module.
# A key of the same form can be produced with:
#     openssl rand -hex 32
# NOTE(review): the key is hard-coded in source; presumably for demonstration only.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, applied to tokens issued by the login route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# In-memory stand-in for a user table: maps a username to the field values
# used to build a UserInDB record.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        # Argon2id hash string, checked through password_hash.verify().
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    }
}
class Token(BaseModel):
    """Response model of the ``/token`` login route."""

    # Encoded JWT string handed back to the client.
    access_token: str
    # Scheme label for the token; the login route sets it to "bearer".
    token_type: str
class TokenData(BaseModel):
    """Data pulled out of a decoded access token."""

    # Filled from the token's "sub" claim; None when not supplied.
    username: str | None = None
class User(BaseModel):
    """User fields exposed by the API routes (no password material)."""

    # Only required field; also used as the lookup key in the user store.
    username: str
    # Optional profile fields; each defaults to None.
    email: str | None = None
    full_name: str | None = None
    # A truthy value makes get_current_active_user reject the user.
    disabled: bool | None = None
class UserInDB(User):
    """Stored form of a user: every ``User`` field plus the password hash."""

    # Hash string compared against a submitted password by verify_password().
    hashed_password: str
# Hasher used for both hashing and verifying passwords in this module.
password_hash = PasswordHash.recommended()

# Hash of a throwaway password, computed once at import time; authenticate_user
# verifies against it when the requested username does not exist.
DUMMY_HASH = password_hash.hash("dummypassword")

# Bearer-token dependency; "token" is the URL of the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Application object that the route decorators below register against.
app = FastAPI()
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``password_hash`` object and returns
    whatever its ``verify`` call returns.
    """
    outcome = password_hash.verify(plain_password, hashed_password)
    return outcome
def get_password_hash(password):
    """Hash ``password`` with the module-level ``password_hash`` object.

    Returns the value produced by ``password_hash.hash``.
    """
    digest = password_hash.hash(password)
    return digest
def get_user(db, username: str):
    """Look up ``username`` in ``db`` and wrap the record in a ``UserInDB``.

    Args:
        db: Mapping from username to a dict of ``UserInDB`` field values.
        username: Key to look up.

    Returns:
        A ``UserInDB`` built from the stored record, or ``None`` when the
        username is not a key of ``db``.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)
def authenticate_user(fake_db, username: str, password: str):
    """Validate a username/password pair against ``fake_db``.

    Args:
        fake_db: User store passed through to ``get_user``.
        username: Submitted username.
        password: Submitted plaintext password.

    Returns:
        The matching user object when the password verifies, otherwise
        ``False`` (unknown user or wrong password).
    """
    account = get_user(fake_db, username)
    if account:
        if verify_password(password, account.hashed_password):
            return account
        return False
    # Unknown user: a verification against DUMMY_HASH still runs and its result
    # is discarded -- presumably so this path costs about as much as a real
    # password check.
    verify_password(password, DUMMY_HASH)
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed JWT from ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is not mutated; a new
            dict is built for encoding.
        expires_delta: Lifetime of the token measured from the current UTC
            time. When ``None``, a 15-minute lifetime is used.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy, so a truthiness test would swap an explicit zero lifetime for the
    # 15-minute fallback.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    expire = datetime.now(timezone.utc) + lifetime
    # An "exp" key in the caller's claims is overridden here.
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token of a request to a stored user.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user object found in ``fake_users_db`` for the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that is not in ``fake_users_db``.
    """
    # Single exception instance reused by every rejection path below.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise unauthorized
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)
    found = get_user(fake_users_db, username=token_data.username)
    if found is None:
        raise unauthorized
    return found
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Pass the authenticated user through unless it is marked disabled.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``disabled`` field is falsy.

    Raises:
        HTTPException: 400 ("Inactive user") when ``disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    """Exchange form-submitted credentials for a bearer access token.

    Args:
        form_data: Login form carrying ``username`` and ``password``.

    Returns:
        A ``Token`` whose ``access_token`` is a JWT with the username in its
        ``sub`` claim and a lifetime of ``ACCESS_TOKEN_EXPIRE_MINUTES``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` rejects the credentials.
    """
    account = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if account:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        signed = create_access_token(
            data={"sub": account.username}, expires_delta=lifetime
        )
        return Token(access_token=signed, token_type="bearer")
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User:
    """Return the user resolved by the ``get_current_active_user`` dependency."""
    me = current_user
    return me
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    """Return a one-element list holding a fixed item owned by the current user."""
    item = {"item_id": "Foo", "owner": current_user.username}
    return [item]