Authentication & Security in FastAPI

What We're Building

By the end of this stage your API will have:

  • User registration with hashed passwords
  • Login that returns a JWT token
  • Protected routes that require a valid token
  • Current user detection from token

This is the same token-based pattern that many production APIs use.


How JWT Authentication Works

Since you build with NestJS you know this flow. Quick recap in FastAPI context:

1. User registers → password gets hashed → stored in database
2. User logs in → password verified → JWT token generated → sent to client
3. Client sends token in every request header
4. Server verifies token → extracts user info → allows or denies access
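
To make steps 2 to 4 concrete, here is a minimal standard-library sketch of what an HS256 token is: two base64url-encoded JSON parts plus an HMAC-SHA256 signature over them. The function names (sign_demo_token, verify_demo_token) and the "demo-secret" value are made up for illustration. The project itself uses python-jose, and this sketch is not a replacement for it.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, the way JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> str:
    """HMAC-SHA256 over 'header.payload', base64url-encoded."""
    mac = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256)
    return b64url(mac.digest())


def sign_demo_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature for the HS256 algorithm."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_signature(signing_input, secret)}"


def verify_demo_token(token: str, secret: str) -> dict:
    """Return the payload if the signature matches and 'exp' is in the future."""
    signing_input, _, signature = token.rpartition(".")
    if not hmac.compare_digest(_signature(signing_input, secret), signature):
        raise ValueError("bad signature")
    payload = json.loads(b64url_decode(signing_input.split(".")[1]))
    # A token without "exp" is treated as expired in this sketch
    if payload.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return payload


token = sign_demo_token({"user_id": 1, "exp": int(time.time()) + 1800}, "demo-secret")
print(token.count("."))                                    # three parts, two dots
print(verify_demo_token(token, "demo-secret")["user_id"])  # payload is readable after verification
```

The payload is only encoded, not encrypted, so anyone holding the token can read it. The signature only proves that the server holding the secret issued it.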

Installing Dependencies

pip install python-jose[cryptography] passlib[bcrypt] python-multipart
  • python-jose — creates and verifies JWT tokens
  • passlib[bcrypt] — hashes passwords with bcrypt
  • python-multipart — needed for OAuth2 form login

Updated Project Structure

fastapi-learning/
├── venv/
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py            ← NEW: all auth logic lives here
├── dependencies.py    ← NEW: reusable dependencies
└── requirements.txt

Step 1 — Update models.py

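Add a password field to User. Note that create_all only creates missing tables and does not alter existing ones, so if the users table already exists, recreate the database (or run a migration) to pick up the new column: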


    # models.py

    from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
    from sqlalchemy.orm import relationship
    from sqlalchemy.sql import func
    from database import Base


    class User(Base):
        __tablename__ = "users"

        id = Column(Integer, primary_key=True, index=True)
        name = Column(String(50), nullable=False)
        email = Column(String(100), unique=True, nullable=False, index=True)
        password = Column(String(255), nullable=False)      # hashed password stored here
        age = Column(Integer, nullable=False)
        city = Column(String(50), default="Unknown")
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, server_default=func.now())

        posts = relationship("Post", back_populates="author")


    class Post(Base):
        __tablename__ = "posts"

        id = Column(Integer, primary_key=True, index=True)
        title = Column(String(100), nullable=False)
        content = Column(Text, nullable=False)
        published = Column(Boolean, default=False)
        created_at = Column(DateTime, server_default=func.now())
        author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

        author = relationship("User", back_populates="posts")


Step 2 — auth.py — All Auth Logic


    # auth.py

    from datetime import datetime, timedelta
    from jose import JWTError, jwt
    from passlib.context import CryptContext
    from fastapi import HTTPException, status

    # ── Configuration ─────────────────────────────────
    SECRET_KEY = "your-super-secret-key-change-this-in-production"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30

    # ── Password Hashing ──────────────────────────────
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


    def hash_password(password: str) -> str:
        """Convert plain password to bcrypt hash."""
        return pwd_context.hash(password)


    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Check if plain password matches the stored hash."""
        return pwd_context.verify(plain_password, hashed_password)


    # ── JWT Token ─────────────────────────────────────
    def create_access_token(data: dict) -> str:
        """Create a JWT token with expiry."""
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


    def verify_token(token: str) -> dict:
        """Verify JWT token and return its payload."""
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired token",
                headers={"WWW-Authenticate": "Bearer"}
            )
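
Why does verify_password exist instead of a plain == comparison? Each hash is salted with random bytes, so hashing the same password twice gives two different strings. The sketch below shows that idea using only the standard library. It swaps in PBKDF2 (hashlib.pbkdf2_hmac) for bcrypt so that it runs without passlib. The names demo_hash and demo_verify and the iteration count are illustrative assumptions, not part of the project.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a tuned recommendation


def demo_hash(password: str) -> str:
    """Hash with a fresh random salt and store the salt next to the digest."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def demo_verify(password: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt_hex, _, digest_hex = stored.partition("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


first = demo_hash("secret123")
second = demo_hash("secret123")
print(first == second)                    # different salts, so the stored strings differ
print(demo_verify("secret123", first))    # the right password still verifies
print(demo_verify("wrong-pass", first))   # a wrong password does not
```

passlib's bcrypt hashes follow the same principle: the salt travels inside the stored hash string, which is why only the hash column is needed in the users table.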


Step 3 — dependencies.py — Get Current User


    # dependencies.py

    from fastapi import Depends, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer
    from sqlalchemy.orm import Session
    from database import get_db
    import auth
    import crud

    # Points the /docs Authorize dialog at the OAuth2 form-login endpoint
    # (/auth/token in main.py, not the JSON-based /auth/login route)
    # Also adds a lock icon on protected routes in /docs
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


    def get_current_user(
        token: str = Depends(oauth2_scheme),
        db: Session = Depends(get_db)
    ):
        """Extract current user from JWT token."""
        payload = auth.verify_token(token)
        user_id: int = payload.get("user_id")

        if user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token payload"
            )

        user = crud.get_user(db, user_id)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User no longer exists"
            )

        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is deactivated"
            )

        return user


    def get_current_active_user(
        current_user = Depends(get_current_user)
    ):
        """Same as get_current_user — alias for clarity."""
        return current_user
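
Before get_current_user runs, oauth2_scheme reads the Authorization header and passes only the token part to it. By default, it answers with 401 when the header is missing or is not a Bearer header. The function below is a simplified sketch of that extraction step. extract_bearer_token is a hypothetical name, and this is not FastAPI's actual source code.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, else None."""
    if not authorization:
        return None
    # Split "Bearer <token>" at the first space
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    # The scheme name is compared case-insensitively
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
print(extract_bearer_token(None))
```

Only the first call yields a token. The other two return None, which corresponds to the cases where a protected route would reject the request.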


Step 4 — Update schemas.py

Add auth-related schemas:


    # schemas.py

    from pydantic import BaseModel, Field, field_validator
    from datetime import datetime


    # ── User Schemas ─────────────────────────────────

    class UserCreate(BaseModel):
        name: str = Field(min_length=2, max_length=50)
        email: str
        password: str = Field(min_length=8)
        age: int = Field(ge=0, le=150)
        city: str = Field(default="Unknown")

        @field_validator("email")
        @classmethod
        def normalize_email(cls, value: str) -> str:
            return value.lower().strip()

        @field_validator("password")
        @classmethod
        def password_strength(cls, value: str) -> str:
            if value.isdigit():
                raise ValueError("Password cannot be only numbers")
            if value.isalpha():
                raise ValueError("Password must include at least one number")
            return value


    class UserUpdate(BaseModel):
        name: str | None = Field(default=None, min_length=2)
        age: int | None = Field(default=None, ge=0, le=150)
        city: str | None = None


    class UserResponse(BaseModel):
        id: int
        name: str
        email: str
        age: int
        city: str
        is_active: bool
        created_at: datetime

        model_config = {"from_attributes": True}


    # ── Auth Schemas ─────────────────────────────────

    class LoginRequest(BaseModel):
        email: str
        password: str


    class TokenResponse(BaseModel):
        access_token: str
        token_type: str = "bearer"
        user: UserResponse


    # ── Post Schemas ─────────────────────────────────

    class PostCreate(BaseModel):
        title: str = Field(min_length=5, max_length=100)
        content: str = Field(min_length=20)


    class PostUpdate(BaseModel):
        title: str | None = Field(default=None, min_length=5)
        content: str | None = Field(default=None, min_length=20)
        published: bool | None = None


    class PostResponse(BaseModel):
        id: int
        title: str
        content: str
        published: bool
        author_id: int
        created_at: datetime

        model_config = {"from_attributes": True}


    class PostWithAuthor(BaseModel):
        id: int
        title: str
        content: str
        published: bool
        created_at: datetime
        author: UserResponse

        model_config = {"from_attributes": True}

Notice that PostCreate no longer has author_id: the logged-in user's ID is used automatically.
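
One detail of the password_strength validator in UserCreate is worth testing: it rejects all-digit and all-letter passwords, but a password made only of symbols passes both checks. The sketch below mirrors those checks as a plain function and compares them with a stricter rule. Both function names are hypothetical, and the stricter rule is just one possible policy, not a recommendation from the original tutorial.

```python
def passes_original_rules(password: str) -> bool:
    """Mirror of the UserCreate checks: 8+ characters, not all digits, not all letters."""
    return len(password) >= 8 and not password.isdigit() and not password.isalpha()


def passes_stricter_rules(password: str) -> bool:
    """Hypothetical stricter policy: 8+ characters with at least one letter and one digit."""
    return (
        len(password) >= 8
        and any(ch.isalpha() for ch in password)
        and any(ch.isdigit() for ch in password)
    )


for candidate in ("secret123", "12345678", "password", "!!!!!!!!"):
    print(candidate, passes_original_rules(candidate), passes_stricter_rules(candidate))
```

The last candidate is the interesting one: "!!!!!!!!" satisfies the original checks even though it contains no letter or number. To adopt the stricter rule, you would move the two any(...) conditions into the field_validator.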

Step 5 — Update crud.py


    # crud.py

    from sqlalchemy.orm import Session
    import models
    import schemas
    import auth


    # ── User CRUD ─────────────────────────────────────

    def get_user(db: Session, user_id: int):
        return db.query(models.User).filter(models.User.id == user_id).first()


    def get_user_by_email(db: Session, email: str):
        return db.query(models.User).filter(models.User.email == email).first()


    def get_users(db: Session, skip: int = 0, limit: int = 10):
        return db.query(models.User).offset(skip).limit(limit).all()


    def create_user(db: Session, user: schemas.UserCreate):
        # Hash password before storing
        hashed = auth.hash_password(user.password)

        db_user = models.User(
            name=user.name,
            email=user.email,
            password=hashed,        # never store plain password
            age=user.age,
            city=user.city
        )

        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user


    def authenticate_user(db: Session, email: str, password: str):
        """Verify email and password — return user if valid, None if not."""
        user = get_user_by_email(db, email)
        if not user:
            return None
        if not auth.verify_password(password, user.password):
            return None
        return user


    def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
        db_user = get_user(db, user_id)
        if not db_user:
            return None
        update_data = user.model_dump(exclude_none=True)
        for field, value in update_data.items():
            setattr(db_user, field, value)
        db.commit()
        db.refresh(db_user)
        return db_user


    def delete_user(db: Session, user_id: int):
        db_user = get_user(db, user_id)
        if not db_user:
            return None
        db.delete(db_user)
        db.commit()
        return db_user


    # ── Post CRUD ─────────────────────────────────────

    def get_post(db: Session, post_id: int):
        return db.query(models.Post).filter(models.Post.id == post_id).first()


    def get_posts(
        db: Session,
        skip: int = 0,
        limit: int = 10,
        published: bool | None = None,
        author_id: int | None = None
    ):
        query = db.query(models.Post)
        if published is not None:
            query = query.filter(models.Post.published == published)
        if author_id is not None:
            query = query.filter(models.Post.author_id == author_id)
        return query.offset(skip).limit(limit).all()


    def create_post(db: Session, post: schemas.PostCreate, author_id: int):
        db_post = models.Post(
            title=post.title,
            content=post.content,
            author_id=author_id         # use logged-in user's id
        )
        db.add(db_post)
        db.commit()
        db.refresh(db_post)
        return db_post


    def update_post(db: Session, post_id: int, post: schemas.PostUpdate):
        db_post = get_post(db, post_id)
        if not db_post:
            return None
        update_data = post.model_dump(exclude_none=True)
        for field, value in update_data.items():
            setattr(db_post, field, value)
        db.commit()
        db.refresh(db_post)
        return db_post


    def delete_post(db: Session, post_id: int):
        db_post = get_post(db, post_id)
        if not db_post:
            return None
        db.delete(db_post)
        db.commit()
        return db_post


Step 6 — main.py — Complete with Auth Routes


    # main.py

    from fastapi import FastAPI, HTTPException, Depends, status
    from fastapi.security import OAuth2PasswordRequestForm
    from sqlalchemy.orm import Session
    import models
    import schemas
    import crud
    import auth
    from database import engine, get_db
    from dependencies import get_current_user

    models.Base.metadata.create_all(bind=engine)

    app = FastAPI(title="Authenticated API", version="4.0.0")


    # ── Auth Routes ───────────────────────────────────

    @app.post("/auth/register", response_model=schemas.UserResponse, status_code=201)
    def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
        existing = crud.get_user_by_email(db, user.email)
        if existing:
            raise HTTPException(status_code=400, detail="Email already registered")
        return crud.create_user(db, user)


    @app.post("/auth/login", response_model=schemas.TokenResponse)
    def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
        user = crud.authenticate_user(db, credentials.email, credentials.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password"
            )

        token = auth.create_access_token(data={"user_id": user.id})

        return {
            "access_token": token,
            "token_type": "bearer",
            "user": user
        }


    # OAuth2 login — for /docs Authorize button to work
    @app.post("/auth/token")
    def login_for_docs(
        form_data: OAuth2PasswordRequestForm = Depends(),
        db: Session = Depends(get_db)
    ):
        user = crud.authenticate_user(db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        token = auth.create_access_token(data={"user_id": user.id})
        return {"access_token": token, "token_type": "bearer"}


    # ── User Routes ───────────────────────────────────

    @app.get("/users", response_model=list[schemas.UserResponse])
    def get_users(
        skip: int = 0,
        limit: int = 10,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)    # protected route
    ):
        return crud.get_users(db, skip, limit)


    @app.get("/users/me", response_model=schemas.UserResponse)
    def get_me(current_user = Depends(get_current_user)):
        """Get currently logged in user's profile."""
        return current_user


    @app.get("/users/{user_id}", response_model=schemas.UserResponse)
    def get_user(
        user_id: int,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        user = crud.get_user(db, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user


    @app.patch("/users/me", response_model=schemas.UserResponse)
    def update_me(
        user_update: schemas.UserUpdate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Update currently logged in user's profile."""
        return crud.update_user(db, current_user.id, user_update)


    @app.delete("/users/me", status_code=status.HTTP_204_NO_CONTENT)
    def delete_me(
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Delete currently logged in user's account."""
        crud.delete_user(db, current_user.id)


    # ── Post Routes ───────────────────────────────────

    @app.post("/posts", response_model=schemas.PostResponse, status_code=201)
    def create_post(
        post: schemas.PostCreate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        return crud.create_post(db, post, author_id=current_user.id)


    @app.get("/posts", response_model=list[schemas.PostResponse])
    def get_posts(
        skip: int = 0,
        limit: int = 10,
        published: bool | None = None,
        db: Session = Depends(get_db)
        # no auth required — anyone can read posts
    ):
        return crud.get_posts(db, skip, limit, published)


    @app.get("/posts/my", response_model=list[schemas.PostResponse])
    def get_my_posts(
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Get all posts by currently logged in user."""
        return crud.get_posts(db, author_id=current_user.id)


    @app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
    def get_post(post_id: int, db: Session = Depends(get_db)):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        return post


    @app.patch("/posts/{post_id}", response_model=schemas.PostResponse)
    def update_post(
        post_id: int,
        post_update: schemas.PostUpdate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        # Only author can update their own post
        if post.author_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to update this post")

        return crud.update_post(db, post_id, post_update)


    @app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
    def delete_post(
        post_id: int,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        # Only author can delete their own post
        if post.author_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to delete this post")

        crud.delete_post(db, post_id)


Testing Auth Flow in /docs

Step 1 — Register:

POST /auth/register
{
    "name": "Gagan Singh",
    "email": "gagan@email.com",
    "password": "secret123",
    "age": 22,
    "city": "Delhi"
}

Step 2 — Login:

POST /auth/login
{
    "email": "gagan@email.com",
    "password": "secret123"
}

Response:

{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
    "token_type": "bearer",
    "user": {...}
}

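Step 3 — Authorize in /docs: Click the Authorize button (lock icon) at the top right of /docs. In the dialog, enter your email in the username field and your password, then submit. Swagger UI requests a token from the OAuth2 form endpoint (POST /auth/token) and attaches it to every protected request.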

Step 4 — Test protected routes:

  • GET /users/me — returns your profile
  • POST /posts — creates a post as you
  • PATCH /posts/1 — works only if you're the author

Using Token in Thunder Client / Postman

Add header to every protected request:

Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

This is the standard Bearer token format, the same one your NestJS APIs expect from their clients.
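
The same header can be set from a Python script. The sketch below only builds the request with the standard library's urllib, so it runs without the API being up. BASE_URL assumes the default local uvicorn address, and build_authorized_request is a hypothetical helper name.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumption: API served locally on the default port


def build_authorized_request(path, token, body=None, method="GET"):
    """Build a request that carries the JWT in the Authorization header."""
    headers = {"Authorization": f"Bearer {token}"}
    data = None
    if body is not None:
        # JSON bodies need an explicit content type
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(f"{BASE_URL}{path}", data=data, headers=headers, method=method)


request = build_authorized_request("/users/me", "<your-access-token>")
print(request.get_method(), request.full_url)
print(request.get_header("Authorization"))

# To actually send it (requires the API to be running):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```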


Environment Variables — Secure Your Secret Key

Never hardcode secrets. Use environment variables:

Create a .env file:

SECRET_KEY=your-super-secret-key-change-this-in-production-make-it-long
DATABASE_URL=sqlite:///./app.db
ACCESS_TOKEN_EXPIRE_MINUTES=30
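
The SECRET_KEY value above is a placeholder. One way to produce a real one is Python's secrets module, as in this small sketch. The helper name generate_secret_key and the 32-byte length are assumptions chosen for illustration.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random hex string to paste into .env as SECRET_KEY."""
    return secrets.token_hex(num_bytes)


# Run once, copy the printed line into .env, and keep it out of version control
print(f"SECRET_KEY={generate_secret_key()}")
```

Changing the key invalidates every token signed with the old one, so users have to log in again after a rotation.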

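Install pydantic-settings (recent versions pull in python-dotenv as a dependency, which is what reads the .env file):

pip install pydantic-settings

Create config.py:

# config.py

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Pydantic v2 style config; replaces the older inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    secret_key: str                                  # required: no default, must be set in .env
    database_url: str = "sqlite:///./app.db"
    access_token_expire_minutes: int = 30


settings = Settings()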

Update auth.py to use settings:

from config import settings

SECRET_KEY = settings.secret_key
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

Update database.py:

from config import settings
DATABASE_URL = settings.database_url

Add .env to .gitignore — never commit secrets to git.


Role-Based Access — Admin vs User

A common pattern — restricting certain routes to admin users only:

Add role to User model:

class User(Base):
    # ... existing fields
    role = Column(String(20), default="user")    # "user" or "admin"

Create admin dependency in dependencies.py:

def get_admin_user(current_user = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )
    return current_user

Use it on admin-only routes:

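@app.delete("/admin/users/{user_id}", status_code=204)
def admin_delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin = Depends(get_admin_user)    # only admins
):
    # crud.delete_user returns None when the user does not exist
    if crud.delete_user(db, user_id) is None:
        raise HTTPException(status_code=404, detail="User not found")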

This is the same pattern as Guards in NestJS.
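
If more roles appear later, the same idea can be generalized into a factory that builds one checker per set of allowed roles. The sketch below shows just the closure logic in plain Python so it runs without FastAPI. Forbidden, DemoUser, require_role, and the "moderator" role are all hypothetical names. In the real project the checker would take current_user = Depends(get_current_user) and raise HTTPException with status 403.

```python
from dataclasses import dataclass


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403) so the sketch runs without FastAPI."""


@dataclass
class DemoUser:
    id: int
    role: str = "user"


def require_role(*allowed_roles: str):
    """Build a checker that accepts only users whose role is in allowed_roles."""

    def checker(current_user: DemoUser) -> DemoUser:
        if current_user.role not in allowed_roles:
            raise Forbidden(f"Requires one of: {', '.join(allowed_roles)}")
        return current_user

    return checker


require_admin = require_role("admin")
require_staff = require_role("admin", "moderator")

print(require_admin(DemoUser(id=1, role="admin")).role)
print(require_staff(DemoUser(id=2, role="moderator")).role)
```

Under those assumptions, a route would then declare something like admin = Depends(require_role("admin")) instead of a dedicated get_admin_user function.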


Password Reset Flow — How It Works

In real apps, a password reset flow needs:

  1. User requests reset with their email
  2. Generate a temporary token, save it, email it to user
  3. User clicks link with token
  4. Verify token, let user set new password

Basic implementation:

import secrets

def create_password_reset_token() -> str:
    return secrets.token_urlsafe(32)    # secure random token

@app.post("/auth/forgot-password")
def forgot_password(email: str, db: Session = Depends(get_db)):
    user = crud.get_user_by_email(db, email)
    if not user:
        # Don't reveal if email exists or not — security best practice
        return {"message": "If that email exists, a reset link has been sent"}

    reset_token = create_password_reset_token()
    # In real app: save token to database with expiry, send email
    # For now just return it (in production NEVER return it directly)
    return {"reset_token": reset_token, "message": "Use this token to reset password"}
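
The "save token to database with expiry" comment can be sketched as follows. Only a hash of the token is stored, it expires after a fixed time, and it can be used once. Everything here is an illustrative assumption: the in-memory reset_store dict stands in for a database table, and the 15-minute lifetime and function names are made up.

```python
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

RESET_TOKEN_TTL = timedelta(minutes=15)  # assumption: illustrative lifetime

# Hypothetical stand-in for a database table: user_id -> (token_hash, expires_at)
reset_store: dict = {}


def hash_token(token: str) -> str:
    """Store only a SHA-256 digest so a leaked table does not expose usable tokens."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def issue_reset_token(user_id: int, now: Optional[datetime] = None) -> str:
    """Create a token, remember its hash and expiry, and return the plain token for emailing."""
    now = now or datetime.now(timezone.utc)
    token = secrets.token_urlsafe(32)
    reset_store[user_id] = (hash_token(token), now + RESET_TOKEN_TTL)
    return token


def consume_reset_token(user_id: int, token: str, now: Optional[datetime] = None) -> bool:
    """Return True once for a valid, unexpired token; False in every other case."""
    now = now or datetime.now(timezone.utc)
    record = reset_store.get(user_id)
    if record is None:
        return False
    token_hash, expires_at = record
    if now >= expires_at or not hmac.compare_digest(token_hash, hash_token(token)):
        return False
    del reset_store[user_id]  # single use: a second attempt finds no record
    return True


emailed = issue_reset_token(user_id=1)
print(consume_reset_token(1, "guessed-token"))  # wrong token is rejected
print(consume_reset_token(1, emailed))          # correct token works once
print(consume_reset_token(1, emailed))          # replay is rejected
```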

For sending emails, you'd use the fastapi-mail library. We'll keep that for the final project.


Security Checklist

Things every production API must have:

✅ Passwords hashed with bcrypt — done
✅ JWT tokens with expiry — done
✅ Protected routes — done
✅ Author-only post editing — done
✅ Role-based access — done
✅ Secrets in environment variables — done
✅ Email normalization — done (lowercase in validator)
❌ Rate limiting — prevents brute force attacks
❌ HTTPS — always in production
✅ SQL injection protection — done (SQLAlchemy sends values as bound parameters, as long as you avoid string-built raw SQL)
❌ Refresh tokens — long-lived sessions

Rate limiting example with slowapi:

pip install slowapi

from fastapi import Request    # slowapi needs the Request object in the route signature
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/auth/login")
@limiter.limit("5/minute")    # max 5 login attempts per minute
def login(request: Request, credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    ...
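
To see what a limit like "5/minute" means in practice, here is a small sliding-window limiter written with the standard library only. It illustrates the idea and does not describe how slowapi works internally. SlidingWindowLimiter and limiter_demo are hypothetical names, and the IP address is a documentation example.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most max_calls per key within the last period_seconds."""

    def __init__(self, max_calls: int, period_seconds: float):
        self.max_calls = max_calls
        self.period = period_seconds
        self.calls = defaultdict(deque)  # key -> timestamps of accepted calls

    def allow(self, key: str, now=None) -> bool:
        now = time.monotonic() if now is None else now
        window = self.calls[key]
        # Forget calls that have fallen out of the window
        while window and now - window[0] >= self.period:
            window.popleft()
        if len(window) >= self.max_calls:
            return False
        window.append(now)
        return True


limiter_demo = SlidingWindowLimiter(max_calls=5, period_seconds=60)
# Seven login attempts from one client, one per second
results = [limiter_demo.allow("203.0.113.7", now=float(second)) for second in range(7)]
print(results)
```

The first five attempts are accepted and the sixth and seventh are refused. Once the oldest attempt is 60 seconds old, the client gets a slot back.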

Final Project Structure

fastapi-learning/
├── venv/
├── .env                   ← secrets (never commit)
├── .gitignore
├── main.py                ← routes
├── database.py            ← db connection
├── models.py              ← SQLAlchemy tables
├── schemas.py             ← Pydantic models
├── crud.py                ← db operations
├── auth.py                ← JWT and password logic
├── dependencies.py        ← reusable dependencies
├── config.py              ← settings from .env
└── requirements.txt

Exercise 🏋️

Add these features to the current project:

1. Change Password Route:

PATCH /users/me/password
{
    "current_password": "secret123",
    "new_password": "newSecret456"
}
  • Verify current password before allowing change
  • Hash and save new password

2. Deactivate Account:

PATCH /users/me/deactivate
  • Sets is_active = False
  • Deactivated users cannot log in

3. Admin Routes:

  • GET /admin/users — list all users including inactive (admin only)
  • DELETE /admin/users/{id} — force delete any user (admin only)
  • PATCH /admin/users/{id}/activate — reactivate a deactivated user

4. Post Ownership:

  • Regular users can only edit/delete their own posts
  • Admin can edit/delete any post
