Final Real Project in FastAPI

What We're Building

A complete Task Management API — like a mini Trello/Jira backend.

This project combines everything from all 6 stages in a realistic, production-style structure. When you finish, you'll have a portfolio project you can actually show.


Features

✅ User registration and login (JWT auth)
✅ Workspaces — users create workspaces, invite members
✅ Projects — inside workspaces
✅ Tasks — inside projects, assign to members
✅ Comments — on tasks
✅ File attachments — on tasks
✅ Role-based access — owner, admin, member
✅ Background email notifications
✅ Pagination on all list endpoints
✅ Full filtering and search
✅ Consistent API responses
✅ Complete /docs documentation

Final Project Structure

task-manager-api/
├── routers/
│   ├── __init__.py
│   ├── auth.py
│   ├── users.py
│   ├── workspaces.py
│   ├── projects.py
│   ├── tasks.py
│   └── comments.py
├── uploads/
├── venv/
├── .env
├── .gitignore
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth_utils.py
├── dependencies.py
├── config.py
├── utils.py
└── requirements.txt

Step 1 — Setup

mkdir task-manager-api
cd task-manager-api
python -m venv venv
venv\Scripts\activate     # Windows
source venv/bin/activate  # Mac/Linux

pip install fastapi uvicorn sqlalchemy "passlib[bcrypt]" "python-jose[cryptography]" python-multipart python-dotenv pydantic-settings

pip freeze > requirements.txt

Create .env:

SECRET_KEY=super-secret-key-make-this-very-long-and-random-in-production
DATABASE_URL=sqlite:///./taskmanager.db
ACCESS_TOKEN_EXPIRE_MINUTES=60
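
The SECRET_KEY value above is only a placeholder. One way to produce a long random value with Python's standard library is sketched below; the 32-byte length is an illustrative assumption, not a requirement of this project.

```python
import secrets

# 32 random bytes rendered as 64 hexadecimal characters.
# The length is an assumption; longer keys are generated the same way.
secret_key = secrets.token_hex(32)

# Paste the printed line into .env in place of the placeholder.
print(f"SECRET_KEY={secret_key}")
```

Run it once and keep the result out of version control, which the .gitignore entry for .env already takes care of.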

Create .gitignore:

venv/
__pycache__/
*.pyc
.env
*.db
uploads/

Step 2 — config.py

# config.py

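from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    secret_key: str
    database_url: str = "sqlite:///./taskmanager.db"
    access_token_expire_minutes: int = 60

    # Pydantic v2 style; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")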


settings = Settings()

Step 3 — database.py

# database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from config import settings

# check_same_thread is a SQLite-only option, so pass it only for SQLite URLs
connect_args = (
    {"check_same_thread": False}
    if settings.database_url.startswith("sqlite")
    else {}
)

engine = create_engine(settings.database_url, connect_args=connect_args)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
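
To see why get_db uses yield inside try/finally, here is a small standard-library sketch of the same pattern. FakeSession and get_fake_db are hypothetical stand-ins, and driving the generator by hand only approximates what the framework does around a request.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session (hypothetical, for illustration only)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_fake_db():
    # Same shape as get_db: hand out a session, always close it afterwards.
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()


gen = get_fake_db()
session = next(gen)               # roughly: before the endpoint function runs
still_open = not session.closed   # the endpoint sees an open session
gen.close()                       # roughly: after the response is finished
closed_after = session.closed     # the finally block has run
print(still_open, closed_after)
```

The finally block runs even if the endpoint raises, which is the reason for writing the dependency this way instead of closing the session inside each endpoint.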

Step 4 — models.py

This is the core — all database tables:

# models.py

from sqlalchemy import (
    Column, Integer, String, Boolean, DateTime,
    Text, ForeignKey, Enum as SQLEnum
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
import enum


# ── Enums ─────────────────────────────────────────

class WorkspaceRole(str, enum.Enum):
    owner = "owner"
    admin = "admin"
    member = "member"

class TaskStatus(str, enum.Enum):
    todo = "todo"
    in_progress = "in_progress"
    in_review = "in_review"
    done = "done"

class TaskPriority(str, enum.Enum):
    low = "low"
    medium = "medium"
    high = "high"
    urgent = "urgent"


# ── Tables ────────────────────────────────────────

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False, index=True)
    password = Column(String(255), nullable=False)
    avatar = Column(String(255), nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())

    workspace_memberships = relationship("WorkspaceMember", back_populates="user")
    assigned_tasks = relationship("Task", back_populates="assignee", foreign_keys="Task.assignee_id")
    comments = relationship("Comment", back_populates="author")


class Workspace(Base):
    __tablename__ = "workspaces"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    owner = relationship("User")
    members = relationship("WorkspaceMember", back_populates="workspace")
    projects = relationship("Project", back_populates="workspace")


class WorkspaceMember(Base):
    __tablename__ = "workspace_members"

    id = Column(Integer, primary_key=True, index=True)
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    role = Column(SQLEnum(WorkspaceRole), default=WorkspaceRole.member)
    joined_at = Column(DateTime, server_default=func.now())

    workspace = relationship("Workspace", back_populates="members")
    user = relationship("User", back_populates="workspace_memberships")


class Project(Base):
    __tablename__ = "projects"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)

    workspace = relationship("Workspace", back_populates="projects")
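    # Delete a project's tasks with it; otherwise SQLAlchemy tries to set
    # tasks.project_id to NULL, which the NOT NULL column rejects.
    tasks = relationship("Task", back_populates="project", cascade="all, delete-orphan")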


class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(SQLEnum(TaskStatus), default=TaskStatus.todo)
    priority = Column(SQLEnum(TaskPriority), default=TaskPriority.medium)
    due_date = Column(DateTime, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
    creator_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    assignee_id = Column(Integer, ForeignKey("users.id"), nullable=True)

    project = relationship("Project", back_populates="tasks")
    creator = relationship("User", foreign_keys=[creator_id])
    assignee = relationship("User", back_populates="assigned_tasks", foreign_keys=[assignee_id])
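    # Comments and attachments cannot exist without their task, so delete them with it.
    comments = relationship("Comment", back_populates="task", cascade="all, delete-orphan")
    attachments = relationship("Attachment", back_populates="task", cascade="all, delete-orphan")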


class Comment(Base):
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    task = relationship("Task", back_populates="comments")
    author = relationship("User", back_populates="comments")


class Attachment(Base):
    __tablename__ = "attachments"

    id = Column(Integer, primary_key=True, index=True)
    filename = Column(String(255), nullable=False)
    original_name = Column(String(255), nullable=False)
    file_size = Column(Integer, nullable=False)
    content_type = Column(String(100), nullable=False)
    uploaded_at = Column(DateTime, server_default=func.now())

    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
    uploaded_by = Column(Integer, ForeignKey("users.id"), nullable=False)

    task = relationship("Task", back_populates="attachments")
    uploader = relationship("User")
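
The enums above inherit from both str and enum.Enum. The sketch below shows what that mix buys you, using a trimmed copy of TaskStatus so it runs on its own.

```python
import enum
import json


class TaskStatus(str, enum.Enum):
    todo = "todo"
    in_progress = "in_progress"


# Members compare equal to their plain string values.
print(TaskStatus.todo == "todo")

# Lookup by value is how an incoming "in_progress" string becomes a member.
print(TaskStatus("in_progress") is TaskStatus.in_progress)

# json.dumps treats the member as an ordinary string.
print(json.dumps({"status": TaskStatus.todo}))

# Use .value when building messages: str() and f-strings render the member
# differently depending on the Python version.
print(f"status is {TaskStatus.todo.value}")
```

Because the rendering of a bare member inside an f-string changed between Python versions, reaching for .value in user-facing text is the safer habit.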

Step 5 — auth_utils.py

# auth_utils.py

from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status
from config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(data: dict) -> str:
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
    expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
    payload.update({"exp": expire})
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")


def verify_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"}
        )

Step 6 — schemas.py

# schemas.py

from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from typing import Generic, TypeVar
from models import TaskStatus, TaskPriority, WorkspaceRole

T = TypeVar("T")


# ── Generic Response Wrapper ──────────────────────

class APIResponse(BaseModel, Generic[T]):
    success: bool = True
    message: str = "Success"
    data: T | None = None


class PaginatedResponse(BaseModel, Generic[T]):
    success: bool = True
    data: list[T] = []
    total: int = 0
    page: int = 1
    per_page: int = 10
    total_pages: int = 0
    has_next: bool = False
    has_prev: bool = False


# ── User Schemas ─────────────────────────────────

class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    email: str
    password: str = Field(min_length=8)

    @field_validator("email")
    @classmethod
    def normalize_email(cls, v: str) -> str:
        return v.lower().strip()

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
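        if v.isdigit():
            raise ValueError("Password cannot be only numbers")
        # isalpha() would let "abcdefg!" through without any digit, so check digits directly
        if not any(ch.isdigit() for ch in v):
            raise ValueError("Password must include at least one number")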
        return v


class UserUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)


class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    avatar: str | None
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class LoginRequest(BaseModel):
    email: str
    password: str


class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: UserResponse


# ── Workspace Schemas ─────────────────────────────

class WorkspaceCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    description: str | None = Field(default=None, max_length=500)


class WorkspaceUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)
    description: str | None = None


class WorkspaceMemberResponse(BaseModel):
    id: int
    user: UserResponse
    role: WorkspaceRole
    joined_at: datetime

    model_config = {"from_attributes": True}


class WorkspaceResponse(BaseModel):
    id: int
    name: str
    description: str | None
    owner_id: int
    created_at: datetime
    member_count: int = 0

    model_config = {"from_attributes": True}


class InviteMember(BaseModel):
    email: str
    role: WorkspaceRole = WorkspaceRole.member


# ── Project Schemas ───────────────────────────────

class ProjectCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    description: str | None = Field(default=None, max_length=500)


class ProjectUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)
    description: str | None = None
    is_active: bool | None = None


class ProjectResponse(BaseModel):
    id: int
    name: str
    description: str | None
    is_active: bool
    workspace_id: int
    created_at: datetime
    task_count: int = 0

    model_config = {"from_attributes": True}


# ── Task Schemas ──────────────────────────────────

class TaskCreate(BaseModel):
    title: str = Field(min_length=3, max_length=200)
    description: str | None = None
    priority: TaskPriority = TaskPriority.medium
    due_date: datetime | None = None
    assignee_id: int | None = None


class TaskUpdate(BaseModel):
    title: str | None = Field(default=None, min_length=3)
    description: str | None = None
    status: TaskStatus | None = None
    priority: TaskPriority | None = None
    due_date: datetime | None = None
    assignee_id: int | None = None


class TaskResponse(BaseModel):
    id: int
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    project_id: int
    creator_id: int
    assignee_id: int | None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class TaskDetailResponse(BaseModel):
    id: int
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    created_at: datetime
    updated_at: datetime
    creator: UserResponse
    assignee: UserResponse | None
    comment_count: int = 0

    model_config = {"from_attributes": True}


# ── Comment Schemas ───────────────────────────────

class CommentCreate(BaseModel):
    content: str = Field(min_length=1, max_length=2000)


class CommentUpdate(BaseModel):
    content: str = Field(min_length=1, max_length=2000)


class CommentResponse(BaseModel):
    id: int
    content: str
    task_id: int
    author: UserResponse
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


# ── Attachment Schema ─────────────────────────────

class AttachmentResponse(BaseModel):
    id: int
    filename: str
    original_name: str
    file_size: int
    content_type: str
    uploaded_at: datetime
    task_id: int

    model_config = {"from_attributes": True}
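
The password rules in UserCreate can be tried outside Pydantic. The sketch below is a standalone version in the same spirit; password_problems and its messages are hypothetical, and it checks for a digit explicitly so that a password made of letters and symbols is still flagged.

```python
def password_problems(password: str) -> list[str]:
    """Return the reasons a password would be rejected (empty list means OK)."""
    problems = []
    if len(password) < 8:
        problems.append("too short")
    if password.isdigit():
        problems.append("only numbers")
    if not any(ch.isdigit() for ch in password):
        problems.append("no number")
    return problems


for candidate in ["12345678", "abcdefgh", "abcdefg!", "abcdefg1"]:
    print(candidate, password_problems(candidate))
```

Returning every problem at once, rather than stopping at the first, is a design choice that suits registration forms; a Pydantic validator raises on the first failure instead.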

Step 7 — utils.py

# utils.py

from sqlalchemy.orm import Session


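def paginate(query, page: int = 1, per_page: int = 10) -> dict:
    """Apply pagination to any SQLAlchemy query."""
    # Clamp inputs so page=0 or per_page=0 cannot cause a negative offset
    # or a division by zero. The cap of 100 is an arbitrary choice.
    page = max(page, 1)
    per_page = max(1, min(per_page, 100))
    total = query.count()
    total_pages = (total + per_page - 1) // per_page
    items = query.offset((page - 1) * per_page).limit(per_page).all()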

    return {
        "data": items,
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1
    }


def send_notification_email(to_email: str, subject: str, body: str):
    """Simulate sending email — replace with real email library in production."""
    print(f"\n📧 EMAIL TO: {to_email}")
    print(f"   SUBJECT: {subject}")
    print(f"   BODY: {body}\n")
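
The arithmetic inside paginate is easier to follow on a plain list. The sketch below repeats the same formulas without a database; page_window is a hypothetical helper name.

```python
def page_window(total: int, page: int, per_page: int) -> dict:
    """Pure-Python version of the arithmetic used by paginate()."""
    total_pages = (total + per_page - 1) // per_page   # ceiling division
    offset = (page - 1) * per_page                     # rows to skip
    return {
        "offset": offset,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }


items = list(range(1, 26))   # 25 fake rows numbered 1..25
window = page_window(len(items), page=3, per_page=10)
page_items = items[window["offset"]: window["offset"] + 10]

print(window)       # {'offset': 20, 'total_pages': 3, 'has_next': False, 'has_prev': True}
print(page_items)   # [21, 22, 23, 24, 25]
```

With 25 rows and 10 per page, the last page holds the remaining 5 rows, which is why ceiling division is used instead of plain integer division.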

Step 8 — dependencies.py

# dependencies.py

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db
import auth_utils
import models

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> models.User:
    payload = auth_utils.verify_token(token)
    user_id = payload.get("user_id")

    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token")

    user = db.query(models.User).filter(models.User.id == user_id).first()

    if not user:
        raise HTTPException(status_code=401, detail="User not found")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account deactivated")

    return user


def get_workspace_member(
    workspace_id: int,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> models.WorkspaceMember:
    """Verify user is a member of the workspace."""
    membership = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == current_user.id
    ).first()

    if not membership:
        raise HTTPException(
            status_code=403,
            detail="You are not a member of this workspace"
        )

    return membership


def get_workspace_admin(
    membership: models.WorkspaceMember = Depends(get_workspace_member)
) -> models.WorkspaceMember:
    """Verify user is admin or owner of the workspace."""
    if membership.role not in [models.WorkspaceRole.owner, models.WorkspaceRole.admin]:
        raise HTTPException(
            status_code=403,
            detail="Admin access required"
        )
    return membership
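
get_workspace_admin lists the allowed roles explicitly. If more permission levels are added later, an ordering table is one alternative. The sketch below is an assumption about how that could look; ROLE_RANK and has_at_least are hypothetical names that do not appear in the project.

```python
# Hypothetical ordering: a higher number means more privileges.
ROLE_RANK = {"member": 1, "admin": 2, "owner": 3}


def has_at_least(role: str, required: str) -> bool:
    """True when `role` is the required role or a more privileged one."""
    return ROLE_RANK[role] >= ROLE_RANK[required]


print(has_at_least("owner", "admin"))    # True
print(has_at_least("member", "admin"))   # False
```

For three roles the explicit list in get_workspace_admin is just as readable; the table pays off only when checks like "at least admin" start to multiply.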

Step 9 — All Routers

routers/auth.py

# routers/auth.py

from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import models
import schemas
import auth_utils
from database import get_db
from dependencies import get_current_user

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
    if db.query(models.User).filter(models.User.email == user_data.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    hashed = auth_utils.hash_password(user_data.password)
    user = models.User(
        name=user_data.name,
        email=user_data.email,
        password=hashed
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@router.post("/login", response_model=schemas.TokenResponse)
def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(
        models.User.email == credentials.email.lower()
    ).first()

    if not user or not auth_utils.verify_password(credentials.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account deactivated")

    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer", "user": user}


@router.post("/token")    # for /docs Authorize button
def login_for_docs(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    # The Authorize button posts form fields named "username" and "password"
    # (not JSON), so the email arrives in form_data.username.
    user = db.query(models.User).filter(
        models.User.email == form_data.username.lower().strip()
    ).first()
    if not user or not auth_utils.verify_password(form_data.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer"}


@router.get("/me", response_model=schemas.UserResponse)
def get_me(current_user: models.User = Depends(get_current_user)):
    return current_user


@router.patch("/me", response_model=schemas.UserResponse)
def update_me(
    update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    if update.name:
        current_user.name = update.name
    db.commit()
    db.refresh(current_user)
    return current_user
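
From the client side, the flow is: post credentials to /auth/login, then send the returned token in an Authorization header. The sketch below only builds the two requests with the standard library and does not send them. The base URL assumes a server running locally on port 8000, and the email, password, and token values are made up.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"   # assumption: local development server


def build_login_request(email: str, password: str) -> urllib.request.Request:
    # /auth/login expects a JSON body with "email" and "password".
    body = json.dumps({"email": email, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/auth/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_me_request(access_token: str) -> urllib.request.Request:
    # Protected endpoints read the token from the Authorization header.
    return urllib.request.Request(
        f"{BASE_URL}/auth/me",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )


login_req = build_login_request("ada@example.com", "correct-horse-1")
me_req = build_me_request("token-from-login-response")
print(login_req.get_method(), login_req.full_url)
print(me_req.get_header("Authorization"))
```

With the API running, passing either request to urllib.request.urlopen would send it; in practice most clients use a higher-level HTTP library for this.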

routers/workspaces.py

# routers/workspaces.py

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate, send_notification_email

router = APIRouter(prefix="/workspaces", tags=["Workspaces"])


@router.post("", response_model=schemas.WorkspaceResponse, status_code=201)
def create_workspace(
    data: schemas.WorkspaceCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    workspace = models.Workspace(
        name=data.name,
        description=data.description,
        owner_id=current_user.id
    )
    db.add(workspace)
    db.flush()    # get ID before commit

    # Add creator as owner member
    member = models.WorkspaceMember(
        workspace_id=workspace.id,
        user_id=current_user.id,
        role=models.WorkspaceRole.owner
    )
    db.add(member)
    db.commit()
    db.refresh(workspace)

    workspace.member_count = 1
    return workspace


@router.get("", response_model=schemas.PaginatedResponse[schemas.WorkspaceResponse])
def get_my_workspaces(
    page: int = 1,
    per_page: int = 10,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    query = db.query(models.Workspace).join(
        models.WorkspaceMember
    ).filter(
        models.WorkspaceMember.user_id == current_user.id
    )

    result = paginate(query, page, per_page)

    for ws in result["data"]:
        ws.member_count = db.query(models.WorkspaceMember).filter(
            models.WorkspaceMember.workspace_id == ws.id
        ).count()

    return result


@router.get("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def get_workspace(
    workspace_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    workspace.member_count = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id
    ).count()

    return workspace


@router.patch("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def update_workspace(
    workspace_id: int,
    data: schemas.WorkspaceUpdate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    if data.name:
        workspace.name = data.name
    if data.description is not None:
        workspace.description = data.description

    db.commit()
    db.refresh(workspace)
    workspace.member_count = len(workspace.members)
    return workspace


@router.post("/{workspace_id}/invite", response_model=schemas.WorkspaceMemberResponse)
def invite_member(
    workspace_id: int,
    invite: schemas.InviteMember,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    # Find user by email
    user = db.query(models.User).filter(
        models.User.email == invite.email.lower()
    ).first()

    if not user:
        raise HTTPException(status_code=404, detail="User with this email not found")

    # Check already member
    existing = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == user.id
    ).first()

    if existing:
        raise HTTPException(status_code=400, detail="User is already a member")

    # Add member
    new_member = models.WorkspaceMember(
        workspace_id=workspace_id,
        user_id=user.id,
        role=invite.role
    )
    db.add(new_member)
    db.commit()
    db.refresh(new_member)

    # Send notification in background
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    background_tasks.add_task(
        send_notification_email,
        user.email,
        f"You've been added to {workspace.name}",
        # .value gives "member"/"admin"; a bare enum member may render as "WorkspaceRole.member"
        f"Hi {user.name}, you've been added as {invite.role.value} to workspace '{workspace.name}'"
    )

    return new_member


@router.get("/{workspace_id}/members", response_model=list[schemas.WorkspaceMemberResponse])
def get_members(
    workspace_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    return db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id
    ).all()


@router.delete("/{workspace_id}/members/{user_id}", status_code=204)
def remove_member(
    workspace_id: int,
    user_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    member = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == user_id
    ).first()

    if not member:
        raise HTTPException(status_code=404, detail="Member not found")

    if member.role == models.WorkspaceRole.owner:
        raise HTTPException(status_code=400, detail="Cannot remove workspace owner")

    db.delete(member)
    db.commit()
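
get_my_workspaces runs one COUNT query per workspace on the page. That is fine for small pages, but the counts can also be fetched in a single grouped query. The sketch below shows the idea with the standard-library sqlite3 module and a cut-down workspace_members table; the sample rows are invented, and the project itself would express the same query through SQLAlchemy.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE workspace_members (
        id INTEGER PRIMARY KEY,
        workspace_id INTEGER NOT NULL,
        user_id INTEGER NOT NULL
    );
    INSERT INTO workspace_members (workspace_id, user_id) VALUES
        (1, 10), (1, 11), (1, 12), (2, 10);
""")

workspace_ids = [1, 2, 3]   # the workspaces on the current page
placeholders = ",".join("?" for _ in workspace_ids)

# One query returns a count for every workspace that has members.
rows = conn.execute(
    "SELECT workspace_id, COUNT(*) FROM workspace_members "
    f"WHERE workspace_id IN ({placeholders}) GROUP BY workspace_id",
    workspace_ids,
).fetchall()

counts = dict(rows)
# Workspaces without any member rows are missing from the result, so default to 0.
member_counts = {ws_id: counts.get(ws_id, 0) for ws_id in workspace_ids}
print(member_counts)   # {1: 3, 2: 1, 3: 0}
conn.close()
```

The same pattern applies to task_count in the projects router, where the count is grouped by project_id instead.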

routers/projects.py

# routers/projects.py

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate

router = APIRouter(prefix="/workspaces/{workspace_id}/projects", tags=["Projects"])


@router.post("", response_model=schemas.ProjectResponse, status_code=201)
def create_project(
    workspace_id: int,
    data: schemas.ProjectCreate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = models.Project(
        name=data.name,
        description=data.description,
        workspace_id=workspace_id
    )
    db.add(project)
    db.commit()
    db.refresh(project)
    project.task_count = 0
    return project


@router.get("", response_model=schemas.PaginatedResponse[schemas.ProjectResponse])
def get_projects(
    workspace_id: int,
    page: int = 1,
    per_page: int = 10,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    query = db.query(models.Project).filter(
        models.Project.workspace_id == workspace_id
    )
    result = paginate(query, page, per_page)

    for project in result["data"]:
        project.task_count = db.query(models.Task).filter(
            models.Task.project_id == project.id
        ).count()

    return result


@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(
    workspace_id: int,
    project_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    project.task_count = db.query(models.Task).filter(
        models.Task.project_id == project_id
    ).count()

    return project


@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    workspace_id: int,
    project_id: int,
    data: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    update_data = data.model_dump(exclude_none=True)
    for field, value in update_data.items():
        setattr(project, field, value)

    db.commit()
    db.refresh(project)
    project.task_count = len(project.tasks)
    return project


@router.delete("/{project_id}", status_code=204)
def delete_project(
    workspace_id: int,
    project_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    db.delete(project)
    db.commit()
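
update_project applies model_dump(exclude_none=True), which drops every field whose value is None. One consequence is that a client cannot clear a description by sending null. The sketch below shows the difference with plain dictionaries; apply_patch is a hypothetical helper, not part of the project.

```python
def apply_patch(record: dict, payload: dict, *, skip_none: bool) -> dict:
    """Return a copy of record with the payload's fields applied."""
    updated = dict(record)
    for field, value in payload.items():
        if skip_none and value is None:
            continue   # mirrors exclude_none=True: null is silently ignored
        updated[field] = value
    return updated


project = {"name": "Website", "description": "Old text", "is_active": True}
payload = {"description": None}   # the client explicitly asks to clear the description

print(apply_patch(project, payload, skip_none=True)["description"])    # Old text
print(apply_patch(project, payload, skip_none=False)["description"])   # None
```

Pydantic's model_dump also accepts exclude_unset=True, which keeps only the fields the client actually sent. Switching to it would let null through for every field, including non-nullable ones such as name, so it would need an extra check before being adopted here.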

routers/tasks.py

# routers/tasks.py

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, File, UploadFile
from sqlalchemy.orm import Session
from sqlalchemy import or_
import os
import uuid
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member
from utils import paginate, send_notification_email
from models import TaskStatus, TaskPriority

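def require_project_member(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
) -> models.Project:
    """Return the project, but only if the caller belongs to its workspace."""
    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    membership = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == project.workspace_id,
        models.WorkspaceMember.user_id == current_user.id
    ).first()
    if not membership:
        raise HTTPException(status_code=403, detail="You are not a member of this workspace")
    return project


# Without this check any logged-in user could read or change tasks in any project.
# The router-level dependency runs before every task endpoint below.
router = APIRouter(
    prefix="/projects/{project_id}/tasks",
    tags=["Tasks"],
    dependencies=[Depends(require_project_member)]
)

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_project_or_404(project_id: int, db: Session) -> models.Project:
    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project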


@router.post("", response_model=schemas.TaskResponse, status_code=201)
def create_task(
    project_id: int,
    data: schemas.TaskCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    project = get_project_or_404(project_id, db)

    task = models.Task(
        title=data.title,
        description=data.description,
        priority=data.priority,
        due_date=data.due_date,
        assignee_id=data.assignee_id,
        project_id=project_id,
        creator_id=current_user.id
    )
    db.add(task)
    db.commit()
    db.refresh(task)

    # Notify assignee in background
    if data.assignee_id and data.assignee_id != current_user.id:
        assignee = db.query(models.User).filter(
            models.User.id == data.assignee_id
        ).first()
        if assignee:
            background_tasks.add_task(
                send_notification_email,
                assignee.email,
                f"New task assigned: {task.title}",
                f"Hi {assignee.name}, you have been assigned task '{task.title}' in project '{project.name}'"
            )

    return task


@router.get("", response_model=schemas.PaginatedResponse[schemas.TaskResponse])
def get_tasks(
    project_id: int,
    page: int = 1,
    per_page: int = 10,
    status: TaskStatus | None = None,
    priority: TaskPriority | None = None,
    assignee_id: int | None = None,
    search: str | None = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    get_project_or_404(project_id, db)

    query = db.query(models.Task).filter(models.Task.project_id == project_id)

    if status:
        query = query.filter(models.Task.status == status)
    if priority:
        query = query.filter(models.Task.priority == priority)
    if assignee_id:
        query = query.filter(models.Task.assignee_id == assignee_id)
    if search:
        query = query.filter(
            or_(
                models.Task.title.ilike(f"%{search}%"),
                models.Task.description.ilike(f"%{search}%")
            )
        )

    return paginate(query, page, per_page)


@router.get("/{task_id}", response_model=schemas.TaskDetailResponse)
def get_task(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    task.comment_count = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).count()

    return task


@router.patch("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
    project_id: int,
    task_id: int,
    data: schemas.TaskUpdate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    old_assignee_id = task.assignee_id
    update_data = data.model_dump(exclude_none=True)
    for field, value in update_data.items():
        setattr(task, field, value)

    db.commit()
    db.refresh(task)

    # Notify new assignee if changed
    if data.assignee_id and data.assignee_id != old_assignee_id:
        new_assignee = db.query(models.User).filter(
            models.User.id == data.assignee_id
        ).first()
        if new_assignee:
            background_tasks.add_task(
                send_notification_email,
                new_assignee.email,
                f"Task reassigned to you: {task.title}",
                f"Hi {new_assignee.name}, task '{task.title}' has been assigned to you."
            )

    return task


@router.delete("/{task_id}", status_code=204)
def delete_task(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.creator_id != current_user.id:
        raise HTTPException(status_code=403, detail="Only task creator can delete")

    db.delete(task)
    db.commit()


@router.post("/{task_id}/attachments", response_model=schemas.AttachmentResponse)
async def upload_attachment(
    project_id: int,
    task_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    content = await file.read()

    if len(content) > 10 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large. Max 10MB")

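    # splitext returns "" when there is no extension, so "README" is not stored as "<uuid>.readme"
    extension = os.path.splitext(file.filename or "")[1].lower()
    unique_name = f"{uuid.uuid4()}{extension}"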
    file_path = os.path.join(UPLOAD_DIR, unique_name)

    with open(file_path, "wb") as f:
        f.write(content)

    attachment = models.Attachment(
        filename=unique_name,
        original_name=file.filename,
        file_size=len(content),
        content_type=file.content_type,
        task_id=task_id,
        uploaded_by=current_user.id
    )
    db.add(attachment)
    db.commit()
    db.refresh(attachment)

    return attachment


@router.get("/{task_id}/attachments", response_model=list[schemas.AttachmentResponse])
def get_attachments(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
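    # Confirm the task exists in this project before listing its attachments
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    return db.query(models.Attachment).filter(
        models.Attachment.task_id == task_id
    ).all()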
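
One detail in get_tasks worth knowing: the search term goes into the ILIKE pattern as-is, so a user who types % or _ gets wildcard behavior instead of a literal match. Below is a minimal sketch of escaping those characters, using the standard-library sqlite3 module so it runs on its own. escape_like and search_titles are hypothetical helpers, not part of the project, and the table is a throwaway. With SQLAlchemy the same idea would be passing the escaped pattern together with escape="\\" to ilike().

```python
import sqlite3


def escape_like(term: str, escape_char: str = "\\") -> str:
    """Escape LIKE wildcards so the term matches literally (hypothetical helper)."""
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )


def search_titles(conn: sqlite3.Connection, term: str) -> list[str]:
    """Return titles containing `term`, treating % and _ as plain characters."""
    pattern = f"%{escape_like(term)}%"
    rows = conn.execute(
        "SELECT title FROM tasks WHERE title LIKE ? ESCAPE '\\' ORDER BY id",
        (pattern,),
    ).fetchall()
    return [row[0] for row in rows]


# Throwaway in-memory table, only for demonstration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO tasks (title) VALUES (?)",
    [("Reach 100% coverage",), ("Design homepage mockup",), ("Fix user_id bug",)],
)
conn.commit()
```

Without the escaping step, searching for "_" would match every task, because "_" means "any single character" in a LIKE pattern.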

routers/comments.py

# routers/comments.py

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user
from utils import paginate

router = APIRouter(prefix="/tasks/{task_id}/comments", tags=["Comments"])


@router.post("", response_model=schemas.CommentResponse, status_code=201)
def create_comment(
    task_id: int,
    data: schemas.CommentCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    comment = models.Comment(
        content=data.content,
        task_id=task_id,
        author_id=current_user.id
    )
    db.add(comment)
    db.commit()
    db.refresh(comment)
    return comment


@router.get("", response_model=schemas.PaginatedResponse[schemas.CommentResponse])
def get_comments(
    task_id: int,
    page: int = 1,
    per_page: int = 20,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    query = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).order_by(models.Comment.created_at.asc())

    return paginate(query, page, per_page)


@router.patch("/{comment_id}", response_model=schemas.CommentResponse)
def update_comment(
    task_id: int,
    comment_id: int,
    data: schemas.CommentUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only edit your own comments")

    comment.content = data.content
    db.commit()
    db.refresh(comment)
    return comment


@router.delete("/{comment_id}", status_code=204)
def delete_comment(
    task_id: int,
    comment_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only delete your own comments")

    db.delete(comment)
    db.commit()
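
Both list endpoints take page and per_page as plain integers, so nothing stops a client from sending page=0 or per_page=100000. Here is a small sketch of clamping those values before they reach paginate. page_window, PageWindow, and the cap of 100 are assumptions for illustration and are not part of the project's utils.py.

```python
import math
from dataclasses import dataclass

MAX_PER_PAGE = 100  # assumed cap, pick whatever suits your API


@dataclass(frozen=True)
class PageWindow:
    page: int
    per_page: int
    offset: int
    total_pages: int


def page_window(page: int, per_page: int, total: int) -> PageWindow:
    """Clamp user-supplied paging values and compute the OFFSET to query with."""
    per_page = max(1, min(per_page, MAX_PER_PAGE))
    total_pages = max(1, math.ceil(total / per_page))
    page = max(1, min(page, total_pages))
    return PageWindow(page, per_page, (page - 1) * per_page, total_pages)
```

Clamping an out-of-range page to the last page is one design choice; returning an empty list is equally common. If you would rather reject bad values outright, FastAPI can validate them in the signature, for example per_page: int = Query(20, ge=1, le=100).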

Step 10 — main.py — Final Clean Entry Point

# main.py

import time
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import models
from database import engine
from routers import auth, users, workspaces, projects, tasks, comments
import os

# Create tables
models.Base.metadata.create_all(bind=engine)

# Create uploads folder
os.makedirs("uploads", exist_ok=True)

app = FastAPI(
    title="Task Manager API",
    description="""
    A complete task management API built with FastAPI.

    ## Features
    - JWT Authentication
    - Workspaces with role-based access
    - Projects inside workspaces
    - Tasks with priorities and statuses
    - Comments on tasks
    - File attachments
    - Background notifications
    """,
    version="1.0.0",
    contact={
        "name": "Gagan",
        "email": "gagan@email.com"
    }
)

# ── CORS ──────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Request Logging Middleware ────────────────────
@app.middleware("http")
async def log_requests(request: Request, call_next):
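    # perf_counter is a monotonic clock, so the duration is not skewed by system clock changes
    start = time.perf_counter()
    response = await call_next(request)
    duration = round((time.perf_counter() - start) * 1000, 2)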
    print(f"[{request.method}] {request.url.path} → {response.status_code} ({duration}ms)")
    return response

# ── Exception Handlers ────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_handler(request: Request, exc: RequestValidationError):
    errors = [
        {
            "field": " → ".join(str(part) for part in e["loc"]),
            "message": e["msg"]
        }
        for e in exc.errors()
    ]
    return JSONResponse(
        status_code=422,
        content={"success": False, "message": "Validation failed", "errors": errors}
    )

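@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # This handler also receives 404s raised inside routes (e.g. "Task not found"),
    # so keep that detail and only fall back to the route message for unmatched paths.
    detail = getattr(exc, "detail", None)
    if not detail or detail == "Not Found":
        detail = f"Route not found: {request.url.path}"
    return JSONResponse(
        status_code=404,
        content={"success": False, "message": detail}
    )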

# ── Static Files ──────────────────────────────────
app.mount("/files", StaticFiles(directory="uploads"), name="files")

# ── Routers ───────────────────────────────────────
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(workspaces.router)
app.include_router(projects.router)
app.include_router(tasks.router)
app.include_router(comments.router)

# ── Root ──────────────────────────────────────────
@app.get("/", tags=["Root"])
def root():
    return {
        "message": "Task Manager API",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc"
    }
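
To make the validation handler's output concrete, here is its list comprehension pulled out into a plain function and run against a sample payload. format_validation_errors is a hypothetical name, and the sample only mimics the shape of the items returned by exc.errors() (a loc tuple plus a msg string). The exact messages depend on your Pydantic version, so treat the values as illustrative.

```python
def format_validation_errors(raw_errors: list[dict]) -> list[dict]:
    """Flatten Pydantic-style error dicts into {"field", "message"} pairs."""
    return [
        {
            "field": " → ".join(str(part) for part in error["loc"]),
            "message": error["msg"],
        }
        for error in raw_errors
    ]


# Illustrative input, shaped like the items returned by exc.errors()
sample_errors = [
    {"loc": ("body", "title"), "msg": "Field required", "type": "missing"},
    {"loc": ("query", "page"), "msg": "Input should be a valid integer", "type": "int_parsing"},
]

formatted = format_validation_errors(sample_errors)
```

For this input, the first entry becomes {"field": "body → title", "message": "Field required"}, which is the shape a frontend would receive in the "errors" array of the 422 response.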

Running the Final Project

uvicorn main:app --reload

Open http://localhost:8000/docs and you'll see the complete API organized into 6 router sections, plus a small Root section for the / endpoint:

  • Authentication
  • Users
  • Workspaces
  • Projects
  • Tasks
  • Comments

Complete Testing Flow

Test this complete scenario in /docs:

1. Register two users:

  • User A: gagan@email.com
  • User B: rahul@email.com

2. Login as User A → get token → Authorize in /docs

3. Create a workspace:

{"name": "My Team", "description": "Our workspace"}

4. Invite User B to workspace:

{"email": "rahul@email.com", "role": "member"}

Watch terminal — notification email prints in background.

5. Create a project:

{"name": "Website Redesign", "description": "Q1 project"}

6. Create a task:

{
    "title": "Design homepage mockup",
    "description": "Create Figma designs for the new homepage",
    "priority": "high",
    "assignee_id": 2
}

Watch terminal — assignment notification prints.

7. Login as User B → add a comment on the task

8. Upload an attachment to the task

9. Update task status to in_progress

10. Mark task as done


What You've Built

A complete production-grade REST API with:

✅ Clean project structure — routers, models, schemas separated
✅ JWT authentication — register, login, protected routes
✅ Role-based access — owner, admin, member
✅ Full CRUD — workspaces, projects, tasks, comments
✅ Relationships — nested resources, foreign keys
✅ Pagination — on all list endpoints
✅ Search and filtering — tasks by status, priority, assignee
✅ File uploads — attachments on tasks
✅ Background tasks — email notifications
✅ CORS — ready for frontend connection
✅ Middleware — request logging
✅ Custom exception handlers — consistent errors
✅ Auto documentation — /docs and /redoc

Connecting to Your Next.js Frontend

Since you already build Next.js apps, connecting is straightforward:

// lib/api.js in your Next.js project

const API_URL = "http://localhost:8000"

export async function login(email, password) {
    const res = await fetch(`${API_URL}/auth/login`, {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({email, password})
    })
    return res.json()    // returns {access_token, user}
}

export async function getTasks(projectId, token) {
    const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
        headers: {"Authorization": `Bearer ${token}`}
    })
    return res.json()
}

export async function createTask(projectId, taskData, token) {
    const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
        method: "POST",
        headers: {
            "Content-Type": "application/json",
            "Authorization": `Bearer ${token}`
        },
        body: JSON.stringify(taskData)
    })
    return res.json()
}

This is the same Bearer token pattern you already use with NestJS, so there is nothing new to learn.


What's Next

You now know FastAPI end to end. Depending on where you want to go:

Immediate next steps for this project:

  • Switch SQLite to PostgreSQL: change DATABASE_URL in .env and install a PostgreSQL driver such as psycopg2-binary
  • Deploy to a VPS with Nginx + Uvicorn
  • Add email sending with fastapi-mail
  • Add WebSockets for real-time task updates
  • Write tests with pytest
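
On the PostgreSQL switch: changing the URL is the main step, but any SQLite-only engine options have to go too. Below is a hedged sketch of choosing engine arguments from the URL. engine_options is a hypothetical helper, and it assumes database.py passes check_same_thread for SQLite, as most FastAPI tutorials do. The PostgreSQL URL in the usage note is a placeholder.

```python
from urllib.parse import urlsplit


def engine_options(database_url: str) -> dict:
    """Return extra create_engine() keyword arguments for the given URL."""
    scheme = urlsplit(database_url).scheme  # e.g. "sqlite" or "postgresql+psycopg2"
    dialect = scheme.split("+", 1)[0]
    if dialect == "sqlite":
        # sqlite3 connections are tied to one thread unless this check is disabled
        return {"connect_args": {"check_same_thread": False}}
    # pool_pre_ping makes SQLAlchemy test pooled connections before reusing them
    return {"pool_pre_ping": True}
```

In database.py this would be used as create_engine(settings.database_url, **engine_options(settings.database_url)), with DATABASE_URL set to something like postgresql+psycopg2://user:password@localhost:5432/taskmanager.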

Deeper FastAPI topics:

  • Alembic migrations — managing database schema changes
  • WebSockets — real-time features
  • Celery + Redis — heavy background job processing
  • pytest — testing your API endpoints
  • Docker — containerizing your app

Given your NestJS and PostgreSQL background — switching to PostgreSQL and deploying this to your existing Azure infrastructure would be the natural next step. The code is already ready for it.

