Authentication & Security in Fast APIs

What We're Building

By the end of this stage your API will have:

  • User registration with hashed passwords
  • Login that returns a JWT token
  • Protected routes that require a valid token
  • Current user detection from token

This is exactly how real production APIs work — same pattern used by every major application.


How JWT Authentication Works

Since you build with NestJS you know this flow. Quick recap in FastAPI context:

1. User registers → password gets hashed → stored in database
2. User logs in → password verified → JWT token generated → sent to client
3. Client sends token in every request header
4. Server verifies token → extracts user info → allows or denies access

Installing Dependencies

pip install python-jose[cryptography] passlib[bcrypt] python-multipart
  • python-jose — creates and verifies JWT tokens
  • passlib[bcrypt] — hashes passwords with bcrypt
  • python-multipart — needed for OAuth2 form login

Updated Project Structure

fastapi-learning/
├── venv/
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py            ← NEW: all auth logic lives here
├── dependencies.py    ← NEW: reusable dependencies
└── requirements.txt

Step 1 — Update models.py

Add password field to User:


    # models.py

    from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
    from sqlalchemy.orm import relationship
    from sqlalchemy.sql import func
    from database import Base


    class User(Base):
        __tablename__ = "users"

        id = Column(Integer, primary_key=True, index=True)
        name = Column(String(50), nullable=False)
        email = Column(String(100), unique=True, nullable=False, index=True)
        password = Column(String(255), nullable=False)      # hashed password stored here
        age = Column(Integer, nullable=False)
        city = Column(String(50), default="Unknown")
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, server_default=func.now())

        posts = relationship("Post", back_populates="author")


    class Post(Base):
        __tablename__ = "posts"

        id = Column(Integer, primary_key=True, index=True)
        title = Column(String(100), nullable=False)
        content = Column(Text, nullable=False)
        published = Column(Boolean, default=False)
        created_at = Column(DateTime, server_default=func.now())
        author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

        author = relationship("User", back_populates="posts")


Step 2 — auth.py — All Auth Logic


    # auth.py

    from datetime import datetime, timedelta
    from jose import JWTError, jwt
    from passlib.context import CryptContext
    from fastapi import HTTPException, status

    # ── Configuration ─────────────────────────────────
    SECRET_KEY = "your-super-secret-key-change-this-in-production"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30

    # ── Password Hashing ──────────────────────────────
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


    def hash_password(password: str) -> str:
        """Convert plain password to bcrypt hash."""
        return pwd_context.hash(password)


    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Check if plain password matches the stored hash."""
        return pwd_context.verify(plain_password, hashed_password)


    # ── JWT Token ─────────────────────────────────────
    def create_access_token(data: dict) -> str:
        """Create a JWT token with expiry."""
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


    def verify_token(token: str) -> dict:
        """Verify JWT token and return its payload."""
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired token",
                headers={"WWW-Authenticate": "Bearer"}
            )


Step 3 — dependencies.py — Get Current User


    # dependencies.py

    from fastapi import Depends, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer
    from sqlalchemy.orm import Session
    from database import get_db
    import auth
    import crud

    # This tells FastAPI where the login endpoint is
    # Adds a lock icon on protected routes in /docs
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


    def get_current_user(
        token: str = Depends(oauth2_scheme),
        db: Session = Depends(get_db)
    ):
        """Extract current user from JWT token."""
        payload = auth.verify_token(token)
        user_id: int = payload.get("user_id")

        if user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token payload"
            )

        user = crud.get_user(db, user_id)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User no longer exists"
            )

        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is deactivated"
            )

        return user


    def get_current_active_user(
        current_user = Depends(get_current_user)
    ):
        """Same as get_current_user — alias for clarity."""
        return current_user


Step 4 — Update schemas.py

Add auth-related schemas:


    # schemas.py

    from pydantic import BaseModel, Field, field_validator
    from datetime import datetime


    # ── User Schemas ─────────────────────────────────

    class UserCreate(BaseModel):
        name: str = Field(min_length=2, max_length=50)
        email: str
        password: str = Field(min_length=8)
        age: int = Field(ge=0, le=150)
        city: str = Field(default="Unknown")

        @field_validator("email")
        @classmethod
        def normalize_email(cls, value: str) -> str:
            return value.lower().strip()

        @field_validator("password")
        @classmethod
        def password_strength(cls, value: str) -> str:
            if value.isdigit():
                raise ValueError("Password cannot be only numbers")
            if value.isalpha():
                raise ValueError("Password must include at least one number")
            return value


    class UserUpdate(BaseModel):
        name: str | None = Field(default=None, min_length=2)
        age: int | None = Field(default=None, ge=0, le=150)
        city: str | None = None


    class UserResponse(BaseModel):
        id: int
        name: str
        email: str
        age: int
        city: str
        is_active: bool
        created_at: datetime

        model_config = {"from_attributes": True}


    # ── Auth Schemas ─────────────────────────────────

    class LoginRequest(BaseModel):
        email: str
        password: str


    class TokenResponse(BaseModel):
        access_token: str
        token_type: str = "bearer"
        user: UserResponse


    # ── Post Schemas ─────────────────────────────────

    class PostCreate(BaseModel):
        title: str = Field(min_length=5, max_length=100)
        content: str = Field(min_length=20)


    class PostUpdate(BaseModel):
        title: str | None = Field(default=None, min_length=5)
        content: str | None = Field(default=None, min_length=20)
        published: bool | None = None


    class PostResponse(BaseModel):
        id: int
        title: str
        content: str
        published: bool
        author_id: int
        created_at: datetime

        model_config = {"from_attributes": True}


    class PostWithAuthor(BaseModel):
        id: int
        title: str
        content: str
        published: bool
        created_at: datetime
        author: UserResponse

        model_config = {"from_attributes": True}

Notice PostCreate no longer has author_id — logged in user's ID will be used automatically.

Step 5 — Update crud.py


    # crud.py

    from sqlalchemy.orm import Session
    import models
    import schemas
    import auth


    # ── User CRUD ─────────────────────────────────────

    def get_user(db: Session, user_id: int):
        return db.query(models.User).filter(models.User.id == user_id).first()


    def get_user_by_email(db: Session, email: str):
        return db.query(models.User).filter(models.User.email == email).first()


    def get_users(db: Session, skip: int = 0, limit: int = 10):
        return db.query(models.User).offset(skip).limit(limit).all()


    def create_user(db: Session, user: schemas.UserCreate):
        # Hash password before storing
        hashed = auth.hash_password(user.password)

        db_user = models.User(
            name=user.name,
            email=user.email,
            password=hashed,        # never store plain password
            age=user.age,
            city=user.city
        )

        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user


    def authenticate_user(db: Session, email: str, password: str):
        """Verify email and password — return user if valid, None if not."""
        user = get_user_by_email(db, email)
        if not user:
            return None
        if not auth.verify_password(password, user.password):
            return None
        return user


    def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
        db_user = get_user(db, user_id)
        if not db_user:
            return None
        update_data = user.model_dump(exclude_none=True)
        for field, value in update_data.items():
            setattr(db_user, field, value)
        db.commit()
        db.refresh(db_user)
        return db_user


    def delete_user(db: Session, user_id: int):
        db_user = get_user(db, user_id)
        if not db_user:
            return None
        db.delete(db_user)
        db.commit()
        return db_user


    # ── Post CRUD ─────────────────────────────────────

    def get_post(db: Session, post_id: int):
        return db.query(models.Post).filter(models.Post.id == post_id).first()


    def get_posts(
        db: Session,
        skip: int = 0,
        limit: int = 10,
        published: bool | None = None,
        author_id: int | None = None
    ):
        query = db.query(models.Post)
        if published is not None:
            query = query.filter(models.Post.published == published)
        if author_id is not None:
            query = query.filter(models.Post.author_id == author_id)
        return query.offset(skip).limit(limit).all()


    def create_post(db: Session, post: schemas.PostCreate, author_id: int):
        db_post = models.Post(
            title=post.title,
            content=post.content,
            author_id=author_id         # use logged-in user's id
        )
        db.add(db_post)
        db.commit()
        db.refresh(db_post)
        return db_post


    def update_post(db: Session, post_id: int, post: schemas.PostUpdate):
        db_post = get_post(db, post_id)
        if not db_post:
            return None
        update_data = post.model_dump(exclude_none=True)
        for field, value in update_data.items():
            setattr(db_post, field, value)
        db.commit()
        db.refresh(db_post)
        return db_post


    def delete_post(db: Session, post_id: int):
        db_post = get_post(db, post_id)
        if not db_post:
            return None
        db.delete(db_post)
        db.commit()
        return db_post


Step 6 — main.py — Complete with Auth Routes


    # main.py

    from fastapi import FastAPI, HTTPException, Depends, status
    from fastapi.security import OAuth2PasswordRequestForm
    from sqlalchemy.orm import Session
    import models
    import schemas
    import crud
    import auth
    from database import engine, get_db
    from dependencies import get_current_user

    models.Base.metadata.create_all(bind=engine)

    app = FastAPI(title="Authenticated API", version="4.0.0")


    # ── Auth Routes ───────────────────────────────────

    @app.post("/auth/register", response_model=schemas.UserResponse, status_code=201)
    def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
        existing = crud.get_user_by_email(db, user.email)
        if existing:
            raise HTTPException(status_code=400, detail="Email already registered")
        return crud.create_user(db, user)


    @app.post("/auth/login", response_model=schemas.TokenResponse)
    def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
        user = crud.authenticate_user(db, credentials.email, credentials.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password"
            )

        token = auth.create_access_token(data={"user_id": user.id})

        return {
            "access_token": token,
            "token_type": "bearer",
            "user": user
        }


    # OAuth2 login — for /docs Authorize button to work
    @app.post("/auth/token")
    def login_for_docs(
        form_data: OAuth2PasswordRequestForm = Depends(),
        db: Session = Depends(get_db)
    ):
        user = crud.authenticate_user(db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        token = auth.create_access_token(data={"user_id": user.id})
        return {"access_token": token, "token_type": "bearer"}


    # ── User Routes ───────────────────────────────────

    @app.get("/users", response_model=list[schemas.UserResponse])
    def get_users(
        skip: int = 0,
        limit: int = 10,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)    # protected route
    ):
        return crud.get_users(db, skip, limit)


    @app.get("/users/me", response_model=schemas.UserResponse)
    def get_me(current_user = Depends(get_current_user)):
        """Get currently logged in user's profile."""
        return current_user


    @app.get("/users/{user_id}", response_model=schemas.UserResponse)
    def get_user(
        user_id: int,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        user = crud.get_user(db, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user


    @app.patch("/users/me", response_model=schemas.UserResponse)
    def update_me(
        user_update: schemas.UserUpdate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Update currently logged in user's profile."""
        return crud.update_user(db, current_user.id, user_update)


    @app.delete("/users/me", status_code=status.HTTP_204_NO_CONTENT)
    def delete_me(
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Delete currently logged in user's account."""
        crud.delete_user(db, current_user.id)


    # ── Post Routes ───────────────────────────────────

    @app.post("/posts", response_model=schemas.PostResponse, status_code=201)
    def create_post(
        post: schemas.PostCreate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        return crud.create_post(db, post, author_id=current_user.id)


    @app.get("/posts", response_model=list[schemas.PostResponse])
    def get_posts(
        skip: int = 0,
        limit: int = 10,
        published: bool | None = None,
        db: Session = Depends(get_db)
        # no auth required — anyone can read posts
    ):
        return crud.get_posts(db, skip, limit, published)


    @app.get("/posts/my", response_model=list[schemas.PostResponse])
    def get_my_posts(
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Get all posts by currently logged in user."""
        return crud.get_posts(db, author_id=current_user.id)


    @app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
    def get_post(post_id: int, db: Session = Depends(get_db)):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        return post


    @app.patch("/posts/{post_id}", response_model=schemas.PostResponse)
    def update_post(
        post_id: int,
        post_update: schemas.PostUpdate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        # Only author can update their own post
        if post.author_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to update this post")

        return crud.update_post(db, post_id, post_update)


    @app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
    def delete_post(
        post_id: int,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        # Only author can delete their own post
        if post.author_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to delete this post")

        crud.delete_post(db, post_id)


Testing Auth Flow in /docs

Step 1 — Register:

POST /auth/register
{
    "name": "Gagan Singh",
    "email": "gagan@email.com",
    "password": "secret123",
    "age": 22,
    "city": "Delhi"
}

Step 2 — Login:

POST /auth/login
{
    "email": "gagan@email.com",
    "password": "secret123"
}

Response:

{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
    "token_type": "bearer",
    "user": {...}
}

Step 3 — Authorize in /docs: Click the Authorize button (lock icon) at the top right of /docs. Enter your token. Now all protected routes work.

Step 4 — Test protected routes:

  • GET /users/me — returns your profile
  • POST /posts — creates a post as you
  • PATCH /posts/1 — works only if you're the author

Using Token in Thunder Client / Postman

Add header to every protected request:

Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

This is the standard Bearer token format — same as your NestJS frontend uses.


Environment Variables — Secure Your Secret Key

Never hardcode secrets. Use environment variables:

Create a .env file:

SECRET_KEY=your-super-secret-key-change-this-in-production-make-it-long
DATABASE_URL=sqlite:///./app.db
ACCESS_TOKEN_EXPIRE_MINUTES=30

Install python-dotenv:

pip install python-dotenv

Create config.py:

# config.py

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    secret_key: str
    database_url: str = "sqlite:///./app.db"
    access_token_expire_minutes: int = 30

    class Config:
        env_file = ".env"

settings = Settings()

Install pydantic-settings:

pip install pydantic-settings

Update auth.py to use settings:

from config import settings

SECRET_KEY = settings.secret_key
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

Update database.py:

from config import settings
DATABASE_URL = settings.database_url

Add .env to .gitignore — never commit secrets to git.


Role-Based Access — Admin vs User

A common pattern — restricting certain routes to admin users only:

Add role to User model:

class User(Base):
    # ... existing fields
    role = Column(String(20), default="user")    # "user" or "admin"

Create admin dependency in dependencies.py:

def get_admin_user(current_user = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )
    return current_user

Use it on admin-only routes:

@app.delete("/admin/users/{user_id}", status_code=204)
def admin_delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin = Depends(get_admin_user)    # only admins
):
    crud.delete_user(db, user_id)

This is the same pattern as Guards in NestJS.


Password Reset Flow — How It Works

In real apps password reset needs:

  1. User requests reset with their email
  2. Generate a temporary token, save it, email it to user
  3. User clicks link with token
  4. Verify token, let user set new password

Basic implementation:

import secrets

def create_password_reset_token() -> str:
    return secrets.token_urlsafe(32)    # secure random token

@app.post("/auth/forgot-password")
def forgot_password(email: str, db: Session = Depends(get_db)):
    user = crud.get_user_by_email(db, email)
    if not user:
        # Don't reveal if email exists or not — security best practice
        return {"message": "If that email exists, a reset link has been sent"}

    reset_token = create_password_reset_token()
    # In real app: save token to database with expiry, send email
    # For now just return it (in production NEVER return it directly)
    return {"reset_token": reset_token, "message": "Use this token to reset password"}

For sending emails you'd use fastapi-mail library. We'll keep that for the final project.


Security Checklist

Things every production API must have:

✅ Passwords hashed with bcrypt — done
✅ JWT tokens with expiry — done
✅ Protected routes — done
✅ Author-only post editing — done
✅ Role-based access — done
✅ Secrets in environment variables — done
✅ Email normalization — done (lowercase in validator)
❌ Rate limiting — prevents brute force attacks
❌ HTTPS — always in production
❌ Input sanitization — prevent SQL injection (SQLAlchemy handles this)
❌ Refresh tokens — long-lived sessions

Rate limiting example with slowapi:

pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/auth/login")
@limiter.limit("5/minute")    # max 5 login attempts per minute
def login(request: Request, credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    ...

Final Project Structure

fastapi-learning/
├── venv/
├── .env                   ← secrets (never commit)
├── .gitignore
├── main.py                ← routes
├── database.py            ← db connection
├── models.py              ← SQLAlchemy tables
├── schemas.py             ← Pydantic models
├── crud.py                ← db operations
├── auth.py                ← JWT and password logic
├── dependencies.py        ← reusable dependencies
├── config.py              ← settings from .env
└── requirements.txt

Exercise 🏋️

Add these features to the current project:

1. Change Password Route:

PATCH /users/me/password
{
    "current_password": "secret123",
    "new_password": "newSecret456"
}
  • Verify current password before allowing change
  • Hash and save new password

2. Deactivate Account:

PATCH /users/me/deactivate
  • Sets is_active = False
  • Deactivated users cannot login

3. Admin Routes:

  • GET /admin/users — list all users including inactive (admin only)
  • DELETE /admin/users/{id} — force delete any user (admin only)
  • PATCH /admin/users/{id}/activate — reactivate a deactivated user

4. Post Ownership:

  • Regular users can only edit/delete their own posts
  • Admin can edit/delete any post

No comments:

Post a Comment

Authentication & Security in Fast APIs

What We're Building By the end of this stage your API will have: User registration with hashed passwords Login that returns a JWT to...