What We're Building
By the end of this stage your API will have:
- User registration with hashed passwords
- Login that returns a JWT token
- Protected routes that require a valid token
- Current user detection from token
This is exactly how real production APIs work — same pattern used by every major application.
How JWT Authentication Works
Since you build with NestJS you know this flow. Quick recap in FastAPI context:
1. User registers → password gets hashed → stored in database
2. User logs in → password verified → JWT token generated → sent to client
3. Client sends token in every request header
4. Server verifies token → extracts user info → allows or denies access
Installing Dependencies
pip install python-jose[cryptography] passlib[bcrypt] python-multipart
- python-jose — creates and verifies JWT tokens
- passlib[bcrypt] — hashes passwords with bcrypt
- python-multipart — needed for OAuth2 form login
Updated Project Structure
fastapi-learning/
├── venv/
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py ← NEW: all auth logic lives here
├── dependencies.py ← NEW: reusable dependencies
└── requirements.txt
Step 1 — Update models.py
Add password field to User:
# models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey from sqlalchemy.orm import relationship from sqlalchemy.sql import func from database import Base
class User(Base): __tablename__ = "users"
id = Column(Integer, primary_key=True, index=True) name = Column(String(50), nullable=False) email = Column(String(100), unique=True, nullable=False, index=True) password = Column(String(255), nullable=False) # hashed password stored here age = Column(Integer, nullable=False) city = Column(String(50), default="Unknown") is_active = Column(Boolean, default=True) created_at = Column(DateTime, server_default=func.now())
posts = relationship("Post", back_populates="author")
class Post(Base): __tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True) title = Column(String(100), nullable=False) content = Column(Text, nullable=False) published = Column(Boolean, default=False) created_at = Column(DateTime, server_default=func.now()) author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
author = relationship("User", back_populates="posts")
Step 2 — auth.py — All Auth Logic
# auth.py
from datetime import datetime, timedelta from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, status
# ── Configuration ───────────────────────────────── SECRET_KEY = "your-super-secret-key-change-this-in-production" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ── Password Hashing ────────────────────────────── pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str: """Convert plain password to bcrypt hash.""" return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool: """Check if plain password matches the stored hash.""" return pwd_context.verify(plain_password, hashed_password)
# ── JWT Token ───────────────────────────────────── def create_access_token(data: dict) -> str: """Create a JWT token with expiry.""" to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict: """Verify JWT token and return its payload.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"} )
Step 3 — dependencies.py — Get Current User
# dependencies.py
from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.orm import Session from database import get_db import auth import crud
# This tells FastAPI where the login endpoint is # Adds a lock icon on protected routes in /docs oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ): """Extract current user from JWT token.""" payload = auth.verify_token(token) user_id: int = payload.get("user_id")
if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload" )
user = crud.get_user(db, user_id) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User no longer exists" )
if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Account is deactivated" )
return user
def get_current_active_user( current_user = Depends(get_current_user) ): """Same as get_current_user — alias for clarity.""" return current_user
Step 4 — Update schemas.py
Add auth-related schemas:
NoticePostCreateno longer hasauthor_id— logged in user's ID will be used automatically.
Step 5 — Update crud.py
# crud.py
from sqlalchemy.orm import Session import models import schemas import auth
# ── User CRUD ─────────────────────────────────────
def get_user(db: Session, user_id: int): return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str): return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 10): return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate): # Hash password before storing hashed = auth.hash_password(user.password)
db_user = models.User( name=user.name, email=user.email, password=hashed, # never store plain password age=user.age, city=user.city )
db.add(db_user) db.commit() db.refresh(db_user) return db_user
def authenticate_user(db: Session, email: str, password: str): """Verify email and password — return user if valid, None if not.""" user = get_user_by_email(db, email) if not user: return None if not auth.verify_password(password, user.password): return None return user
def update_user(db: Session, user_id: int, user: schemas.UserUpdate): db_user = get_user(db, user_id) if not db_user: return None update_data = user.model_dump(exclude_none=True) for field, value in update_data.items(): setattr(db_user, field, value) db.commit() db.refresh(db_user) return db_user
def delete_user(db: Session, user_id: int): db_user = get_user(db, user_id) if not db_user: return None db.delete(db_user) db.commit() return db_user
# ── Post CRUD ─────────────────────────────────────
def get_post(db: Session, post_id: int): return db.query(models.Post).filter(models.Post.id == post_id).first()
def get_posts( db: Session, skip: int = 0, limit: int = 10, published: bool | None = None, author_id: int | None = None ): query = db.query(models.Post) if published is not None: query = query.filter(models.Post.published == published) if author_id is not None: query = query.filter(models.Post.author_id == author_id) return query.offset(skip).limit(limit).all()
def create_post(db: Session, post: schemas.PostCreate, author_id: int): db_post = models.Post( title=post.title, content=post.content, author_id=author_id # use logged-in user's id ) db.add(db_post) db.commit() db.refresh(db_post) return db_post
def update_post(db: Session, post_id: int, post: schemas.PostUpdate): db_post = get_post(db, post_id) if not db_post: return None update_data = post.model_dump(exclude_none=True) for field, value in update_data.items(): setattr(db_post, field, value) db.commit() db.refresh(db_post) return db_post
def delete_post(db: Session, post_id: int): db_post = get_post(db, post_id) if not db_post: return None db.delete(db_post) db.commit() return db_post
Step 6 — main.py — Complete with Auth Routes
# main.py
from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session import models import schemas import crud import auth from database import engine, get_db from dependencies import get_current_user
models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="Authenticated API", version="4.0.0")
# ── Auth Routes ───────────────────────────────────
@app.post("/auth/register", response_model=schemas.UserResponse, status_code=201) def register(user: schemas.UserCreate, db: Session = Depends(get_db)): existing = crud.get_user_by_email(db, user.email) if existing: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db, user)
@app.post("/auth/login", response_model=schemas.TokenResponse) def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)): user = crud.authenticate_user(db, credentials.email, credentials.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password" )
token = auth.create_access_token(data={"user_id": user.id})
return { "access_token": token, "token_type": "bearer", "user": user }
# OAuth2 login — for /docs Authorize button to work @app.post("/auth/token") def login_for_docs( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ): user = crud.authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException(status_code=401, detail="Invalid credentials") token = auth.create_access_token(data={"user_id": user.id}) return {"access_token": token, "token_type": "bearer"}
# ── User Routes ───────────────────────────────────
@app.get("/users", response_model=list[schemas.UserResponse]) def get_users( skip: int = 0, limit: int = 10, db: Session = Depends(get_db), current_user = Depends(get_current_user) # protected route ): return crud.get_users(db, skip, limit)
@app.get("/users/me", response_model=schemas.UserResponse) def get_me(current_user = Depends(get_current_user)): """Get currently logged in user's profile.""" return current_user
@app.get("/users/{user_id}", response_model=schemas.UserResponse) def get_user( user_id: int, db: Session = Depends(get_db), current_user = Depends(get_current_user) ): user = crud.get_user(db, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") return user
@app.patch("/users/me", response_model=schemas.UserResponse) def update_me( user_update: schemas.UserUpdate, db: Session = Depends(get_db), current_user = Depends(get_current_user) ): """Update currently logged in user's profile.""" return crud.update_user(db, current_user.id, user_update)
@app.delete("/users/me", status_code=status.HTTP_204_NO_CONTENT) def delete_me( db: Session = Depends(get_db), current_user = Depends(get_current_user) ): """Delete currently logged in user's account.""" crud.delete_user(db, current_user.id)
# ── Post Routes ───────────────────────────────────
@app.post("/posts", response_model=schemas.PostResponse, status_code=201) def create_post( post: schemas.PostCreate, db: Session = Depends(get_db), current_user = Depends(get_current_user) ): return crud.create_post(db, post, author_id=current_user.id)
@app.get("/posts", response_model=list[schemas.PostResponse]) def get_posts( skip: int = 0, limit: int = 10, published: bool | None = None, db: Session = Depends(get_db) # no auth required — anyone can read posts ): return crud.get_posts(db, skip, limit, published)
@app.get("/posts/my", response_model=list[schemas.PostResponse]) def get_my_posts( db: Session = Depends(get_db), current_user = Depends(get_current_user) ): """Get all posts by currently logged in user.""" return crud.get_posts(db, author_id=current_user.id)
@app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor) def get_post(post_id: int, db: Session = Depends(get_db)): post = crud.get_post(db, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found") return post
@app.patch("/posts/{post_id}", response_model=schemas.PostResponse) def update_post( post_id: int, post_update: schemas.PostUpdate, db: Session = Depends(get_db), current_user = Depends(get_current_user) ): post = crud.get_post(db, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found")
# Only author can update their own post if post.author_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized to update this post")
return crud.update_post(db, post_id, post_update)
@app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_post( post_id: int, db: Session = Depends(get_db), current_user = Depends(get_current_user) ): post = crud.get_post(db, post_id) if not post: raise HTTPException(status_code=404, detail="Post not found")
# Only author can delete their own post if post.author_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized to delete this post")
crud.delete_post(db, post_id)
Testing Auth Flow in /docs
Step 1 — Register:
POST /auth/register
{
"name": "Gagan Singh",
"email": "gagan@email.com",
"password": "secret123",
"age": 22,
"city": "Delhi"
}
Step 2 — Login:
POST /auth/login
{
"email": "gagan@email.com",
"password": "secret123"
}
Response:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"user": {...}
}
Step 3 — Authorize in /docs:
Click the Authorize button (lock icon) at the top right of /docs. Enter your token. Now all protected routes work.
Step 4 — Test protected routes:
GET /users/me— returns your profilePOST /posts— creates a post as youPATCH /posts/1— works only if you're the author
Using Token in Thunder Client / Postman
Add header to every protected request:
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
This is the standard Bearer token format — same as your NestJS frontend uses.
Environment Variables — Secure Your Secret Key
Never hardcode secrets. Use environment variables:
Create a .env file:
SECRET_KEY=your-super-secret-key-change-this-in-production-make-it-long
DATABASE_URL=sqlite:///./app.db
ACCESS_TOKEN_EXPIRE_MINUTES=30
Install python-dotenv:
pip install python-dotenv
Create config.py:
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
secret_key: str
database_url: str = "sqlite:///./app.db"
access_token_expire_minutes: int = 30
class Config:
env_file = ".env"
settings = Settings()
Install pydantic-settings:
pip install pydantic-settings
Update auth.py to use settings:
from config import settings
SECRET_KEY = settings.secret_key
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
Update database.py:
from config import settings
DATABASE_URL = settings.database_url
Add .env to .gitignore — never commit secrets to git.
Role-Based Access — Admin vs User
A common pattern — restricting certain routes to admin users only:
Add role to User model:
class User(Base):
# ... existing fields
role = Column(String(20), default="user") # "user" or "admin"
Create admin dependency in dependencies.py:
def get_admin_user(current_user = Depends(get_current_user)):
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
return current_user
Use it on admin-only routes:
@app.delete("/admin/users/{user_id}", status_code=204)
def admin_delete_user(
user_id: int,
db: Session = Depends(get_db),
admin = Depends(get_admin_user) # only admins
):
crud.delete_user(db, user_id)
This is the same pattern as Guards in NestJS.
Password Reset Flow — How It Works
In real apps password reset needs:
- User requests reset with their email
- Generate a temporary token, save it, email it to user
- User clicks link with token
- Verify token, let user set new password
Basic implementation:
import secrets
def create_password_reset_token() -> str:
return secrets.token_urlsafe(32) # secure random token
@app.post("/auth/forgot-password")
def forgot_password(email: str, db: Session = Depends(get_db)):
user = crud.get_user_by_email(db, email)
if not user:
# Don't reveal if email exists or not — security best practice
return {"message": "If that email exists, a reset link has been sent"}
reset_token = create_password_reset_token()
# In real app: save token to database with expiry, send email
# For now just return it (in production NEVER return it directly)
return {"reset_token": reset_token, "message": "Use this token to reset password"}
For sending emails you'd use fastapi-mail library. We'll keep that for the final project.
Security Checklist
Things every production API must have:
✅ Passwords hashed with bcrypt — done
✅ JWT tokens with expiry — done
✅ Protected routes — done
✅ Author-only post editing — done
✅ Role-based access — done
✅ Secrets in environment variables — done
✅ Email normalization — done (lowercase in validator)
❌ Rate limiting — prevents brute force attacks
❌ HTTPS — always in production
❌ Input sanitization — prevent SQL injection (SQLAlchemy handles this)
❌ Refresh tokens — long-lived sessions
Rate limiting example with slowapi:
pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/auth/login")
@limiter.limit("5/minute") # max 5 login attempts per minute
def login(request: Request, credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
...
Final Project Structure
fastapi-learning/
├── venv/
├── .env ← secrets (never commit)
├── .gitignore
├── main.py ← routes
├── database.py ← db connection
├── models.py ← SQLAlchemy tables
├── schemas.py ← Pydantic models
├── crud.py ← db operations
├── auth.py ← JWT and password logic
├── dependencies.py ← reusable dependencies
├── config.py ← settings from .env
└── requirements.txt
Exercise 🏋️
Add these features to the current project:
1. Change Password Route:
PATCH /users/me/password
{
"current_password": "secret123",
"new_password": "newSecret456"
}
- Verify current password before allowing change
- Hash and save new password
2. Deactivate Account:
PATCH /users/me/deactivate
- Sets
is_active = False - Deactivated users cannot login
3. Admin Routes:
GET /admin/users— list all users including inactive (admin only)DELETE /admin/users/{id}— force delete any user (admin only)PATCH /admin/users/{id}/activate— reactivate a deactivated user
4. Post Ownership:
- Regular users can only edit/delete their own posts
- Admin can edit/delete any post
No comments:
Post a Comment