- The AppSec Augury
- Posts
- 0x04: How to do JWT Authentication in Python & FastAPI (The Right Way)
0x04: How to do JWT Authentication in Python & FastAPI (The Right Way)
JWTs are great, if you can use them right.
Ah, JSON Web Tokens. A stateless standard involving signed payloads that can be used to verify identity.

Asterisk on that, actually.
I used to do penetration testing, and one of the most common issues I would find with JWTs is this: Let’s say you have a JWT stored in the browser that gets stolen somehow (XSS, physical attack, doesn’t really matter). If an attacker stole a user’s session cookie, then the user could feasibly log out of that session, the cookie would be invalidated, and then the web application would have to issue a new one.
JWTs don’t work like that.
JWTs have inherent expiry times, so they’re valid for a certain amount of time from when they’re issued. However, this also means that, inherently, if a JWT is ever compromised, that means that that JWT is valid until the expiry time is up. And depending on the nature of the JWT, that could be a long time. An attacker could have persistent access to that account with the victim having no way of remediating it.
It’s not the most likely thing to happen, and often requires a lot of luck, but you’ll save some good face knowing that if a JWT ever does get stolen, your users have a way of getting around it. Plus, it’s also helpful for general piece of mind and the ability to revoke sessions, which is near-standard in any application that allows multiple logins for the same account.
So let’s take a look at how to implement JWT authentication using Python and FastAPI, and do it right (though, since we’re not going to be using JWKs, this still isn’t a bulletproof, prod-ready system). As a note, the principles discussed here apply to any language and framework, we’ll just be using Python and FastAPI to give a concrete example.
Let’s dive in!
Background
JWTs
If this is your first time encountering JWTs, no worries, they are a relatively new technology. In summary, they allow stateless claims to be made by clients that they have been authenticated by a particular server.
Translation: They’re a handy substitute for session cookies.
These are represented as base64 encoded JSON data, and tagged with either a digital signature or MAC. For more information, I highly recommend reading the Auth0 explanation on JWTs.
Step 0: Project Setup
Okay, so we need a FastAPI app. Let’s start one up, and we don’t need anything fancy for now. I’ll be focusing more on the security side of things, so we won’t be using an actual database for the sake of brevity. We’ll also be basing this on the actual FastAPI security docs example, with some tweaks.
You’ll need a few things for this:
Python (I’m using 3.11)
Poetry (I’m using version 1.7)
A good code editor (I’m using VSCode)
Some elbow grease
Your brain
To start, let’s get a Poetry project going in the root directory of your project:
$ mkdir jwt-auth-demo$ cd jwt-auth-demo$ poetry init
You’ll need to install the following packages:
fastapi[all]
python-jose[cryptography]
passlib[argon2]
uvicorn
python-multipart
requests
To install a package with poetry, either declare it when initializing the project, or with the poetry add command:
$ poetry add \ fastapi[all] \ python-jose[cryptography] \ passlib[argon2] \ uvicorn \ python-multipart \ requests
We’ll also make a directory to hold the source code itself:
$ mkdir jwt_auth_demo
Then, make the following files within jwt_auth_demo:
__init__.py (blank, this just lets us use the folder as a Python module)
errors.py (this will let us keep errors in one place)
main.py (the bulk of the code)
schemas.py (the data models)
security.py (security-related code)
Step 1: Data Models
We’re going to need to keep track of two primary things: Users, and their sessions. We also might need some classes to make keeping track of tokens and the data contained within a little easier.
Add the following to the schemas.py file:
from pydantic import BaseModelfrom datetime import datetime# Response model received when logging inclass Token(BaseModel): access_token: str token_type: str# The data contained within a tokenclass TokenData(BaseModel): sub: str | None = None # The subject, or username in our case session: str | None = None exp: datetime | None = None # The expiry of the token# Basic user dataclass User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = None# Full user data within the databaseclass UserInDB(User): hashed_password: str# Session dataclass Session(BaseModel): id: str username: str active: bool
Notice how UserInDB inherits from the User class. We don’t need to show the password, hashed or otherwise, in our responses!
Also, the TokenData contains our payload for the JWT. The sub and exp variables are actually in the JWT standard, standing for the subject and expiry time of the token, respectively.
Step 2: Defining Errors
We’ll need two types of errors: One for issues with authentication, and one for issues with the JWT itself.
Add the following code to the errors.py file:
from fastapi import HTTPException, statuscredentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"},)password_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"},)
Both of these return a 401 status code, just with different error messages.
Step 3: Security Functions
Our security.py file deals with two things in this project: Passwords, and creating and verifying the integrity of access tokens. The actual session management happens in main.py (which, in a real project, may well be broken down further).
First, let’s import our necessary modules:
from datetime import datetime, timedelta # Managing JWT expirationsfrom fastapi.security import OAuth2PasswordBearer # Our Password flowfrom passlib.context import CryptContext # Hashing and verifying passwordsfrom jose import jwt # The JWT functionality itselfimport secrets # Generating secret keysfrom jwt_auth_demo.errors import credentials_exceptionfrom jwt_auth_demo.schemas import TokenData
Next, we’ll set up our JWT configuration, as well as initialize our oauth scheme (the scheme FastAPI uses for password login), and get an Argon2 password context up and running so that we can hash and verify passwords.
SECRET_KEY = secrets.token_urlsafe()ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
Then, let’s add some functions for password management. In a realistic scenario, you’d only need to hash and verify, but I’m going to add one more function just to make things a little easier: a prompt for a testing password so that when the application boots up, we can enter a password to test our API.
Bonus, these hash and verify functions work for any KDF, so you could use bcrypt, scrypt, pbkdf2, whatever you need!
def prompt_password(): return input("Enter a password for testing: ")def hash_password(password): return pwd_context.hash(password)def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)
Finally, our two functions for creating and verifying our access tokens. These are our fabled JWT functions.
def create_access_token( data: TokenData, expires_delta: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),) -> str: expiry = datetime.utcnow() + expires_delta data.exp = expiry encoded_jwt = jwt.encode(dict(data), SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtdef verify_access_token(token: str) -> TokenData: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) token_data = TokenData(**payload) sub = token_data.sub session = token_data.session if not (sub and session): raise credentials_exception return token_data
One thing I want to draw your attention to is the use of the dict() function going into the jwt.encode() call. jwt.encode takes only MutableMapping objects (or, for our purposes, dictionaries), so we have to do that conversion first. Also, the use of the optional parameter expires_delta, defaulting to the time in minutes we set earlier, ensures that we don’t need to specify a validity duration.
Step 4: Putting it All Together
Finally, let’s look at main.py.
Our imports:
from typing import Annotated, Anyfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestFormfrom jose import JWTErrorimport uuid # Plan on using UUIDs for session IDsfrom jwt_auth_demo.security import ( hash_password, prompt_password, verify_password, oauth2_scheme, create_access_token, verify_access_token,)from jwt_auth_demo.schemas import Token, TokenData, User, UserInDB, Sessionfrom jwt_auth_demo.errors import credentials_exception, password_exception
Then, we have our “database,” which is simply a couple of dictionaries for “tables.” We’ll also define functions for retrieving users and sessions:
# --- DATABASE ---fake_users_db = { "test": { "username": "test", "full_name": "Test Test", "email": "[email protected]", "hashed_password": hash_password(prompt_password()), "disabled": False, }}fake_sessions_db = dict()def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def get_session(db, session): if session in db: session_dict = db[session] return Session(**session_dict)
These next four functions allow us to create sessions, validate that the session ID matches the user ID, retrieve the user’s own sessions, and then revoke the user’s sessions. This is useful: a number of applications that allow sign in from multiple locations also allow a user to “sign out of all devices,” or revoke all of their sessions.
# --- SESSIONS ---def create_session(sessions_db, username: str): session_id = str(uuid.uuid4()) session = Session(id=session_id, username=username, active=True) sessions_db[session_id] = dict(session) return session_iddef validate_session(sessions_db, username: str, session_id: str): session: Session | None = get_session(sessions_db, session_id) if not session: return False user_match = session.username == username return user_match and session.activedef get_own_sessions(sessions_db, username: str) -> dict[str, Any]: all_session_ids = list(fake_sessions_db.keys()) own_sessions = dict() for session_id in all_session_ids: session = Session(**sessions_db[session_id]) if session.username == username: own_sessions[session_id] = sessions_db[session_id] return own_sessionsdef deactivate_own_sessions(sessions_db, username: str): own_sessions: dict = get_own_sessions(sessions_db, username) own_session_ids = list(own_sessions.keys()) for session_id in own_session_ids: sessions_db[session_id]["active"] = False
Next, we have functions that deal with authentication, namely retrieving active users and authenticating them (note the Depends(oauth2_scheme) call, these refer to our Bearer token in the header):
# --- AUTH ---def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userasync def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): try: token_data = verify_access_token(token) if not validate_session(fake_sessions_db, token_data.sub, token_data.session): raise credentials_exception except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.sub) if user is None: raise credentials_exception return userasync def get_current_active_user( current_user: Annotated[User, Depends(get_current_user)]): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user
Then, finally, we have the meat of the application itself. Note that when we defined our oauth scheme we also gave a token URL as a parameter: this maps to an endpoint in the app that will deliver access tokens upon successful login. This takes the form of an OAuth2PasswordRequestForm with regards to data, so if you have JSON input you’ll need to change this up a little bit.
# --- APP ---app = FastAPI()@app.post("/token", response_model=Token)async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise password_exception session = create_session(fake_sessions_db, user.username) token_data = TokenData(sub=user.username, session=session) access_token = create_access_token(data=token_data) return {"access_token": access_token, "token_type": "bearer"}
Then, endpoints for listing the current user and their items. These are useful for testing but also may have other applications. These are fairly straightforward, but they depend on the get_current_active_user() functions we saw earlier:
@app.get("/users/me/", response_model=User)async def read_users_me( current_user: Annotated[User, Depends(get_current_active_user)]): return [email protected]("/users/me/items/")async def read_own_items( current_user: Annotated[User, Depends(get_current_active_user)]): return [{"item_id": "Foo", "owner": current_user.username}]
Also note that for the above, the response model is User, not UserInDB (again, don’t want those password leaks).
Finally, we have functions to manage sessions. The important one to note is the revoke_all_sessions endpoint, which actually does the full revocation of the sessions.
# This is literally only for debug [email protected]("/sessions", response_model=list[Session])async def read_all_sessions(): return list(fake_sessions_db.values())@app.get("/users/me/sessions", response_model=list[Session])async def read_own_sessions( current_user: Annotated[User, Depends(get_current_active_user)]): return get_own_sessions(fake_sessions_db, current_user.username)@app.delete("/users/me/sessions", status_code=status.HTTP_204_NO_CONTENT)async def revoke_own_sessions( current_user: Annotated[User, Depends(get_current_active_user)]): deactivate_own_sessions(fake_sessions_db, current_user.username)
That’s all the code for the application itself. Time to test it!
Step 5: Testing
The primary use case for this setup is preventing the sort of attack where access tokens are stored and then reused, so let’s see if we can simulate that. We’ll be a little quick and dirty for this, I may go more in depth in FastAPI testing in a future article.
In a terminal, run the application:
$ poetry install$ poetry shell$ uvicorn jwt_auth_demo.main:app
Make up a password and type it in when prompted. Then, in a new Python file in the same project (I named mine test_session.py):
import requestsfrom jwt_auth_demo.security import prompt_passwordHOST = "http://localhost:8000"headers = {"accept": "application/json"}# A test for the access tokendef get_me(headers: dict): r = requests.get( f"{HOST}/users/me", headers=headers, ) r.raise_for_status() print(r.json())r = requests.post( url=f"{HOST}/token", data={"username": "test", "password": prompt_password()}, headers=headers,)r.raise_for_status()token = r.json()["access_token"]headers.update({"Authorization": f"Bearer {token}"})get_me(headers)# If the user logs out, the token is deleted from their browser.# But if an attacker snags it...get_me(headers)# Now we revoke the sessionsr = requests.delete( f"{HOST}/users/me/sessions", headers=headers, )r.raise_for_status()# Try again, and this time the request *should* fail...try: get_me(headers) raise Exception("Uh oh, we still have access.")except requests.HTTPError: print("Success! Sessions revoked.")
Then, in another terminal, run the tests:
$ poetry run python test_sessions.py Enter a password for testing: <obfuscated>{'username': 'test', 'email': '[email protected]', 'full_name': 'Test Test', 'disabled': False}{'username': 'test', 'email': '[email protected]', 'full_name': 'Test Test', 'disabled': False}Success! Sessions revoked.
Bingo! We’ve successfully implemented sessions with JWTs!
Conclusion
This was an introduction to using JWTs with Python and FastAPI that allows a user to truly revoke sessions. This is a more secure and more feature-rich way of working with JWTs, since it not only mitigates access token theft, but also allows a user to sign out of multiple sessions at once.
As I mentioned, the principles here apply to any language and framework.
Reply