What is NumPy | Setup — Jupyter Notebook

What is Jupyter Notebook?

When you were learning Python, you wrote code in .py files and ran them in the terminal. That works well for building apps such as FastAPI services.

For data science, though, Jupyter Notebook is the standard tool. It's a different way of writing code where:

  • Code is written in cells — small blocks
  • You run one cell at a time and see output immediately below it
  • You can mix code, output, charts, and text in one file
  • Perfect for exploring data step by step

It looks like this:

┌─────────────────────────────────┐
│ import numpy as np              │  ← code cell
└─────────────────────────────────┘
  Output: nothing

┌─────────────────────────────────┐
│ a = np.array([1, 2, 3])         │  ← code cell
│ print(a)                        │
└─────────────────────────────────┘
  Output: [1 2 3]                    ← output appears right below

┌─────────────────────────────────┐
│ # this is a chart cell          │  ← code cell
│ plt.plot(a)                     │
└─────────────────────────────────┘
  Output: 📈 chart appears here

Jupyter is one of the most widely used tools in data science. You'll love it after 10 minutes.


Setting Up

Step 1 — Create Project Folder

mkdir data-science-learning
cd data-science-learning

Step 2 — Create Virtual Environment

python -m venv venv

# Windows
venv\Scripts\activate

# Mac/Linux
source venv/bin/activate

You should see (venv) in your terminal now.

Step 3 — Install Everything

pip install numpy pandas matplotlib seaborn jupyter

This installs all four libraries plus Jupyter in one command. It will take a minute because they're large packages.

Step 4 — Launch Jupyter Notebook

jupyter notebook

Your browser will automatically open at http://localhost:8888 showing a file explorer interface.


Creating Your First Notebook

In the Jupyter browser interface:

  1. Click the "New" button on the top right
  2. Click "Python 3 (ipykernel)"
  3. A new tab opens — this is your notebook
  4. Click on "Untitled" at the top and rename it to numpy-basics

You'll see an empty cell waiting for you. This is where you write code.


How to Use Jupyter Cells

Write code in the cell
Press Shift + Enter → runs the cell and moves to next
Press Ctrl + Enter  → runs the cell and stays
Press A             → add cell Above current
Press B             → add cell Below current
Press DD            → delete current cell
Press M             → change cell to Markdown (text)
Press Y             → change cell back to Code

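The single-letter shortcuts (A, B, DD, M, Y) work in command mode: press Esc first so the cell is no longer in edit mode. These shortcuts will become muscle memory quickly.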


Your First Cell — Import NumPy

In your first cell type:

import numpy as np
print("NumPy version:", np.__version__)

Press Shift + Enter. Output (your version number may differ):

NumPy version: 2.1.0

import numpy as np imports NumPy and gives it the alias np. This is a near-universal convention: almost all NumPy code, documentation, and tutorials write np, so always import it with this alias.


What is NumPy and Why Does It Exist?

The Problem with Python Lists

You already know Python lists. They work but they're slow for math:


    # Python list — slow way
    numbers = [1, 2, 3, 4, 5]

    # Multiply every number by 2
    doubled = []
    for n in numbers:
        doubled.append(n * 2)

    print(doubled)    # [2, 4, 6, 8, 10]

This works, but imagine doing it on 10 million numbers. A Python loop over a list that size is very slow.

NumPy Solution — Arrays


    import numpy as np

    numbers = np.array([1, 2, 3, 4, 5])
    doubled = numbers * 2

    print(doubled)    # [ 2  4  6  8 10]

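No loop needed. NumPy applies the operation to all elements at once, and on large arrays it is typically tens of times faster than a Python loop because the work runs in compiled C code under the hood. The exact speedup depends on the operation, the array size, and your machine.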

This is called vectorization — applying an operation to an entire array at once instead of looping.
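
To see the difference on your own machine, here is a minimal timing sketch. The array size n is an assumption chosen for illustration, and the timings will vary from run to run, so no specific speedup is claimed.

```python
import time
import numpy as np

n = 1_000_000  # assumed size for illustration; adjust for your machine
numbers_list = list(range(n))
numbers_arr = np.arange(n)

# Python loop over a list
start = time.perf_counter()
doubled_list = []
for x in numbers_list:
    doubled_list.append(x * 2)
loop_seconds = time.perf_counter() - start

# Vectorized NumPy operation
start = time.perf_counter()
doubled_arr = numbers_arr * 2
numpy_seconds = time.perf_counter() - start

print(f"Python loop: {loop_seconds:.4f} s")
print(f"NumPy:       {numpy_seconds:.4f} s")

# Both approaches produce the same values
print(doubled_list[:5])    # [0, 2, 4, 6, 8]
print(doubled_arr[:5])     # [0 2 4 6 8]
```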


Step 2: Creating NumPy Arrays

In a new cell:


    import numpy as np

    # From a Python list
    arr1 = np.array([1, 2, 3, 4, 5])
    print(arr1)           # [1 2 3 4 5]
    print(type(arr1))     # <class 'numpy.ndarray'>

Notice the output — NumPy arrays print without commas between elements, unlike Python lists. ndarray = n-dimensional array.


Array Data Types

Every NumPy array has a single data type — all elements must be the same type:


    # Integer array
    int_arr = np.array([1, 2, 3, 4, 5])
    print(int_arr.dtype)     # int64

    # Float array
    float_arr = np.array([1.5, 2.5, 3.5])
    print(float_arr.dtype)   # float64

    # String array
    str_arr = np.array(["apple", "banana", "mango"])
    print(str_arr.dtype)     # <U6  (unicode string)

    # Mixed — NumPy converts everything to same type
    mixed = np.array([1, 2.5, 3])
    print(mixed)             # [1.  2.5 3. ]  — all converted to float
    print(mixed.dtype)       # float64

You can specify type explicitly:


    arr = np.array([1, 2, 3], dtype=float)
    print(arr)        # [1. 2. 3.]

    arr = np.array([1.9, 2.8, 3.7], dtype=int)
    print(arr)        # [1 2 3]  — decimal part cut off
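
Converting floats to int drops the decimal part rather than rounding. If rounding is what you want, a small sketch like the following rounds first and converts afterwards. It uses astype, the ndarray method for converting an existing array to another type.

```python
import numpy as np

values = np.array([1.9, 2.8, 3.7, -1.9])

truncated = values.astype(int)           # drops the decimal part (toward zero)
rounded = np.round(values).astype(int)   # rounds to the nearest whole number first

print(truncated)   # [ 1  2  3 -1]
print(rounded)     # [ 2  3  4 -2]
```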


Array Properties


    arr = np.array([10, 20, 30, 40, 50])

    print(arr.shape)      # (5,)  — 5 elements, 1 dimension
    print(arr.ndim)       # 1     — number of dimensions
    print(arr.size)       # 5     — total number of elements
    print(arr.dtype)      # int64 — data type


Ways to Create Arrays — Very Important

You'll use these constantly:

np.zeros() — array filled with zeros


    zeros = np.zeros(5)
    print(zeros)       # [0. 0. 0. 0. 0.]

    zeros_int = np.zeros(5, dtype=int)
    print(zeros_int)   # [0 0 0 0 0]

np.ones() — array filled with ones


    ones = np.ones(5)
    print(ones)        # [1. 1. 1. 1. 1.]

np.arange() — like Python range() but returns array


    arr = np.arange(10)
    print(arr)         # [0 1 2 3 4 5 6 7 8 9]

    arr = np.arange(1, 11)
    print(arr)         # [ 1  2  3  4  5  6  7  8  9 10]

    arr = np.arange(0, 20, 2)
    print(arr)         # [ 0  2  4  6  8 10 12 14 16 18]

    arr = np.arange(10, 0, -1)
    print(arr)         # [10  9  8  7  6  5  4  3  2  1]

np.linspace() — evenly spaced numbers between two values


    arr = np.linspace(0, 1, 5)
    print(arr)         # [0.   0.25 0.5  0.75 1.  ]

    arr = np.linspace(0, 100, 11)
    print(arr)         # [  0.  10.  20.  30.  40.  50.  60.  70.  80.  90. 100.]

linspace(start, stop, num) — gives you num evenly spaced points from start to stop. Unlike arange — the stop value IS included.
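
A side-by-side sketch of the two functions over the same interval makes the difference in the stop value visible:

```python
import numpy as np

with_arange = np.arange(0, 1, 0.25)     # step of 0.25, stop excluded
with_linspace = np.linspace(0, 1, 5)    # 5 points, stop included

print(with_arange)     # [0.   0.25 0.5  0.75]
print(with_linspace)   # [0.   0.25 0.5  0.75 1.  ]
```

Use arange when you know the step size, and linspace when you know how many points you need.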

np.full() — array filled with a specific value


    arr = np.full(5, 7)
    print(arr)         # [7 7 7 7 7]

    arr = np.full(4, 3.14)
    print(arr)         # [3.14 3.14 3.14 3.14]


Step 3: 2D Arrays — The Real Power

A 2D array is like a table with rows and columns. This is how real data looks — datasets, images, matrices.

    # Create 2D array from list of lists
    matrix = np.array([
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ])

    print(matrix)
    print()
    print("Shape:", matrix.shape)    # (3, 3) — 3 rows, 3 columns
    print("Dimensions:", matrix.ndim) # 2
    print("Total elements:", matrix.size) # 9

Output:

[[1 2 3]
 [4 5 6]
 [7 8 9]]

Shape: (3, 3)
Dimensions: 2
Total elements: 9

Creating 2D Arrays


    # 3x4 array of zeros (3 rows, 4 columns)
    zeros_2d = np.zeros((3, 4))
    print(zeros_2d)

Output:

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]

    # 2x3 array of ones
    ones_2d = np.ones((2, 3), dtype=int)
    print(ones_2d)

Output:

[[1 1 1]
 [1 1 1]]

    # Identity matrix — 1s on diagonal
    identity = np.eye(4)
    print(identity)

Output:

[[1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 1.]]

Step 4: Indexing and Slicing

1D Array Indexing


    arr = np.array([10, 20, 30, 40, 50])
    #               0    1   2   3   4

    print(arr[0])     # 10  — first element
    print(arr[2])     # 30
    print(arr[-1])    # 50  — last element
    print(arr[-2])    # 40  — second from last

1D Array Slicing


    arr = np.array([10, 20, 30, 40, 50, 60, 70])

    print(arr[1:4])     # [20 30 40]  — index 1 to 3
    print(arr[:3])      # [10 20 30]  — first 3
    print(arr[3:])      # [40 50 60 70] — from index 3 to end
    print(arr[::2])     # [10 30 50 70] — every 2nd element
    print(arr[::-1])    # [70 60 50 40 30 20 10] — reversed

2D Array Indexing


    matrix = np.array([
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ])

    # Access single element — [row, column]
    print(matrix[0, 0])    # 1  — row 0, col 0
    print(matrix[1, 2])    # 6  — row 1, col 2
    print(matrix[2, 1])    # 8  — row 2, col 1
    print(matrix[-1, -1])  # 9  — last row, last column

2D Array Slicing


    matrix = np.array([
        [1,  2,  3,  4],
        [5,  6,  7,  8],
        [9, 10, 11, 12]
    ])

    # Get entire row
    print(matrix[0])        # [1 2 3 4]  — first row
    print(matrix[1, :])     # [5 6 7 8]  — second row (explicit)

    # Get entire column
    print(matrix[:, 0])     # [1 5 9]   — first column
    print(matrix[:, 2])     # [ 3  7 11]  (third column)

    # Get submatrix
    print(matrix[0:2, 1:3]) # rows 0-1, cols 1-2

Output of last line:

[[2 3]
 [6 7]]

Step 5: Array Operations

This is where NumPy really shines. Operations apply to every element automatically.

Basic Math


    arr = np.array([1, 2, 3, 4, 5])

    print(arr + 10)     # [11 12 13 14 15]
    print(arr - 3)      # [-2 -1  0  1  2]
    print(arr * 2)      # [ 2  4  6  8 10]
    print(arr / 2)      # [0.5 1.  1.5 2.  2.5]
    print(arr ** 2)     # [ 1  4  9 16 25]
    print(arr % 2)      # [1 0 1 0 1]  — remainder

Operations Between Two Arrays


    a = np.array([1, 2, 3, 4, 5])
    b = np.array([10, 20, 30, 40, 50])

    print(a + b)     # [11 22 33 44 55]
    print(a * b)     # [ 10  40  90 160 250]
    print(b / a)     # [10. 10. 10. 10. 10.]
    print(b - a)     # [ 9 18 27 36 45]

Element-wise — first element with first, second with second, and so on.
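
As a quick check of what "element-wise" means, the sketch below compares a + b with an explicit pairing by position, and shows what happens when two 1D arrays have different lengths. The array c is made up for illustration.

```python
import numpy as np

a = np.array([1, 2, 3, 4, 5])
b = np.array([10, 20, 30, 40, 50])

# Element-wise addition is equivalent to pairing elements by position
paired = np.array([x + y for x, y in zip(a, b)])
print(np.array_equal(a + b, paired))   # True

# 1D arrays of different lengths (5 and 3) cannot be paired up
c = np.array([1, 2, 3])
mismatch_failed = False
try:
    a + c
except ValueError as err:
    mismatch_failed = True
    print("ValueError:", err)
```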

Math Functions


    arr = np.array([1, 4, 9, 16, 25])

    print(np.sqrt(arr))    # [1. 2. 3. 4. 5.]  — square root
    print(np.log(arr))     # natural log of each element
    print(np.exp(arr))     # e^x for each element
    print(np.abs(np.array([-3, -1, 0, 2, 4])))  # [3 1 0 2 4]


Step 6: Statistical Functions

These are used constantly in data analysis:


    data = np.array([23, 45, 12, 67, 34, 89, 56, 78, 43, 21])

    print("Sum:", np.sum(data))           # 468
    print("Mean:", np.mean(data))         # 46.8
    print("Median:", np.median(data))     # 44.0
    print("Std Dev:", np.std(data))       # 24.23...
    print("Variance:", np.var(data))      # 587.16
    print("Min:", np.min(data))           # 12
    print("Max:", np.max(data))           # 89
    print("Min index:", np.argmin(data))  # 2  — index of minimum value
    print("Max index:", np.argmax(data))  # 5  — index of maximum value
    print("Range:", np.max(data) - np.min(data))  # 77
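
One detail worth knowing: np.std and np.var divide by n by default (the population formulas). Passing ddof=1 divides by n - 1 instead (the sample formulas). The sketch below shows both and cross-checks them against Python's built-in statistics module.

```python
import statistics
import numpy as np

data = np.array([23, 45, 12, 67, 34, 89, 56, 78, 43, 21])

population_std = np.std(data)        # divides by n (ddof=0, the default)
sample_std = np.std(data, ddof=1)    # divides by n - 1

print(round(float(population_std), 2))   # 24.23
print(round(float(sample_std), 2))       # 25.54

# Cross-check against the standard library
values = data.tolist()
print(np.isclose(population_std, statistics.pstdev(values)))   # True
print(np.isclose(sample_std, statistics.stdev(values)))        # True
```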

On 2D Arrays — axis parameter


    scores = np.array([
        [85, 90, 78],    # student 1 — 3 subjects
        [92, 88, 95],    # student 2
        [76, 82, 79]     # student 3
    ])

    print(np.mean(scores))           # 85.0 — mean of all values

    print(np.mean(scores, axis=1))   # [84.33 91.67 79.0] — mean per row (per student)
    print(np.mean(scores, axis=0))   # [84.33 86.67 84.0] — mean per column (per subject)

    print(np.sum(scores, axis=1))    # [253 275 237] — total per student
    print(np.max(scores, axis=0))    # [92 90 95] — best score per subject

axis=0 collapses the rows, giving one result per column.
axis=1 collapses the columns, giving one result per row.

This confuses everyone at first. Just remember:

  • axis=1 → result has one value per row
  • axis=0 → result has one value per column
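
A non-square array makes the rule easier to verify, because the shape of the result tells you which axis was collapsed. The scores below are made up for illustration.

```python
import numpy as np

# 3 students (rows) x 4 subjects (columns); values are made up
scores = np.array([
    [85, 90, 78, 88],
    [92, 88, 95, 70],
    [76, 82, 79, 91],
])

per_subject = scores.sum(axis=0)   # collapses the 3 rows: one value per column
per_student = scores.sum(axis=1)   # collapses the 4 columns: one value per row

print(scores.shape)        # (3, 4)
print(per_subject.shape)   # (4,)
print(per_student.shape)   # (3,)
print(per_subject)         # [253 260 252 249]
print(per_student)         # [341 345 328]
```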

Step 7: Boolean Indexing — Very Powerful

This is one of the most useful NumPy features for data filtering:


    marks = np.array([85, 42, 90, 38, 75, 55, 29, 91, 66, 48])

    # Create a boolean mask
    passing = marks >= 50
    print(passing)
    # [ True False  True False  True  True False  True  True False]

    # Use mask to filter
    print(marks[passing])
    # [85 90 75 55 91 66]

    # One liner
    print(marks[marks >= 50])
    # [85 90 75 55 91 66]

    # Multiple conditions
    print(marks[(marks >= 50) & (marks < 80)])
    # [75 55 66]  — between 50 and 80

    # How many students passed?
    print(np.sum(marks >= 50))     # 6

    # What percentage passed?
    print(np.mean(marks >= 50) * 100)  # 60.0  (percent)
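
Conditions can also be combined with | (or) and inverted with ~ (not). The sketch below reuses the same marks and also shows why Python's and keyword cannot be used on whole arrays. The parentheses around each condition are required.

```python
import numpy as np

marks = np.array([85, 42, 90, 38, 75, 55, 29, 91, 66, 48])

# OR: very low or very high marks
extremes = marks[(marks < 40) | (marks >= 90)]
print(extremes)          # [90 38 29 91]

# NOT: invert a mask with ~
passing = marks >= 50
failing = marks[~passing]
print(failing)           # [42 38 29 48]

# Python's `and` does not work on whole arrays
and_failed = False
try:
    marks[(marks >= 50) and (marks < 80)]
except ValueError as err:
    and_failed = True
    print("ValueError:", err)
```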


Step 8: Random Numbers

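Used constantly in ML for creating test data, initializing weights, and so on. The outputs shown in the comments are illustrative: the exact numbers depend on which random calls have already run since the seed was set.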

# Set seed for reproducibility — same "random" numbers every run
np.random.seed(42)

# Random floats between 0 and 1
print(np.random.random(5))
# [0.374 0.951 0.732 0.599 0.156]

# Random integers
print(np.random.randint(1, 100, size=5))
# [52 93 15 72 61]

# Random 2D array
print(np.random.randint(0, 10, size=(3, 4)))
# [[6 3 7 4]
#  [6 9 2 6]
#  [7 4 3 7]]

# Random from normal distribution (bell curve)
# mean=0, std=1
normal = np.random.normal(0, 1, 1000)
print("Mean:", np.mean(normal).round(2))    # ~0.0
print("Std:", np.std(normal).round(2))      # ~1.0

# Random choice from array
options = np.array(["rock", "paper", "scissors"])
print(np.random.choice(options, size=5))
# ['paper' 'rock' 'scissors' 'rock' 'paper']

np.random.seed(42) — setting a seed means your "random" numbers are always the same. Crucial in ML so your experiments are reproducible.
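
A short sketch of what reproducibility means in practice: resetting the same seed replays the same sequence. The second half uses np.random.default_rng, the newer generator interface that NumPy recommends for new code. It is shown here only as an alternative, not as something the rest of this lesson depends on.

```python
import numpy as np

np.random.seed(42)
first_run = np.random.randint(1, 100, size=5)

np.random.seed(42)          # reset to the same seed
second_run = np.random.randint(1, 100, size=5)

print(np.array_equal(first_run, second_run))   # True

# Newer style: each generator object carries its own seed and state
rng_a = np.random.default_rng(42)
rng_b = np.random.default_rng(42)
draw_a = rng_a.integers(1, 100, size=5)
draw_b = rng_b.integers(1, 100, size=5)
print(np.array_equal(draw_a, draw_b))          # True
```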


Step 9: Reshaping Arrays


    arr = np.arange(12)
    print(arr)          # [ 0  1  2  3  4  5  6  7  8  9 10 11]
    print(arr.shape)    # (12,)

    # Reshape to 3x4 matrix
    matrix = arr.reshape(3, 4)
    print(matrix)
    print(matrix.shape)    # (3, 4)

Output:

[[ 0  1  2  3]
 [ 4  5  6  7]
 [ 8  9 10 11]]
(3, 4)

    # -1 means "figure it out automatically"
    arr = np.arange(12)
    print(arr.reshape(3, -1))    # 3 rows, NumPy calculates 4 columns
    print(arr.reshape(-1, 6))    # NumPy calculates 2 rows, 6 columns

    # Flatten 2D back to 1D
    matrix = np.array([[1,2,3],[4,5,6]])
    print(matrix.flatten())    # [1 2 3 4 5 6]
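
Reshaping only rearranges the existing elements, so the new shape must hold exactly the same number of them. A small sketch of what happens when it does not:

```python
import numpy as np

arr = np.arange(12)

# 5 * 3 = 15, but arr has only 12 elements
reshape_failed = False
try:
    arr.reshape(5, 3)
except ValueError as err:
    reshape_failed = True
    print("ValueError:", err)

# Valid shapes multiply to 12
print(arr.reshape(2, 6).shape)    # (2, 6)
print(arr.reshape(4, -1).shape)   # (4, 3)

# reshape(-1) gives a 1D result, much like flatten()
flat = arr.reshape(3, 4).reshape(-1)
print(flat.shape)                 # (12,)
```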


Real World Example — Student Grade Analysis

Let's put everything together in one practical example:

import numpy as np

# Exam scores for 5 students across 4 subjects
# Rows = students, Columns = subjects (Math, Science, English, History)
np.random.seed(42)
scores = np.random.randint(40, 100, size=(5, 4))

students = ["Rahul", "Priya", "Gagan", "Amit", "Neha"]
subjects = ["Math", "Science", "English", "History"]

print("=== Raw Scores ===")
print(scores)
print()

# Average per student (axis=1 = across columns)
student_avg = np.mean(scores, axis=1)
print("=== Student Averages ===")
for i, name in enumerate(students):
    print(f"{name}: {student_avg[i]:.1f}")

print()

# Average per subject (axis=0 = across rows)
subject_avg = np.mean(scores, axis=0)
print("=== Subject Averages ===")
for i, subject in enumerate(subjects):
    print(f"{subject}: {subject_avg[i]:.1f}")

print()

# Best and worst students
best_idx = np.argmax(student_avg)
worst_idx = np.argmin(student_avg)
print(f"Best student: {students[best_idx]} ({student_avg[best_idx]:.1f})")
print(f"Needs help: {students[worst_idx]} ({student_avg[worst_idx]:.1f})")

print()

# How many students passed each subject (>=50)
passed_per_subject = np.sum(scores >= 50, axis=0)
print("=== Pass Count Per Subject ===")
for i, subject in enumerate(subjects):
    print(f"{subject}: {passed_per_subject[i]}/5 passed")

print()

# Grade each student
print("=== Grades ===")
for i, name in enumerate(students):
    avg = student_avg[i]
    if avg >= 85:
        grade = "A"
    elif avg >= 70:
        grade = "B"
    elif avg >= 55:
        grade = "C"
    else:
        grade = "F"
    print(f"{name}: {avg:.1f} → Grade {grade}")

Output:

=== Raw Scores ===
[[71 60 57 85]
 [74 77 55 74]
 [49 78 95 80]
 [54 68 95 65]
 [65 71 47 93]]

=== Student Averages ===
Rahul: 68.2
Priya: 70.0
Gagan: 75.5
Amit: 70.5
Neha: 69.0

=== Subject Averages ===
Math: 62.6
Science: 70.8
English: 69.8
History: 79.4

Best student: Gagan (75.5)
Needs help: Rahul (68.2)

=== Pass Count Per Subject ===
Math: 4/5 passed
Science: 5/5 passed
English: 4/5 passed
History: 5/5 passed

=== Grades ===
Rahul: 68.2 → Grade C
Priya: 70.0 → Grade B
Gagan: 75.5 → Grade B
Amit: 70.5 → Grade B
Neha: 69.0 → Grade C

This is actual data analysis: taking a table of data, computing statistics, and finding insights. You just did your first data analysis with NumPy.
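
The grading loop can also be written without a loop. The sketch below is one possible alternative using np.select, which takes the first matching condition for each element. The averages are copied from the output above rather than recomputed.

```python
import numpy as np

# Student averages taken from the output above (before rounding)
student_avg = np.array([68.25, 70.0, 75.5, 70.5, 69.0])

# Conditions are checked in order; the first one that is True wins
conditions = [student_avg >= 85, student_avg >= 70, student_avg >= 55]
grades = np.select(conditions, ["A", "B", "C"], default="F")

print(grades)   # ['C' 'B' 'B' 'B' 'C']
```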


Quick Reference — NumPy Cheat Sheet


    import numpy as np

    # Creating arrays
    np.array([1,2,3])              # from list
    np.zeros(5)                    # [0. 0. 0. 0. 0.]
    np.ones((3,4))                 # 3x4 matrix of ones
    np.arange(0, 10, 2)            # [0 2 4 6 8]
    np.linspace(0, 1, 5)           # 5 evenly spaced points
    np.random.randint(0, 100, 10)  # 10 random integers
    np.random.seed(42)             # reproducibility

    # Array info
    arr.shape      # dimensions
    arr.ndim       # number of dimensions
    arr.size       # total elements
    arr.dtype      # data type

    # Indexing
    arr[0]          # first element
    arr[-1]         # last element
    arr[1:4]        # slice
    arr[::2]        # every 2nd
    matrix[1, 2]    # row 1, col 2
    matrix[:, 0]    # entire first column
    matrix[0, :]    # entire first row

    # Operations
    arr + 5         # add 5 to all
    arr * 2         # multiply all by 2
    arr ** 2        # square all
    np.sqrt(arr)    # square root

    # Statistics
    np.sum(arr)
    np.mean(arr)
    np.median(arr)
    np.std(arr)
    np.min(arr)
    np.max(arr)
    np.argmin(arr)  # index of min
    np.argmax(arr)  # index of max

    # 2D statistics
    np.mean(matrix, axis=0)   # column means
    np.mean(matrix, axis=1)   # row means

    # Filtering
    arr[arr > 50]              # values greater than 50
    arr[(arr > 20) & (arr < 80)]  # between 20 and 80

    # Reshape
    arr.reshape(3, 4)    # reshape to 3x4
    arr.flatten()        # back to 1D


Exercise 🏋️

Create a new Jupyter notebook cell and solve this:

Sales Analysis:

# Monthly sales data for 4 products over 6 months
# Each row = one product, each column = one month
sales = np.array([
    [1200, 1500, 1100, 1800, 2000, 1600],  # Product A
    [800,  950,  870,  1100, 1250, 900],   # Product B
    [2100, 1900, 2300, 2100, 1800, 2500],  # Product C
    [500,  600,  550,  700,  650,  720]    # Product D
])

products = ["Product A", "Product B", "Product C", "Product D"]
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun"]

Find and print:

  1. Total sales per product over 6 months
  2. Average monthly sales per product
  3. Best performing product (highest total)
  4. Worst performing product (lowest total)
  5. Best sales month overall (sum across all products)
  6. Which product had sales above 1000 in every month
  7. Total company revenue per month


What We're Learning Now — NumPy & Data Science Foundation

The Big Picture

You've completed:

  • ✅ Core Python
  • ✅ FastAPI

You are now starting:

  • 🎯 Phase 1 — Python for Data Science

This phase has 3 libraries in order:

NumPy → Pandas → Matplotlib

These 3 libraries are the absolute foundation of everything in Data Science and Machine Learning. Every ML engineer uses them daily. You cannot skip these.


Phase 1 Roadmap — What We'll Cover

Stage 1 — NumPy (2 weeks)

NumPy = Numerical Python. It handles arrays and mathematical operations at very high speed. Most other data libraries are built on top of NumPy or work closely with it.

Topics:

  • What is NumPy and why it exists
  • Arrays — creating, indexing, slicing
  • Array operations — math, comparisons
  • Shape and reshaping
  • Statistical functions — mean, median, std
  • Random number generation
  • Real world use cases

Stage 2 — Pandas (2-3 weeks)

Pandas is for working with structured data — like Excel but in Python. Real world data always comes as tables — CSV files, database exports, API responses. Pandas handles all of it.

Topics:

  • Series and DataFrame — core data structures
  • Loading data — CSV, Excel, JSON
  • Exploring data — info, describe, shape
  • Selecting and filtering data
  • Handling missing values
  • Grouping and aggregation
  • Merging and joining datasets
  • Real world data cleaning

Stage 3 — Matplotlib and Seaborn (1 week)

Turning data into charts and graphs. Every data analysis ends with visualization.

Topics:

  • Line charts, bar charts, pie charts
  • Scatter plots, histograms
  • Seaborn for beautiful statistical charts
  • Customizing charts

After Phase 1 — What Comes Next

Once you finish these 3 libraries, you'll move to:

Phase 2 → pytest (1-2 weeks)
Phase 3 → scikit-learn / Machine Learning (4-6 weeks)
Phase 4 → OpenAI API + LangChain / AI Integration (2-3 weeks)

Why NumPy First

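Most of the data science stack is built on NumPy or designed to work with it:

NumPy          ← foundation
   ↓
Pandas         ← built on NumPy
   ↓
Matplotlib     ← built on NumPy
   ↓
scikit-learn   ← built on NumPy
   ↓
TensorFlow     ← own tensor type, modeled on NumPy arrays and convertible to and from them
PyTorch        ← own tensor type, modeled on NumPy arrays and convertible to and from them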

If you understand NumPy well — everything else makes sense faster.


Tools You'll Need

Jupyter Notebook — this is how data scientists write code. Instead of running a .py file, you write code in cells and see output immediately. Perfect for data analysis.

We'll set this up in the very first step.

Kaggle — free platform with real datasets, notebooks, and competitions. Create a free account at kaggle.com — you'll need it from Stage 2 onwards.


What You'll Be Able to Do After Phase 1

After completing NumPy + Pandas + Matplotlib you will be able to:

  • Load any CSV or Excel file into Python
  • Clean messy real world data
  • Filter, sort, group, and summarize data
  • Calculate statistics on datasets
  • Find patterns in data
  • Create professional charts and graphs
  • Prepare data for machine learning

This is exactly what a Data Analyst does — and it's a well paying job on its own. You'll also have the foundation to move into ML.

Final Real Project in FastAPI

What We're Building

A complete Task Management API — like a mini Trello/Jira backend.

This project uses everything from all 6 stages together in a real, production-quality structure. When done, you'll have a portfolio project you can actually show.


Features

✅ User registration and login (JWT auth)
✅ Workspaces — users create workspaces, invite members
✅ Projects — inside workspaces
✅ Tasks — inside projects, assign to members
✅ Comments — on tasks
✅ File attachments — on tasks
✅ Role based access — owner, admin, member
✅ Background email notifications
✅ Pagination on all list endpoints
✅ Full filtering and search
✅ Consistent API responses
✅ Complete /docs documentation

Final Project Structure

task-manager-api/
├── routers/
│   ├── __init__.py
│   ├── auth.py
│   ├── users.py
│   ├── workspaces.py
│   ├── projects.py
│   ├── tasks.py
│   └── comments.py
├── uploads/
├── venv/
├── .env
├── .gitignore
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth_utils.py
├── dependencies.py
├── config.py
├── utils.py
└── requirements.txt

Step 1 — Setup

mkdir task-manager-api
cd task-manager-api
python -m venv venv
venv\Scripts\activate     # Windows
source venv/bin/activate  # Mac/Linux

pip install fastapi uvicorn sqlalchemy "passlib[bcrypt]" "python-jose[cryptography]" python-multipart python-dotenv pydantic-settings

pip freeze > requirements.txt

Create .env:

SECRET_KEY=super-secret-key-make-this-very-long-and-random-in-production
DATABASE_URL=sqlite:///./taskmanager.db
ACCESS_TOKEN_EXPIRE_MINUTES=60
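
The placeholder SECRET_KEY above should be replaced with a random value. One way to generate one, sketched here with Python's standard secrets module (the 32-byte length is an assumption, not a requirement of the project):

```python
import secrets

# 32 random bytes rendered as 64 hexadecimal characters
secret_key = secrets.token_hex(32)
print(f"SECRET_KEY={secret_key}")
```

Paste the printed line into .env in place of the placeholder.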

Create .gitignore:

venv/
__pycache__/
*.pyc
.env
*.db
uploads/

Step 2 — config.py

# config.py

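from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Read values from .env; real environment variables take precedence
    model_config = SettingsConfigDict(env_file=".env")

    secret_key: str
    database_url: str = "sqlite:///./taskmanager.db"
    access_token_expire_minutes: int = 60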


settings = Settings()

Step 3 — database.py

# database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from config import settings

engine = create_engine(
    settings.database_url,
    # check_same_thread is a SQLite-only option; remove it for other databases
    connect_args={"check_same_thread": False}
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
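
The try / yield / finally shape of get_db is what guarantees the session is closed even when a request fails. The sketch below imitates that behavior with plain Python only. FakeSession and get_fake_db are hypothetical stand-ins for illustration, and the gen.throw call only simulates an error raised while the request is being handled.

```python
class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_fake_db():
    db = FakeSession()
    try:
        yield db      # the request handler runs while the generator is paused here
    finally:
        db.close()    # always runs, even if the handler raised


gen = get_fake_db()
session = next(gen)                 # runs the code before `yield`
closed_before = session.closed
print(closed_before)                # False

try:
    gen.throw(RuntimeError("handler failed"))   # simulate an error in the handler
except RuntimeError:
    pass

print(session.closed)               # True
```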

Step 4 — models.py

This is the core — all database tables:

# models.py

from sqlalchemy import (
    Column, Integer, String, Boolean, DateTime,
    Text, ForeignKey, Enum as SQLEnum
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
import enum


# ── Enums ─────────────────────────────────────────

class WorkspaceRole(str, enum.Enum):
    owner = "owner"
    admin = "admin"
    member = "member"

class TaskStatus(str, enum.Enum):
    todo = "todo"
    in_progress = "in_progress"
    in_review = "in_review"
    done = "done"

class TaskPriority(str, enum.Enum):
    low = "low"
    medium = "medium"
    high = "high"
    urgent = "urgent"


# ── Tables ────────────────────────────────────────

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False, index=True)
    password = Column(String(255), nullable=False)
    avatar = Column(String(255), nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())

    workspace_memberships = relationship("WorkspaceMember", back_populates="user")
    assigned_tasks = relationship("Task", back_populates="assignee", foreign_keys="Task.assignee_id")
    comments = relationship("Comment", back_populates="author")


class Workspace(Base):
    __tablename__ = "workspaces"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    owner = relationship("User")
    members = relationship("WorkspaceMember", back_populates="workspace")
    projects = relationship("Project", back_populates="workspace")


class WorkspaceMember(Base):
    __tablename__ = "workspace_members"

    id = Column(Integer, primary_key=True, index=True)
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    role = Column(SQLEnum(WorkspaceRole), default=WorkspaceRole.member)
    joined_at = Column(DateTime, server_default=func.now())

    workspace = relationship("Workspace", back_populates="members")
    user = relationship("User", back_populates="workspace_memberships")


class Project(Base):
    __tablename__ = "projects"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)

    workspace = relationship("Workspace", back_populates="projects")
    tasks = relationship("Task", back_populates="project")


class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(SQLEnum(TaskStatus), default=TaskStatus.todo)
    priority = Column(SQLEnum(TaskPriority), default=TaskPriority.medium)
    due_date = Column(DateTime, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
    creator_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    assignee_id = Column(Integer, ForeignKey("users.id"), nullable=True)

    project = relationship("Project", back_populates="tasks")
    creator = relationship("User", foreign_keys=[creator_id])
    assignee = relationship("User", back_populates="assigned_tasks", foreign_keys=[assignee_id])
    comments = relationship("Comment", back_populates="task")
    attachments = relationship("Attachment", back_populates="task")


class Comment(Base):
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    task = relationship("Task", back_populates="comments")
    author = relationship("User", back_populates="comments")


class Attachment(Base):
    __tablename__ = "attachments"

    id = Column(Integer, primary_key=True, index=True)
    filename = Column(String(255), nullable=False)
    original_name = Column(String(255), nullable=False)
    file_size = Column(Integer, nullable=False)
    content_type = Column(String(100), nullable=False)
    uploaded_at = Column(DateTime, server_default=func.now())

    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
    uploaded_by = Column(Integer, ForeignKey("users.id"), nullable=False)

    task = relationship("Task", back_populates="attachments")
    uploader = relationship("User")

Step 5 — auth_utils.py

# auth_utils.py

from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status
from config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(data: dict) -> str:
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime
    expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
    payload.update({"exp": expire})
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")


def verify_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"}
        )
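
To make "HS256" less of a black box, here is a standard-library sketch of how such a token is signed and checked. It is for understanding only and is not a replacement for python-jose: it does not validate the exp claim, and the helper names (b64url, sign_hs256, is_signature_valid) and the demo secret are made up for illustration.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    # URL-safe base64 without padding, as used in JWTs
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + b64url(json.dumps(payload, separators=(",", ":")).encode())
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)


def is_signature_valid(token: str, secret: str) -> bool:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected), signature)


token = sign_hs256({"sub": "1", "exp": int(time.time()) + 3600}, "demo-secret")
print(token.count("."))                            # 2 (header.payload.signature)
print(is_signature_valid(token, "demo-secret"))    # True
print(is_signature_valid(token, "wrong-secret"))   # False
```

The payload is only encoded, not encrypted, so anyone holding the token can read it. The secret key only proves the token was not modified.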

Step 6 — schemas.py

# schemas.py

from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from typing import Generic, TypeVar
from models import TaskStatus, TaskPriority, WorkspaceRole

T = TypeVar("T")


# ── Generic Response Wrapper ──────────────────────

class APIResponse(BaseModel, Generic[T]):
    success: bool = True
    message: str = "Success"
    data: T | None = None


class PaginatedResponse(BaseModel, Generic[T]):
    success: bool = True
    data: list[T] = []
    total: int = 0
    page: int = 1
    per_page: int = 10
    total_pages: int = 0
    has_next: bool = False
    has_prev: bool = False


# ── User Schemas ─────────────────────────────────

class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    email: str
    password: str = Field(min_length=8)

    @field_validator("email")
    @classmethod
    def normalize_email(cls, v: str) -> str:
        return v.lower().strip()

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if v.isdigit():
            raise ValueError("Password cannot be only numbers")
        if v.isalpha():
            raise ValueError("Password must include at least one number")
        return v


class UserUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)


class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    avatar: str | None
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class LoginRequest(BaseModel):
    email: str
    password: str


class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: UserResponse


# ── Workspace Schemas ─────────────────────────────

class WorkspaceCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    description: str | None = Field(default=None, max_length=500)


class WorkspaceUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)
    description: str | None = None


class WorkspaceMemberResponse(BaseModel):
    id: int
    user: UserResponse
    role: WorkspaceRole
    joined_at: datetime

    model_config = {"from_attributes": True}


class WorkspaceResponse(BaseModel):
    id: int
    name: str
    description: str | None
    owner_id: int
    created_at: datetime
    member_count: int = 0

    model_config = {"from_attributes": True}


class InviteMember(BaseModel):
    email: str
    role: WorkspaceRole = WorkspaceRole.member


# ── Project Schemas ───────────────────────────────

class ProjectCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    description: str | None = Field(default=None, max_length=500)


class ProjectUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)
    description: str | None = None
    is_active: bool | None = None


class ProjectResponse(BaseModel):
    id: int
    name: str
    description: str | None
    is_active: bool
    workspace_id: int
    created_at: datetime
    task_count: int = 0

    model_config = {"from_attributes": True}


# ── Task Schemas ──────────────────────────────────

class TaskCreate(BaseModel):
    title: str = Field(min_length=3, max_length=200)
    description: str | None = None
    priority: TaskPriority = TaskPriority.medium
    due_date: datetime | None = None
    assignee_id: int | None = None


class TaskUpdate(BaseModel):
    title: str | None = Field(default=None, min_length=3)
    description: str | None = None
    status: TaskStatus | None = None
    priority: TaskPriority | None = None
    due_date: datetime | None = None
    assignee_id: int | None = None


class TaskResponse(BaseModel):
    id: int
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    project_id: int
    creator_id: int
    assignee_id: int | None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class TaskDetailResponse(BaseModel):
    id: int
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    created_at: datetime
    updated_at: datetime
    creator: UserResponse
    assignee: UserResponse | None
    comment_count: int = 0

    model_config = {"from_attributes": True}


# ── Comment Schemas ───────────────────────────────

class CommentCreate(BaseModel):
    content: str = Field(min_length=1, max_length=2000)


class CommentUpdate(BaseModel):
    content: str = Field(min_length=1, max_length=2000)


class CommentResponse(BaseModel):
    id: int
    content: str
    task_id: int
    author: UserResponse
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


# ── Attachment Schema ─────────────────────────────

class AttachmentResponse(BaseModel):
    id: int
    filename: str
    original_name: str
    file_size: int
    content_type: str
    uploaded_at: datetime
    task_id: int

    model_config = {"from_attributes": True}
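
The UserCreate rules near the top of schemas.py can be tried out without Pydantic. The sketch below mirrors them as plain functions, assuming the intended rule is "at least 8 characters, not only digits, at least one digit". The helper names and the wording of the length message are hypothetical.

```python
from typing import Optional


def normalize_email(value: str) -> str:
    # same transformation as the UserCreate email validator
    return value.lower().strip()


def password_problem(value: str) -> Optional[str]:
    """Return an error message, or None when the password is acceptable."""
    if len(value) < 8:  # mirrors Field(min_length=8); message text is made up
        return "Password must be at least 8 characters"
    if value.isdigit():
        return "Password cannot be only numbers"
    if not any(ch.isdigit() for ch in value):
        return "Password must include at least one number"
    return None


print(normalize_email("  Gagan@Email.com "))
for candidate in ["12345678", "password", "password!", "passw0rd"]:
    print(candidate, "->", password_problem(candidate) or "ok")
```

Note that str.isalpha() is False for "password!" because of the punctuation, which is why the sketch checks for a digit with any() instead of relying on isalpha().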

Step 7 — utils.py

# utils.py

from sqlalchemy.orm import Session


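def paginate(query, page: int = 1, per_page: int = 10) -> dict:
    """Apply pagination to any SQLAlchemy query."""
    # Guard against page=0 or per_page=0 arriving from the query string,
    # which would otherwise give a negative offset or a division by zero
    page = max(page, 1)
    per_page = max(per_page, 1)
    total = query.count()
    total_pages = (total + per_page - 1) // per_page
    items = query.offset((page - 1) * per_page).limit(per_page).all()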

    return {
        "data": items,
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1
    }


def send_notification_email(to_email: str, subject: str, body: str):
    """Simulate sending email — replace with real email library in production."""
    print(f"\n📧 EMAIL TO: {to_email}")
    print(f"   SUBJECT: {subject}")
    print(f"   BODY: {body}\n")
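
The arithmetic inside paginate can be checked without a database. This sketch uses a plain list as a stand-in for the query, and page_window is a hypothetical helper that repeats the same formulas.

```python
def page_window(total: int, page: int, per_page: int) -> dict:
    """Hypothetical helper mirroring the arithmetic in paginate()."""
    total_pages = (total + per_page - 1) // per_page  # ceiling division
    offset = (page - 1) * per_page                    # rows to skip
    return {
        "offset": offset,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }


rows = list(range(1, 26))  # 25 fake rows standing in for query results
window = page_window(len(rows), page=3, per_page=10)

# slicing plays the role of .offset(...).limit(...)
print(rows[window["offset"]:window["offset"] + 10])
print(window)
```

With 25 rows and 10 per page there are 3 pages, and page 3 holds only the last 5 rows, so has_next is False there.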

Step 8 — dependencies.py

# dependencies.py

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db
import auth_utils
import models

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> models.User:
    payload = auth_utils.verify_token(token)
    user_id = payload.get("user_id")

    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token")

    user = db.query(models.User).filter(models.User.id == user_id).first()

    if not user:
        raise HTTPException(status_code=401, detail="User not found")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account deactivated")

    return user


def get_workspace_member(
    workspace_id: int,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> models.WorkspaceMember:
    """Verify user is a member of the workspace."""
    membership = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == current_user.id
    ).first()

    if not membership:
        raise HTTPException(
            status_code=403,
            detail="You are not a member of this workspace"
        )

    return membership


def get_workspace_admin(
    membership: models.WorkspaceMember = Depends(get_workspace_member)
) -> models.WorkspaceMember:
    """Verify user is admin or owner of the workspace."""
    if membership.role not in [models.WorkspaceRole.owner, models.WorkspaceRole.admin]:
        raise HTTPException(
            status_code=403,
            detail="Admin access required"
        )
    return membership
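
The way get_workspace_admin builds on get_workspace_member can be sketched in plain Python. Everything here is a stand-in: Role imitates models.WorkspaceRole (the string values are assumed), Forbidden imitates HTTPException(status_code=403), and the MEMBERSHIPS dict replaces the database query.

```python
from enum import Enum


class Role(str, Enum):  # stand-in for models.WorkspaceRole; values are assumed
    owner = "owner"
    admin = "admin"
    member = "member"


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403)."""


# (workspace_id, user_id) -> role, replacing the WorkspaceMember table
MEMBERSHIPS = {(1, 10): Role.owner, (1, 11): Role.member}


def require_member(workspace_id: int, user_id: int) -> Role:
    role = MEMBERSHIPS.get((workspace_id, user_id))
    if role is None:
        raise Forbidden("You are not a member of this workspace")
    return role


def require_admin(workspace_id: int, user_id: int) -> Role:
    # the membership check always runs first, like Depends(get_workspace_member)
    role = require_member(workspace_id, user_id)
    if role not in (Role.owner, Role.admin):
        raise Forbidden("Admin access required")
    return role


print(require_admin(1, 10).value)
for user_id in (11, 99):
    try:
        require_admin(1, user_id)
    except Forbidden as exc:
        print(user_id, "->", exc)
```

Because the admin check reuses the member check, a non-member gets the "not a member" message and never reaches the role comparison.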

Step 9 — All Routers

routers/auth.py

# routers/auth.py

from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import models
import schemas
import auth_utils
from database import get_db
from dependencies import get_current_user

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
    if db.query(models.User).filter(models.User.email == user_data.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    hashed = auth_utils.hash_password(user_data.password)
    user = models.User(
        name=user_data.name,
        email=user_data.email,
        password=hashed
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@router.post("/login", response_model=schemas.TokenResponse)
def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(
        models.User.email == credentials.email.lower()
    ).first()

    if not user or not auth_utils.verify_password(credentials.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account deactivated")

    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer", "user": user}


@router.post("/token")    # for /docs Authorize button
def login_for_docs(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    # The Authorize dialog posts form fields named "username" and "password"
    # (not JSON), so the email arrives in form_data.username.
    # Form parsing needs the python-multipart package.
    user = db.query(models.User).filter(
        models.User.email == form_data.username.lower()
    ).first()
    if not user or not auth_utils.verify_password(form_data.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer"}


@router.get("/me", response_model=schemas.UserResponse)
def get_me(current_user: models.User = Depends(get_current_user)):
    return current_user


@router.patch("/me", response_model=schemas.UserResponse)
def update_me(
    update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    if update.name:
        current_user.name = update.name
    db.commit()
    db.refresh(current_user)
    return current_user
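
The two login routes are reached with different body encodings. API clients send JSON to /auth/login, while the OAuth2 password flow used by the Authorize dialog in /docs sends a form body whose fields are always called username and password. A small standard-library sketch of both encodings, with made-up credentials:

```python
import json
from urllib.parse import parse_qs, urlencode

credentials = {"email": "gagan@email.com", "password": "secret123"}  # sample values

# JSON body, as a client of POST /auth/login would send it
json_body = json.dumps(credentials)

# Form body, as the OAuth2 password flow sends it: the email travels
# in the "username" field because the flow has no "email" field
form_body = urlencode(
    {"username": credentials["email"], "password": credentials["password"]}
)

print(json_body)
print(form_body)
print(parse_qs(form_body)["username"][0])  # decoding recovers the email
```

The form version is sent with Content-Type application/x-www-form-urlencoded, so an endpoint that only accepts a JSON model will reject it with a validation error.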

routers/workspaces.py

# routers/workspaces.py

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate, send_notification_email

router = APIRouter(prefix="/workspaces", tags=["Workspaces"])


@router.post("", response_model=schemas.WorkspaceResponse, status_code=201)
def create_workspace(
    data: schemas.WorkspaceCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    workspace = models.Workspace(
        name=data.name,
        description=data.description,
        owner_id=current_user.id
    )
    db.add(workspace)
    db.flush()    # get ID before commit

    # Add creator as owner member
    member = models.WorkspaceMember(
        workspace_id=workspace.id,
        user_id=current_user.id,
        role=models.WorkspaceRole.owner
    )
    db.add(member)
    db.commit()
    db.refresh(workspace)

    workspace.member_count = 1
    return workspace


@router.get("", response_model=schemas.PaginatedResponse[schemas.WorkspaceResponse])
def get_my_workspaces(
    page: int = 1,
    per_page: int = 10,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    query = db.query(models.Workspace).join(
        models.WorkspaceMember
    ).filter(
        models.WorkspaceMember.user_id == current_user.id
    )

    result = paginate(query, page, per_page)

    for ws in result["data"]:
        ws.member_count = db.query(models.WorkspaceMember).filter(
            models.WorkspaceMember.workspace_id == ws.id
        ).count()

    return result


@router.get("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def get_workspace(
    workspace_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    workspace.member_count = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id
    ).count()

    return workspace


@router.patch("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def update_workspace(
    workspace_id: int,
    data: schemas.WorkspaceUpdate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    if data.name:
        workspace.name = data.name
    if data.description is not None:
        workspace.description = data.description

    db.commit()
    db.refresh(workspace)
    workspace.member_count = len(workspace.members)
    return workspace


@router.post("/{workspace_id}/invite", response_model=schemas.WorkspaceMemberResponse)
def invite_member(
    workspace_id: int,
    invite: schemas.InviteMember,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    # Find user by email
    user = db.query(models.User).filter(
        models.User.email == invite.email.lower()
    ).first()

    if not user:
        raise HTTPException(status_code=404, detail="User with this email not found")

    # Check already member
    existing = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == user.id
    ).first()

    if existing:
        raise HTTPException(status_code=400, detail="User is already a member")

    # Add member
    new_member = models.WorkspaceMember(
        workspace_id=workspace_id,
        user_id=user.id,
        role=invite.role
    )
    db.add(new_member)
    db.commit()
    db.refresh(new_member)

    # Send notification in background
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    background_tasks.add_task(
        send_notification_email,
        user.email,
        f"You've been added to {workspace.name}",
        # .value gives "member" rather than the enum's repr-style name
        f"Hi {user.name}, you've been added as {invite.role.value} to workspace '{workspace.name}'"
    )

    return new_member


@router.get("/{workspace_id}/members", response_model=list[schemas.WorkspaceMemberResponse])
def get_members(
    workspace_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    return db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id
    ).all()


@router.delete("/{workspace_id}/members/{user_id}", status_code=204)
def remove_member(
    workspace_id: int,
    user_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    member = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == user_id
    ).first()

    if not member:
        raise HTTPException(status_code=404, detail="Member not found")

    if member.role == models.WorkspaceRole.owner:
        raise HTTPException(status_code=400, detail="Cannot remove workspace owner")

    db.delete(member)
    db.commit()
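
In invite_member the email is queued with background_tasks.add_task and only runs after the response has gone out. The toy class below imitates that ordering so it can be seen in a terminal. DeferredTasks is a hypothetical stand-in, not FastAPI's BackgroundTasks implementation.

```python
events = []  # records the order in which things happen


class DeferredTasks:
    """Toy stand-in for BackgroundTasks: collect calls now, run them later."""

    def __init__(self):
        self._queue = []

    def add_task(self, func, *args):
        self._queue.append((func, args))

    def run_all(self):
        for func, args in self._queue:
            func(*args)


def send_notification_email(to_email, subject, body):
    events.append(f"email to {to_email}: {subject}")


def invite_member(tasks):
    # queue the email, then "return the response" without waiting for it
    tasks.add_task(
        send_notification_email,
        "rahul@email.com",
        "You've been added to My Team",
        "Hi Rahul, you've been added as member",
    )
    events.append("response returned")
    return {"role": "member"}


tasks = DeferredTasks()
invite_member(tasks)
tasks.run_all()  # the framework does this step once the response is sent
print(events)
```

The client never waits for the email, which is the point of using a background task for slow side effects.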

routers/projects.py

# routers/projects.py

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate

router = APIRouter(prefix="/workspaces/{workspace_id}/projects", tags=["Projects"])


@router.post("", response_model=schemas.ProjectResponse, status_code=201)
def create_project(
    workspace_id: int,
    data: schemas.ProjectCreate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = models.Project(
        name=data.name,
        description=data.description,
        workspace_id=workspace_id
    )
    db.add(project)
    db.commit()
    db.refresh(project)
    project.task_count = 0
    return project


@router.get("", response_model=schemas.PaginatedResponse[schemas.ProjectResponse])
def get_projects(
    workspace_id: int,
    page: int = 1,
    per_page: int = 10,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    query = db.query(models.Project).filter(
        models.Project.workspace_id == workspace_id
    )
    result = paginate(query, page, per_page)

    for project in result["data"]:
        project.task_count = db.query(models.Task).filter(
            models.Task.project_id == project.id
        ).count()

    return result


@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(
    workspace_id: int,
    project_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    project.task_count = db.query(models.Task).filter(
        models.Task.project_id == project_id
    ).count()

    return project


@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    workspace_id: int,
    project_id: int,
    data: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    update_data = data.model_dump(exclude_none=True)
    for field, value in update_data.items():
        setattr(project, field, value)

    db.commit()
    db.refresh(project)
    project.task_count = len(project.tasks)
    return project


@router.delete("/{project_id}", status_code=204)
def delete_project(
    workspace_id: int,
    project_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    db.delete(project)
    db.commit()
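
update_project copies only the fields the client actually sent, using model_dump(exclude_none=True) followed by setattr. The same idea is sketched here with dataclasses so it runs without Pydantic or SQLAlchemy. ProjectRow, ProjectPatch and apply_patch are hypothetical stand-ins.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ProjectRow:  # stand-in for the SQLAlchemy Project model
    name: str
    description: Optional[str]
    is_active: bool


@dataclass
class ProjectPatch:  # stand-in for schemas.ProjectUpdate
    name: Optional[str] = None
    description: Optional[str] = None
    is_active: Optional[bool] = None


def apply_patch(row: ProjectRow, patch: ProjectPatch) -> ProjectRow:
    # equivalent of model_dump(exclude_none=True) followed by setattr
    changes = {key: value for key, value in asdict(patch).items() if value is not None}
    for field, value in changes.items():
        setattr(row, field, value)
    return row


project = ProjectRow(name="Website Redesign", description="Q1 project", is_active=True)
apply_patch(project, ProjectPatch(is_active=False))
print(project)
```

False is kept because it is not None, so a project can be deactivated. The trade-off is that a client cannot reset description to null this way. Pydantic's model_dump(exclude_unset=True) is the usual alternative when that is needed.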

routers/tasks.py

# routers/tasks.py

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, File, UploadFile
from sqlalchemy.orm import Session
from sqlalchemy import or_
import os
import uuid
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member
from utils import paginate, send_notification_email
from models import TaskStatus, TaskPriority

router = APIRouter(prefix="/projects/{project_id}/tasks", tags=["Tasks"])

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_project_or_404(project_id: int, db: Session) -> models.Project:
    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project


@router.post("", response_model=schemas.TaskResponse, status_code=201)
def create_task(
    project_id: int,
    data: schemas.TaskCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    project = get_project_or_404(project_id, db)

    task = models.Task(
        title=data.title,
        description=data.description,
        priority=data.priority,
        due_date=data.due_date,
        assignee_id=data.assignee_id,
        project_id=project_id,
        creator_id=current_user.id
    )
    db.add(task)
    db.commit()
    db.refresh(task)

    # Notify assignee in background
    if data.assignee_id and data.assignee_id != current_user.id:
        assignee = db.query(models.User).filter(
            models.User.id == data.assignee_id
        ).first()
        if assignee:
            background_tasks.add_task(
                send_notification_email,
                assignee.email,
                f"New task assigned: {task.title}",
                f"Hi {assignee.name}, you have been assigned task '{task.title}' in project '{project.name}'"
            )

    return task


@router.get("", response_model=schemas.PaginatedResponse[schemas.TaskResponse])
def get_tasks(
    project_id: int,
    page: int = 1,
    per_page: int = 10,
    status: TaskStatus | None = None,
    priority: TaskPriority | None = None,
    assignee_id: int | None = None,
    search: str | None = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    get_project_or_404(project_id, db)

    query = db.query(models.Task).filter(models.Task.project_id == project_id)

    if status:
        query = query.filter(models.Task.status == status)
    if priority:
        query = query.filter(models.Task.priority == priority)
    if assignee_id:
        query = query.filter(models.Task.assignee_id == assignee_id)
    if search:
        query = query.filter(
            or_(
                models.Task.title.ilike(f"%{search}%"),
                models.Task.description.ilike(f"%{search}%")
            )
        )

    return paginate(query, page, per_page)


@router.get("/{task_id}", response_model=schemas.TaskDetailResponse)
def get_task(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    task.comment_count = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).count()

    return task


@router.patch("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
    project_id: int,
    task_id: int,
    data: schemas.TaskUpdate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    old_assignee_id = task.assignee_id
    update_data = data.model_dump(exclude_none=True)
    for field, value in update_data.items():
        setattr(task, field, value)

    db.commit()
    db.refresh(task)

    # Notify new assignee if changed
    if data.assignee_id and data.assignee_id != old_assignee_id:
        new_assignee = db.query(models.User).filter(
            models.User.id == data.assignee_id
        ).first()
        if new_assignee:
            background_tasks.add_task(
                send_notification_email,
                new_assignee.email,
                f"Task reassigned to you: {task.title}",
                f"Hi {new_assignee.name}, task '{task.title}' has been assigned to you."
            )

    return task


@router.delete("/{task_id}", status_code=204)
def delete_task(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.creator_id != current_user.id:
        raise HTTPException(status_code=403, detail="Only task creator can delete")

    db.delete(task)
    db.commit()


@router.post("/{task_id}/attachments", response_model=schemas.AttachmentResponse)
async def upload_attachment(
    project_id: int,
    task_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    content = await file.read()

    if len(content) > 10 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large. Max 10MB")

    # splitext copes with names that have no dot; filename can also be missing
    extension = os.path.splitext(file.filename or "")[1].lower()
    unique_name = f"{uuid.uuid4()}{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_name)

    with open(file_path, "wb") as f:
        f.write(content)

    attachment = models.Attachment(
        filename=unique_name,
        original_name=file.filename,
        file_size=len(content),
        content_type=file.content_type,
        task_id=task_id,
        uploaded_by=current_user.id
    )
    db.add(attachment)
    db.commit()
    db.refresh(attachment)

    return attachment


@router.get("/{task_id}/attachments", response_model=list[schemas.AttachmentResponse])
def get_attachments(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    return db.query(models.Attachment).filter(
        models.Attachment.task_id == task_id
    ).all()
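
The naming and size rules from upload_attachment can be tested on their own. This is a sketch under the same 10 MB limit. storage_name is a hypothetical helper, and the basename call is an extra precaution that the endpoint above does not include.

```python
import os
import uuid

MAX_BYTES = 10 * 1024 * 1024  # same 10 MB limit as the endpoint


def storage_name(original_name, size: int) -> str:
    """Hypothetical helper: check the size and build a collision-free filename."""
    if size > MAX_BYTES:
        raise ValueError("File too large. Max 10MB")
    # basename() drops any directory part a client may have sent
    safe = os.path.basename(original_name or "")
    extension = os.path.splitext(safe)[1].lower()  # "" when there is no dot
    return f"{uuid.uuid4()}{extension}"


print(storage_name("Mockup.FINAL.PNG", 2048))  # random UUID plus ".png"
print(storage_name("README", 10))              # random UUID, no extension
```

Storing files under a UUID means two uploads called design.png never overwrite each other, while original_name in the database keeps the name the user saw.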

routers/comments.py

# routers/comments.py

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user
from utils import paginate

router = APIRouter(prefix="/tasks/{task_id}/comments", tags=["Comments"])


@router.post("", response_model=schemas.CommentResponse, status_code=201)
def create_comment(
    task_id: int,
    data: schemas.CommentCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    comment = models.Comment(
        content=data.content,
        task_id=task_id,
        author_id=current_user.id
    )
    db.add(comment)
    db.commit()
    db.refresh(comment)
    return comment


@router.get("", response_model=schemas.PaginatedResponse[schemas.CommentResponse])
def get_comments(
    task_id: int,
    page: int = 1,
    per_page: int = 20,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    query = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).order_by(models.Comment.created_at.asc())

    return paginate(query, page, per_page)


@router.patch("/{comment_id}", response_model=schemas.CommentResponse)
def update_comment(
    task_id: int,
    comment_id: int,
    data: schemas.CommentUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only edit your own comments")

    comment.content = data.content
    db.commit()
    db.refresh(comment)
    return comment


@router.delete("/{comment_id}", status_code=204)
def delete_comment(
    task_id: int,
    comment_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only delete your own comments")

    db.delete(comment)
    db.commit()

Step 10 — main.py — Final Clean Entry Point

# main.py

import time
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import models
from database import engine
from routers import auth, users, workspaces, projects, tasks, comments
import os

# Create tables
models.Base.metadata.create_all(bind=engine)

# Create uploads folder
os.makedirs("uploads", exist_ok=True)

app = FastAPI(
    title="Task Manager API",
    description="""
    A complete task management API built with FastAPI.

    ## Features
    - JWT Authentication
    - Workspaces with role-based access
    - Projects inside workspaces
    - Tasks with priorities and statuses
    - Comments on tasks
    - File attachments
    - Background notifications
    """,
    version="1.0.0",
    contact={
        "name": "Gagan",
        "email": "gagan@email.com"
    }
)

# ── CORS ──────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Request Logging Middleware ────────────────────
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
    print(f"[{request.method}] {request.url.path} → {response.status_code} ({duration}ms)")
    return response

# ── Exception Handlers ────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_handler(request: Request, exc: RequestValidationError):
    errors = [
        {
            "field": " → ".join(str(part) for part in e["loc"]),
            "message": e["msg"]
        }
        for e in exc.errors()
    ]
    return JSONResponse(
        status_code=422,
        content={"success": False, "message": "Validation failed", "errors": errors}
    )

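@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # This handler also receives HTTPException(status_code=404) raised inside
    # routes, so keep messages such as "Task not found" instead of replacing them
    detail = getattr(exc, "detail", None)
    if not detail or detail == "Not Found":
        detail = f"Route not found: {request.url.path}"
    return JSONResponse(
        status_code=404,
        content={"success": False, "message": detail}
    )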

# ── Static Files ──────────────────────────────────
app.mount("/files", StaticFiles(directory="uploads"), name="files")

# ── Routers ───────────────────────────────────────
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(workspaces.router)
app.include_router(projects.router)
app.include_router(tasks.router)
app.include_router(comments.router)

# ── Root ──────────────────────────────────────────
@app.get("/", tags=["Root"])
def root():
    return {
        "message": "Task Manager API",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc"
    }
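
To see what validation_handler produces, the reshaping step can be run on its own. The sample list below is hand-written to resemble the dictionaries returned by exc.errors() (keys loc, msg, type). The exact message texts are approximations, and format_errors is a hypothetical name.

```python
def format_errors(raw_errors: list) -> list:
    # loc is a tuple such as ("body", "title"); it can contain ints for list
    # positions, which is why each part goes through str()
    return [
        {"field": " → ".join(str(part) for part in err["loc"]), "message": err["msg"]}
        for err in raw_errors
    ]


# hand-written sample in the shape of exc.errors()
sample = [
    {"loc": ("body", "title"), "msg": "String should have at least 3 characters", "type": "string_too_short"},
    {"loc": ("query", "page"), "msg": "Input should be a valid integer", "type": "int_parsing"},
]

for item in format_errors(sample):
    print(item)
```

The frontend then gets one flat list of field and message pairs instead of the nested default structure.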

Running the Final Project

uvicorn main:app --reload

Open http://localhost:8000/docs and you'll see the complete API organized into 6 router sections (the / route appears separately under Root):

  • Authentication
  • Users
  • Workspaces
  • Projects
  • Tasks
  • Comments

Complete Testing Flow

Test this complete scenario in /docs:

1. Register two users:

  • User A: gagan@email.com
  • User B: rahul@email.com

2. Log in as User A → click Authorize in /docs (the dialog asks for the email in the username field, plus the password)

3. Create a workspace:

{"name": "My Team", "description": "Our workspace"}

4. Invite User B to workspace:

{"email": "rahul@email.com", "role": "member"}

Watch terminal — notification email prints in background.

5. Create a project:

{"name": "Website Redesign", "description": "Q1 project"}

6. Create a task:

{
    "title": "Design homepage mockup",
    "description": "Create Figma designs for the new homepage",
    "priority": "high",
    "assignee_id": 2
}

Watch terminal — assignment notification prints.

7. Log in as User B → add a comment on the task

8. Upload an attachment to the task

9. Update task status to in_progress

10. Mark task as done
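
Once the flow works by hand, the first steps can be scripted. This is a minimal sketch using only the standard library, assuming the API runs at http://localhost:8000. build_request and call are hypothetical helpers, the sample password is made up, and call only works while the server is up, so the demo lines just build the requests.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumes the uvicorn default port


def build_request(method: str, path: str, payload=None, token=None) -> urllib.request.Request:
    """Prepare a JSON request, optionally with a Bearer token."""
    data = json.dumps(payload).encode() if payload is not None else None
    req = urllib.request.Request(BASE_URL + path, data=data, method=method)
    if data is not None:
        req.add_header("Content-Type", "application/json")
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    return req


def call(req: urllib.request.Request):
    # sends the request; needs the API to be running
    with urllib.request.urlopen(req) as resp:
        body = resp.read()
        return json.loads(body) if body else None


register = build_request(
    "POST", "/auth/register",
    {"name": "Gagan", "email": "gagan@email.com", "password": "secret123"},
)
create_ws = build_request(
    "POST", "/workspaces",
    {"name": "My Team", "description": "Our workspace"},
    token="PASTE_TOKEN_FROM_LOGIN",  # placeholder, not a real token
)

print(register.get_method(), register.full_url)
print(create_ws.get_header("Authorization"))
```

With the server running, call(register) followed by a login request would return the token to pass into the later steps.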


What You've Built

A complete production-grade REST API with:

✅ Clean project structure — routers, models, schemas separated
✅ JWT authentication — register, login, protected routes
✅ Role-based access — owner, admin, member
✅ Full CRUD — workspaces, projects, tasks, comments
✅ Relationships — nested resources, foreign keys
✅ Pagination — on all list endpoints
✅ Search and filtering — tasks by status, priority, assignee
✅ File uploads — attachments on tasks
✅ Background tasks — email notifications
✅ CORS — ready for frontend connection
✅ Middleware — request logging
✅ Custom exception handlers — consistent errors
✅ Auto documentation — /docs and /redoc

Connecting to Your Next.js Frontend

Since you already build Next.js apps — connecting is straightforward:

// lib/api.js in your Next.js project

const API_URL = "http://localhost:8000"

export async function login(email, password) {
    const res = await fetch(`${API_URL}/auth/login`, {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({email, password})
    })
    return res.json()    // returns {access_token, user}
}

export async function getTasks(projectId, token) {
    const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
        headers: {"Authorization": `Bearer ${token}`}
    })
    return res.json()
}

export async function createTask(projectId, taskData, token) {
    const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
        method: "POST",
        headers: {
            "Content-Type": "application/json",
            "Authorization": `Bearer ${token}`
        },
        body: JSON.stringify(taskData)
    })
    return res.json()
}

This is the same Bearer token pattern you already use with your NestJS APIs, so there is nothing new to learn on the frontend side.


What's Next

You now know FastAPI end to end. Depending on where you want to go:

Immediate next steps for this project:

  • Switch SQLite to PostgreSQL: change the database URL in .env and install a PostgreSQL driver such as psycopg2-binary
  • Deploy to a VPS with Nginx + Uvicorn
  • Add email sending with fastapi-mail
  • Add WebSockets for real-time task updates
  • Write tests with pytest

Deeper FastAPI topics:

  • Alembic migrations — managing database schema changes
  • WebSockets — real-time features
  • Celery + Redis — heavy background job processing
  • pytest — testing your API endpoints
  • Docker — containerizing your app

Given your NestJS and PostgreSQL background — switching to PostgreSQL and deploying this to your existing Azure infrastructure would be the natural next step. The code is already ready for it.


Advanced Features in FastAPI

What We're Covering

This stage covers the features that take your API from "works locally" to "production ready":

  • CORS — so your frontend can talk to your API
  • Middleware — code that runs on every request
  • Background Tasks — run tasks after sending response
  • File Uploads — handle images and documents
  • APIRouter — organize routes into separate files
  • Custom Exception Handlers — consistent error responses

Part 1 — CORS

What is CORS?

CORS = Cross-Origin Resource Sharing

When your Next.js frontend (running on localhost:3000) tries to call your FastAPI backend (running on localhost:8000) — the browser blocks it by default. Different ports = different origins = CORS error.

You've definitely seen this error before in your NestJS projects:

Access to fetch at 'http://localhost:8000' from origin 'http://localhost:3000' 
has been blocked by CORS policy

FastAPI fixes this with one middleware setup:

# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",      # Next.js dev server
        "http://localhost:3001",
        "https://yourfrontend.com",   # production frontend
    ],
    allow_credentials=True,           # allows cookies and auth headers
    allow_methods=["*"],              # allows GET, POST, PUT, DELETE etc
    allow_headers=["*"],              # allows Authorization, Content-Type etc
)

For development only — allow everything:

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # never use this in production
    allow_methods=["*"],
    allow_headers=["*"],
)

Always restrict allow_origins in production — only list domains that should access your API.
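
Rather than hard-coding the origin list, one option is to read it from an environment variable so each deployment can set its own. The sketch below is a minimal helper under that assumption. The ALLOWED_ORIGINS variable name and the parse_allowed_origins function are hypothetical names chosen for illustration, not part of FastAPI.

```python
import os


def parse_allowed_origins(raw: str) -> list[str]:
    """Split a comma-separated string into a clean list of origins."""
    origins = []
    for item in raw.split(","):
        origin = item.strip().rstrip("/")   # browsers send origins without a trailing slash
        if origin and origin not in origins:
            origins.append(origin)
    return origins


# ALLOWED_ORIGINS is a hypothetical variable name, e.g. set in your .env file
raw_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000")
allowed_origins = parse_allowed_origins(raw_origins)

# Then pass the list to the middleware shown above:
# app.add_middleware(CORSMiddleware, allow_origins=allowed_origins, ...)
```

With this in place, production can set ALLOWED_ORIGINS to the real frontend domain while local development falls back to localhost:3000.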


Part 2 — Middleware

What is Middleware?

Middleware is code that runs on every single request before it reaches your route, and on every response before it goes back to the client.

Think of it like a checkpoint — every request passes through it.

Request → Middleware → Route Handler → Middleware → Response

You use middleware for things that apply to all routes:

  • Logging every request
  • Measuring response time
  • Adding headers to every response
  • Checking API keys

Creating Custom Middleware

# main.py
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()

    # Code here runs BEFORE the route handler
    print(f"→ {request.method} {request.url}")

    response = await call_next(request)    # call the actual route

    # Code here runs AFTER the route handler
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(round(process_time * 1000, 2)) + "ms"

    print(f"← {response.status_code} ({round(process_time * 1000, 2)}ms)")

    return response

Now every request logs like this in your terminal:

→ GET http://localhost:8000/users
← 200 (3.42ms)

→ POST http://localhost:8000/auth/login
← 200 (145.23ms)

And every response has the processing time in its headers — visible in browser dev tools.


Multiple Middlewares

You can stack multiple middlewares. The one added last is the outermost layer: it sees the request first and the response last.

@app.middleware("http")
async def log_requests(request: Request, call_next):
    print(f"Request: {request.method} {request.url.path}")
    response = await call_next(request)
    return response


@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    return response
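
Stacked middleware behaves like layers of an onion: each middleware registered later wraps everything registered before it. The toy model below uses plain Python to show the resulting call order for the two middlewares above. It is only an illustration of the wrapping idea, not Starlette's actual implementation, and make_middleware and wrap are made-up helper names.

```python
import asyncio

calls = []  # records the order in which each piece of code runs


def make_middleware(name):
    """Build a toy middleware with the same (request, call_next) shape FastAPI uses."""
    async def middleware(request, call_next):
        calls.append(f"{name}: before")
        response = await call_next(request)
        calls.append(f"{name}: after")
        return response
    return middleware


async def route_handler(request):
    calls.append("route handler")
    return "response"


def wrap(middleware, inner):
    """Return a new callable that passes `inner` to the middleware as call_next."""
    async def wrapped(request):
        return await middleware(request, inner)
    return wrapped


# Registered in the same order as the two decorators above
registered = [make_middleware("log_requests"), make_middleware("add_security_headers")]

stack = route_handler
for middleware in registered:
    stack = wrap(middleware, stack)   # each later middleware wraps everything before it

asyncio.run(stack("GET /users"))
print(calls)
```

The printed list starts with add_security_headers and ends with it too, with log_requests and the route handler in between. Keep this in mind when one middleware depends on work done by another.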

Part 3 — Background Tasks

What are Background Tasks?

Sometimes after handling a request you need to do extra work that the client doesn't need to wait for, such as sending an email, logging to a file, or processing data.

Without background tasks:

Client waits... API sends email (3 seconds)... API returns response
Total wait: 3+ seconds

With background tasks:

Client waits... API returns response immediately
Email sends in background (client already got response)
Total wait: milliseconds

Basic Background Task

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()


def send_welcome_email(email: str, name: str):
    """This runs in background after response is sent."""
    import time
    time.sleep(2)    # simulate email sending delay
    print(f"Email sent to {email}: Welcome {name}!")


@app.post("/users/register")
def register_user(
    name: str,
    email: str,
    background_tasks: BackgroundTasks
):
    # Add task to run after response
    background_tasks.add_task(send_welcome_email, email, name)

    # This response goes back immediately
    # Email sends after this
    return {"message": "Registered successfully! Check your email."}

Client gets response instantly. Email sends 2 seconds later in background. Client never waits.


Background Task with Database

A realistic example — log every login attempt:

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException, Request
from sqlalchemy.orm import Session
from datetime import datetime
from database import get_db
import schemas
import crud
import auth


def log_login_attempt(email: str, success: bool, ip: str):
    """Log to file in background."""
    status = "SUCCESS" if success else "FAILED"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] LOGIN {status} — Email: {email} — IP: {ip}\n"

    with open("login_log.txt", "a") as f:
        f.write(log_entry)

    print(log_entry.strip())


@app.post("/auth/login")
def login(
    credentials: schemas.LoginRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    user = crud.authenticate_user(db, credentials.email, credentials.password)
    client_ip = request.client.host

    if not user:
        # Tasks queued on BackgroundTasks are not run when the route raises,
        # so write the failed attempt to the log before raising
        log_login_attempt(credentials.email, False, client_ip)
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = auth.create_access_token(data={"user_id": user.id})
    background_tasks.add_task(log_login_attempt, credentials.email, True, client_ip)

    return {"access_token": token, "token_type": "bearer", "user": user}

Multiple Background Tasks

You can add multiple tasks — they all run after response:

@app.post("/orders")
def create_order(
    order: OrderCreate,
    background_tasks: BackgroundTasks,
    current_user = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    new_order = crud.create_order(db, order, current_user.id)

    background_tasks.add_task(send_order_confirmation_email, current_user.email, new_order)
    background_tasks.add_task(notify_warehouse, new_order.id)
    background_tasks.add_task(update_inventory, order.product_id, order.quantity)

    return new_order    # returns immediately, all 3 tasks run after
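
Because these tasks run after the response has gone out, the client never hears about a failure. Tasks queued on one BackgroundTasks object also run one after another, so an unhandled exception in an early task can keep the later ones from running. One way to guard against that is a small decorator that logs the error instead of letting it escape. The sketch below assumes that approach; safe_task is a hypothetical helper, and the notify_warehouse body is invented to simulate a failure.

```python
import functools
import logging

logger = logging.getLogger("background_tasks")


def safe_task(func):
    """Wrap a background task so an exception is logged instead of propagating."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # The response has already been sent, so logging is all we can do here
            logger.exception("Background task %s failed", func.__name__)
            return None
    return wrapper


@safe_task
def notify_warehouse(order_id: int):
    # Hypothetical body: pretend the warehouse service is down
    raise ConnectionError(f"warehouse unreachable for order {order_id}")


result = notify_warehouse(42)   # does not raise; the error is logged
```

A decorated function is queued exactly like before, for example background_tasks.add_task(notify_warehouse, new_order.id).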

Part 4 — File Uploads

Handling File Uploads

FastAPI handles file uploads cleanly. You need python-multipart, which you already installed.

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    return {
        "filename": file.filename,
        "content_type": file.content_type,
        "size": file.size
    }

UploadFile gives you:

  • file.filename — original filename
  • file.content_type — mime type (image/jpeg, application/pdf etc)
  • file.size — file size in bytes
  • await file.read() — file content as bytes
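
Note that await file.read() loads the whole upload into memory. For large files, one option is to read the underlying file object (file.file, a standard Python file-like object) in chunks and stop as soon as a size limit is passed. The sketch below shows that idea with io.BytesIO standing in for the upload. read_limited and FileTooLargeError are hypothetical names, not FastAPI APIs.

```python
import io


class FileTooLargeError(Exception):
    """Raised when an upload exceeds the allowed size (hypothetical name)."""


def read_limited(stream, max_size: int, chunk_size: int = 64 * 1024) -> bytes:
    """Read a binary file-like object in chunks, stopping once max_size is exceeded."""
    chunks = []
    total = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        total += len(chunk)
        if total > max_size:
            raise FileTooLargeError(f"Upload exceeds {max_size} bytes")
        chunks.append(chunk)
    return b"".join(chunks)


# io.BytesIO stands in for file.file, the underlying file object of an UploadFile
small = read_limited(io.BytesIO(b"x" * 1000), max_size=5 * 1024 * 1024)
```

In a route you would catch FileTooLargeError and raise an HTTPException with a 400 or 413 status instead.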

Saving Uploaded File

import os
import uuid
from fastapi import FastAPI, File, UploadFile, HTTPException

app = FastAPI()

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)    # create folder if not exists

ALLOWED_TYPES = ["image/jpeg", "image/png", "image/webp"]
MAX_SIZE = 5 * 1024 * 1024    # 5MB in bytes


@app.post("/upload/image")
async def upload_image(file: UploadFile = File(...)):
    # Validate file type
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"File type not allowed. Allowed: {ALLOWED_TYPES}"
        )

    # Read file content
    content = await file.read()

    # Validate file size
    if len(content) > MAX_SIZE:
        raise HTTPException(
            status_code=400,
            detail="File too large. Maximum size is 5MB"
        )

    # Generate unique filename to avoid conflicts
    extension = file.filename.split(".")[-1]
    unique_filename = f"{uuid.uuid4()}.{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_filename)

    # Save file
    with open(file_path, "wb") as f:
        f.write(content)

    return {
        "message": "File uploaded successfully",
        "filename": unique_filename,
        "original_name": file.filename,
        "size": len(content),
        "url": f"/files/{unique_filename}"
    }
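
The filename sent by the client is untrusted input: it may have no dot, several dots, or a misleading extension. A slightly safer variant is to ignore it and derive the extension from the content type that was already checked against the allowed list. The sketch below assumes that approach. EXTENSIONS and build_unique_filename are illustrative names, and the content type is itself reported by the client, so this is a tidy-up rather than real content verification.

```python
import uuid

# Map each allowed MIME type to the extension we are willing to store
EXTENSIONS = {
    "image/jpeg": "jpg",
    "image/png": "png",
    "image/webp": "webp",
}


def build_unique_filename(content_type: str) -> str:
    """Return a random filename whose extension comes from the validated MIME type."""
    extension = EXTENSIONS[content_type]   # KeyError means the type was not validated first
    return f"{uuid.uuid4()}.{extension}"


name = build_unique_filename("image/png")
```

The keys of EXTENSIONS can double as the allowed-types list, so the two never drift apart.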

Serving Uploaded Files

After saving, you want to serve them via URL:

from fastapi.staticfiles import StaticFiles

# Mount uploads folder as static files
# Now http://localhost:8000/files/image.jpg serves the file
app.mount("/files", StaticFiles(directory="uploads"), name="files")

Upload with Form Data and Fields Together

Sometimes you want to upload a file AND send extra data together:

from fastapi import Form

@app.post("/upload/profile-picture")
async def upload_profile_picture(
    file: UploadFile = File(...),
    user_id: int = Form(...),
    description: str = Form(default="")
):
    # Can't use JSON body with file upload — must use Form fields
    content = await file.read()

    return {
        "user_id": user_id,
        "description": description,
        "filename": file.filename,
        "size": len(content)
    }

Important — when uploading files you cannot use JSON body for other fields. You must use Form() for text fields alongside File().
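
If you need structured data next to a file, a common workaround is to send it as a JSON string inside one form field and decode it in the route. The helper below is a minimal sketch of that decoding step. parse_json_field and the metadata field name are assumptions made for this example.

```python
import json


def parse_json_field(raw: str) -> dict:
    """Decode a form field that carries a JSON object as text."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Field is not valid JSON: {exc.msg}") from exc
    if not isinstance(data, dict):
        raise ValueError("Field must contain a JSON object")
    return data


# In a route this would be the value of something like: metadata: str = Form(...)
metadata = parse_json_field('{"user_id": 1, "tags": ["avatar", "profile"]}')
```

In a route you would turn the ValueError into an HTTPException with status 400 so the client gets a clear message.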


Multiple File Upload

@app.post("/upload/multiple")
async def upload_multiple(files: list[UploadFile] = File(...)):
    results = []

    for file in files:
        content = await file.read()
        extension = file.filename.split(".")[-1]
        unique_name = f"{uuid.uuid4()}.{extension}"
        file_path = os.path.join(UPLOAD_DIR, unique_name)

        with open(file_path, "wb") as f:
            f.write(content)

        results.append({
            "original": file.filename,
            "saved_as": unique_name,
            "size": len(content)
        })

    return {"uploaded": len(results), "files": results}

Part 5 — APIRouter — Organizing Routes

The Problem

As your app grows, main.py becomes thousands of lines with routes for users, posts, products, auth, orders — everything mixed together. Nightmare to maintain.

APIRouter lets you split routes into separate files — exactly like Controllers in NestJS.


Creating Routers

Create a routers/ folder:

fastapi-learning/
├── routers/
│   ├── auth.py
│   ├── users.py
│   ├── posts.py
│   └── uploads.py
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py
└── dependencies.py

routers/auth.py

# routers/auth.py

from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import schemas
import crud
import auth as auth_utils
from database import get_db

router = APIRouter(
    prefix="/auth",           # all routes start with /auth
    tags=["Authentication"]   # groups in /docs
)


@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
    existing = crud.get_user_by_email(db, user.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db, user)


@router.post("/login", response_model=schemas.TokenResponse)
def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    user = crud.authenticate_user(db, credentials.email, credentials.password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid email or password")

    token = auth_utils.create_access_token(data={"user_id": user.id})
    return {"access_token": token, "token_type": "bearer", "user": user}

routers/users.py

# routers/users.py

from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import schemas
import crud
from database import get_db
from dependencies import get_current_user

router = APIRouter(
    prefix="/users",
    tags=["Users"]
)


@router.get("/me", response_model=schemas.UserResponse)
def get_me(current_user = Depends(get_current_user)):
    return current_user


@router.patch("/me", response_model=schemas.UserResponse)
def update_me(
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.update_user(db, current_user.id, user_update)


@router.delete("/me", status_code=status.HTTP_204_NO_CONTENT)
def delete_me(
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    crud.delete_user(db, current_user.id)


@router.get("", response_model=list[schemas.UserResponse])
def get_users(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.get_users(db, skip, limit)


@router.get("/{user_id}", response_model=schemas.UserResponse)
def get_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    user = crud.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

routers/posts.py

# routers/posts.py

from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import schemas
import crud
from database import get_db
from dependencies import get_current_user

router = APIRouter(
    prefix="/posts",
    tags=["Posts"]
)


@router.post("", response_model=schemas.PostResponse, status_code=201)
def create_post(
    post: schemas.PostCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.create_post(db, post, author_id=current_user.id)


@router.get("", response_model=list[schemas.PostResponse])
def get_posts(
    skip: int = 0,
    limit: int = 10,
    published: bool | None = None,
    db: Session = Depends(get_db)
):
    return crud.get_posts(db, skip, limit, published)


@router.get("/my", response_model=list[schemas.PostResponse])
def get_my_posts(
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.get_posts(db, author_id=current_user.id)


@router.get("/{post_id}", response_model=schemas.PostWithAuthor)
def get_post(post_id: int, db: Session = Depends(get_db)):
    post = crud.get_post(db, post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post


@router.patch("/{post_id}", response_model=schemas.PostResponse)
def update_post(
    post_id: int,
    post_update: schemas.PostUpdate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    post = crud.get_post(db, post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    return crud.update_post(db, post_id, post_update)


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    post = crud.get_post(db, post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    crud.delete_post(db, post_id)

routers/uploads.py

# routers/uploads.py

import os
import uuid
from fastapi import APIRouter, File, UploadFile, HTTPException, Depends
from fastapi.responses import FileResponse
from dependencies import get_current_user

router = APIRouter(
    prefix="/uploads",
    tags=["Uploads"]
)

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
ALLOWED_TYPES = ["image/jpeg", "image/png", "image/webp", "application/pdf"]
MAX_SIZE = 5 * 1024 * 1024


@router.post("/image")
async def upload_image(
    file: UploadFile = File(...),
    current_user = Depends(get_current_user)
):
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(status_code=400, detail="File type not allowed")

    content = await file.read()

    if len(content) > MAX_SIZE:
        raise HTTPException(status_code=400, detail="File too large. Max 5MB")

    extension = file.filename.split(".")[-1].lower()
    unique_name = f"{uuid.uuid4()}.{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_name)

    with open(file_path, "wb") as f:
        f.write(content)

    return {
        "message": "Uploaded successfully",
        "filename": unique_name,
        "url": f"/files/{unique_name}",
        "uploaded_by": current_user.id
    }

Clean main.py — Register All Routers

Now main.py is beautifully clean:

# main.py

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
import time
import models
from database import engine
from routers import auth, users, posts, uploads

# Create database tables
models.Base.metadata.create_all(bind=engine)

app = FastAPI(
    title="Production Ready API",
    description="FastAPI with Auth, Database, File Uploads",
    version="5.0.0"
)

# ── CORS ──────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Middleware ────────────────────────────────────
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
    print(f"{request.method} {request.url.path} → {response.status_code} ({duration}ms)")
    return response

# ── Static Files ──────────────────────────────────
app.mount("/files", StaticFiles(directory="uploads"), name="files")

# ── Routers ───────────────────────────────────────
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(posts.router)
app.include_router(uploads.router)

# ── Root ──────────────────────────────────────────
@app.get("/", tags=["Root"])
def root():
    return {"message": "API is running", "docs": "/docs"}

Open /docs — routes are now organized into groups: Authentication, Users, Posts, Uploads. Much cleaner.


Part 6 — Custom Exception Handlers

Consistent Error Responses

Right now different errors return different formats. Let's make all errors consistent:

# main.py — add these handlers

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from sqlalchemy.exc import SQLAlchemyError


# Handle validation errors (422)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    errors = []
    for error in exc.errors():
        errors.append({
            "field": " → ".join(str(loc) for loc in error["loc"]),
            "message": error["msg"],
            "type": error["type"]
        })

    return JSONResponse(
        status_code=422,
        content={
            "success": False,
            "message": "Validation failed",
            "errors": errors
        }
    )


# Handle database errors (500)
@app.exception_handler(SQLAlchemyError)
async def database_exception_handler(request: Request, exc: SQLAlchemyError):
    # Log the details on the server; don't send raw database errors to clients
    print(f"Database error on {request.url.path}: {exc}")
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "message": "Database error occurred"
        }
    )


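# Handle HTTP errors (404, 401, 403 ...) raised by routes or by unknown URLs
from starlette.exceptions import HTTPException as StarletteHTTPException

@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "success": False,
            "message": exc.detail    # e.g. "User not found" or "Not Found"
        },
        headers=getattr(exc, "headers", None)
    )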

Now all errors have the same shape — frontend can handle them consistently.


Standard Response Wrapper — Optional but Professional

Some teams wrap all responses in a standard format:

# schemas.py — add this

from typing import Generic, TypeVar
from pydantic import BaseModel

T = TypeVar("T")

class APIResponse(BaseModel, Generic[T]):
    success: bool = True
    message: str = "Success"
    data: T | None = None

Use it in routes:

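@router.get("/{user_id}", response_model=schemas.APIResponse[schemas.UserResponse])
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = crud.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # response_model tells FastAPI to serialize `data` as a UserResponse
    return schemas.APIResponse(data=user, message="User fetched successfully")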

Response:

{
    "success": true,
    "message": "User fetched successfully",
    "data": {
        "id": 1,
        "name": "Gagan",
        ...
    }
}

This is a common pattern in production APIs — especially when building for mobile apps.


Final Project Structure

fastapi-learning/
├── routers/
│   ├── __init__.py
│   ├── auth.py
│   ├── users.py
│   ├── posts.py
│   └── uploads.py
├── uploads/               ← uploaded files stored here
├── venv/
├── .env
├── .gitignore
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py
├── dependencies.py
├── config.py
└── requirements.txt

Save Requirements

pip freeze > requirements.txt

Your requirements.txt will include entries like these, along with the packages they depend on:

fastapi==0.115.0
uvicorn==0.30.0
sqlalchemy==2.0.35
passlib[bcrypt]==1.7.4
python-jose[cryptography]==3.3.0
python-multipart==0.0.12
python-dotenv==1.0.1
pydantic-settings==2.5.2

Exercise 🏋️

Add these features to complete the project:

1. Profile Picture Upload:

  • POST /users/me/avatar — upload profile picture (protected)
  • Save filename to user's database record
  • Add avatar_url field to UserResponse
  • Add avatar column to User model

2. Post Cover Image:

  • PATCH /posts/{id}/cover — upload cover image for a post
  • Only post author can upload cover
  • Add cover_image field to Post model and response

3. Request Logging Middleware:

[2025-03-03 10:30:01] POST /auth/login 200 145ms
[2025-03-03 10:30:05] GET /users/me 200 3ms
[2025-03-03 10:30:10] POST /posts 201 12ms

Save logs to requests.log file using background tasks.

4. Pagination Response: Instead of returning plain list, return:

{
    "data": [...],
    "total": 50,
    "page": 1,
    "per_page": 10,
    "total_pages": 5,
    "has_next": true,
    "has_prev": false
}

Create a reusable paginate() helper function in a new utils.py file.


Phase 4 — Data Binding & Directives

Chapter 1 — What is Data Binding? 1.1 — The Problem Data Binding Solves When you build a web application, you have two worlds that need to t...