Coverage for docs_src / security / tutorial005_py310.py: 100%
95 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-04-06 01:24 +0000
1from datetime import datetime, timedelta, timezone 1abhicdefg
3import jwt 1abhicdefg
4from fastapi import Depends, FastAPI, HTTPException, Security, status 1abhicdefg
5from fastapi.security import ( 1abhicdefg
6 OAuth2PasswordBearer,
7 OAuth2PasswordRequestForm,
8 SecurityScopes,
9)
10from jwt.exceptions import InvalidTokenError 1abhicdefg
11from pwdlib import PasswordHash 1abhicdefg
12from pydantic import BaseModel, ValidationError 1abhicdefg
# Key used to sign and verify the JWT access tokens issued by this module.
# to get a string like this run:
# openssl rand -hex 32
# NOTE(review): the key is hard-coded in source here; a real deployment
# should load it from configuration or a secret store instead.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of tokens handed out by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# In-memory stand-in for a user table, keyed by username. Each value holds
# the keyword arguments used to build a UserInDB instance.
# NOTE(review): the "email" values look like an e-mail obfuscation
# placeholder rather than real addresses - confirm against the upstream file.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    # This account is flagged as disabled.
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "[email protected]",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}
# Response body returned by the POST /token endpoint.
# (Kept as comments, not a docstring: a model docstring would be emitted as
# the schema description.)
class Token(BaseModel):
    # The encoded JWT string.
    access_token: str
    # Token scheme label; the login endpoint sets it to "bearer".
    token_type: str
# Validated view of the claims that get_current_user reads from a decoded JWT.
class TokenData(BaseModel):
    # Taken from the token's "sub" claim.
    username: str | None = None
    # Scope names obtained by splitting the token's "scope" claim on spaces.
    scopes: list[str] = []
# Public user fields; this is the declared return type of GET /users/me/.
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    # When truthy, get_current_active_user rejects the user with HTTP 400.
    disabled: bool | None = None
# User record as stored in fake_users_db: the User fields plus the password hash.
class UserInDB(User):
    # Hash string that verify_password checks a plain password against.
    hashed_password: str
# Shared pwdlib hasher used for both hashing and verification.
password_hash = PasswordHash.recommended()

# Hash of a fixed throwaway password, computed once at import time.
# authenticate_user verifies against it when the username is unknown.
DUMMY_HASH = password_hash.hash("dummypassword")

# Bearer-token scheme: tokens are obtained from the "token" URL, and two
# scopes ("me" and "items") are declared with their descriptions.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

# The application object the route decorators below register against.
app = FastAPI()
def verify_password(plain_password, hashed_password):
    """Check a plain password against a stored hash.

    Delegates to the module-level ``password_hash`` hasher and returns
    whatever its ``verify`` call returns.
    """
    is_match = password_hash.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash ``password`` with the module-level ``password_hash`` hasher."""
    hashed = password_hash.hash(password)
    return hashed
def get_user(db, username: str):
    """Look up ``username`` in ``db`` and return it as a ``UserInDB``.

    ``db`` maps usernames to dicts of ``UserInDB`` keyword arguments.
    Returns ``None`` when ``username`` is not a key of ``db``.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
def authenticate_user(fake_db, username: str, password: str):
    """Return the matching user when the credentials are valid, else ``False``.

    The user is fetched with ``get_user`` and the password is checked with
    ``verify_password`` against the stored hash.
    """
    account = get_user(fake_db, username)
    if account:
        password_ok = verify_password(password, account.hashed_password)
        return account if password_ok else False
    # Unknown username: a verification against DUMMY_HASH is still performed
    # and its result discarded - presumably so the response time does not
    # reveal whether the username exists.
    verify_password(password, DUMMY_HASH)
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a JWT signed with ``SECRET_KEY`` carrying ``data`` plus ``exp``.

    Args:
        data: Claims to embed. The dict is shallow-copied, so the caller's
            object is not modified.
        expires_delta: Token lifetime measured from the current UTC time.
            When ``None``, a 15-minute lifetime is used.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    to_encode = data.copy()
    # Compare against None instead of relying on truthiness: timedelta(0) is
    # falsy, so a truthiness test would silently replace an explicit
    # zero-length lifetime with the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    """Resolve the bearer token to a user and enforce the required scopes.

    Raises ``HTTPException`` with status 401 and detail
    "Could not validate credentials" when the token cannot be decoded, has
    no "sub" claim, fails ``TokenData`` validation, or names a user missing
    from ``fake_users_db``. Raises 401 "Not enough permissions" when any
    scope in ``security_scopes.scopes`` is absent from the token's scopes.
    Otherwise returns the user found by ``get_user``.
    """
    # Advertise the required scopes in the challenge header only when some
    # scopes are actually required.
    authenticate_value = (
        f'Bearer scope="{security_scopes.scope_str}"'
        if security_scopes.scopes
        else "Bearer"
    )
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        subject = claims.get("sub")
        if subject is None:
            raise credentials_exception
        # The "scope" claim is a single space-separated string.
        granted = claims.get("scope", "").split(" ")
        token_data = TokenData(scopes=granted, username=subject)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    lacks_scope = any(
        required not in token_data.scopes for required in security_scopes.scopes
    )
    if lacks_scope:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not enough permissions",
            headers={"WWW-Authenticate": authenticate_value},
        )
    return user
async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    """Return the current user unless the account is disabled.

    Depends on ``get_current_user`` with the "me" scope. Raises
    ``HTTPException`` with status 400 and detail "Inactive user" when
    ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    # Exchange form credentials for a bearer token; bad credentials get a 400.
    # (Plain comments rather than a docstring, so nothing is added to the
    # generated operation description.)
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    # The requested scopes are stored in the token as one space-separated string.
    claims = {"sub": user.username, "scope": " ".join(form_data.scopes)}
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    encoded = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=encoded, token_type="bearer")
@app.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User:
    # Return the user resolved by get_current_active_user unchanged.
    return current_user
@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    # Requires the "items" scope on top of whatever get_current_active_user
    # itself requires. Returns one fixed item owned by the current user.
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    # current_user is not used in the body; the dependency is declared so the
    # request must pass get_current_user before this fixed payload is returned.
    return {"status": "ok"}