Coverage for docs_src / security / tutorial004_an_py310.py: 100%
84 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from datetime import datetime, timedelta, timezone 1abhicdefg
2from typing import Annotated 1abhicdefg
4import jwt 1abhicdefg
5from fastapi import Depends, FastAPI, HTTPException, status 1abhicdefg
6from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abhicdefg
7from jwt.exceptions import InvalidTokenError 1abhicdefg
8from pwdlib import PasswordHash 1abhicdefg
9from pydantic import BaseModel 1abhicdefg
11# to get a string like this run:
12# openssl rand -hex 32
13SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abhicdefg
14ALGORITHM = "HS256" 1abhicdefg
15ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abhicdefg
18fake_users_db = { 1abhicdefg
19 "johndoe": {
20 "username": "johndoe",
21 "full_name": "John Doe",
22 "email": "[email protected]",
23 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
24 "disabled": False,
25 }
26}
29class Token(BaseModel): 1abhicdefg
30 access_token: str 1abcdefg
31 token_type: str 1abcdefg
34class TokenData(BaseModel): 1abhicdefg
35 username: str | None = None 1abhicdefg
38class User(BaseModel): 1abhicdefg
39 username: str 1abcdefg
40 email: str | None = None 1abhicdefg
41 full_name: str | None = None 1abhicdefg
42 disabled: bool | None = None 1abhicdefg
45class UserInDB(User): 1abhicdefg
46 hashed_password: str 1abcdefg
49password_hash = PasswordHash.recommended() 1abhicdefg
51DUMMY_HASH = password_hash.hash("dummypassword") 1abhicdefg
53oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1abhicdefg
55app = FastAPI() 1abhicdefg
58def verify_password(plain_password, hashed_password): 1abhicdefg
59 return password_hash.verify(plain_password, hashed_password) 1txGjklYuHvyImnoZwzJpqr0
62def get_password_hash(password): 1abhicdefg
63 return password_hash.hash(password) 11l2U3o4r
66def get_user(db, username: str): 1abhicdefg
67 if username in db: 1txGjklABuHvyImnoCDwzJpqrEF
68 user_dict = db[username] 1txjklusvymnowzpqr
69 return UserInDB(**user_dict) 1txjklusvymnowzpqr
72def authenticate_user(fake_db, username: str, password: str): 1abhicdefg
73 user = get_user(fake_db, username) 1txGjkluHvyImnowzJpqr
74 if not user: 1txGjkluHvyImnowzJpqr
75 verify_password(password, DUMMY_HASH) 1GHIJ
76 return False 1GHIJ
77 if not verify_password(password, user.hashed_password): 1txjklusvymnowzpqr
78 return False 1xXyz
79 return user 1tjklusvmnowpqr
82def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abhicdefg
83 to_encode = data.copy() 1NtjklusOvmnoPwpqr
84 if expires_delta: 1NtjklusOvmnoPwpqr
85 expire = datetime.now(timezone.utc) + expires_delta 1tjklusvmnowpqr
86 else:
87 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1N5OP
88 to_encode.update({"exp": expire}) 1NtjklusOvmnoPwpqr
89 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1NtjklusOvmnoPwpqr
90 return encoded_jwt 1NtjklusOvmnoPwpqr
93async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): 1abhicdefg
94 credentials_exception = HTTPException( 1QjklKABRsSmnoLCDTpqrMEF
95 status_code=status.HTTP_401_UNAUTHORIZED,
96 detail="Could not validate credentials",
97 headers={"WWW-Authenticate": "Bearer"},
98 )
99 try: 1QjklKABRsSmnoLCDTpqrMEF
100 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1QjklKABRsSmnoLCDTpqrMEF
101 username = payload.get("sub") 1jklKABsWmnoLCDpqrMEF
102 if username is None: 1jklKABsWmnoLCDpqrMEF
103 raise credentials_exception 1KWLM
104 token_data = TokenData(username=username) 1jklABsVmnoCDpqrEF
105 except InvalidTokenError: 1QKRSLTM
106 raise credentials_exception 1QRST
107 user = get_user(fake_users_db, username=token_data.username) 1jklABsVmnoCDpqrEF
108 if user is None: 1jklABsVmnoCDpqrEF
109 raise credentials_exception 1ABVCDEF
110 return user 1jklsUmnopqr
113async def get_current_active_user( 1abhicdefg
114 current_user: Annotated[User, Depends(get_current_user)],
115):
116 if current_user.disabled: 1jklsUmnopqr
117 raise HTTPException(status_code=400, detail="Inactive user") 1lUor
118 return current_user 1jksmnpq
121@app.post("/token") 1abhicdefg
122async def login_for_access_token( 1abhicdefg
123 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
124) -> Token:
125 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1txGjkluHvyImnowzJpqr
126 if not user: 1txGjkluHvyImnowzJpqr
127 raise HTTPException( 1xGXHyIzJ
128 status_code=status.HTTP_401_UNAUTHORIZED,
129 detail="Incorrect username or password",
130 headers={"WWW-Authenticate": "Bearer"},
131 )
132 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1tjklusvmnowpqr
133 access_token = create_access_token( 1tjklusvmnowpqr
134 data={"sub": user.username}, expires_delta=access_token_expires
135 )
136 return Token(access_token=access_token, token_type="bearer") 1tjklusvmnowpqr
139@app.get("/users/me/") 1abhicdefg
140async def read_users_me( 1abhicdefg
141 current_user: Annotated[User, Depends(get_current_active_user)],
142) -> User:
143 return current_user 1ksnq
146@app.get("/users/me/items/") 1abhicdefg
147async def read_own_items( 1abhicdefg
148 current_user: Annotated[User, Depends(get_current_active_user)],
149):
150 return [{"item_id": "Foo", "owner": current_user.username}] 1j6mp