Shared from "Study" on Inkdown

SQLAlchemy — Overview & Quick Reference


What Is SQLAlchemy?

SQLAlchemy is two tools in one:

Plain text
SQLAlchemy
├── Core  →  Python-level SQL expression language. Writes SQL for you.
└── ORM   →  Maps Python classes ↔ DB tables. You work with objects, not SQL.

Most FastAPI apps use the ORM layer. Core runs underneath it silently. Think: Core = engine, ORM = steering wheel.


Project Structure

Plain text
app/
├── main.py
├── database.py        ← engine + session setup
├── models/            ← SQLAlchemy models (talks to DB)
│   ├── user.py
│   └── post.py
├── schemas/           ← Pydantic schemas (validates HTTP data)
│   ├── user.py
│   └── post.py
├── crud/              ← DB operations
│   ├── user.py
│   └── post.py
└── routers/           ← FastAPI route handlers
    ├── users.py
    └── posts.py

Engine & Session

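Engine = the gateway to your DB. It owns the connection pool (below: up to 10 pooled connections plus 20 overflow), not a single connection. Created once, lives for the app's lifetime.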
Session = a "unit of work" per request. Like a shopping cart — you stage changes, then commit.

Python
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, echo=True, pool_size=10, max_overflow=20)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keeps object data accessible after commit
)

class Base(DeclarativeBase):
    pass

# FastAPI dependency — one session per request
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()    # auto-commit if no errors
        except Exception:
            await session.rollback()  # auto-rollback on error
            raise

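expire_on_commit=False: by default, SQLAlchemy expires every loaded attribute on commit, so the next attribute access triggers a reload from the DB. In async code that implicit reload raises MissingGreenlet, so your response serializer would crash while reading the object. Setting expire_on_commit=False keeps the already-loaded values available after commit.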


Defining Models

Python
# app/models/user.py
from sqlalchemy import String, Boolean, DateTime, Text, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from app.database import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    posts: Mapped[list["Post"]] = relationship("Post", back_populates="author")

Python
# app/models/post.py
from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str | None] = mapped_column(Text, nullable=True)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)

    author: Mapped["User"] = relationship("User", back_populates="posts")

Two-Layer Mental Model
Plain text
Python Layer         SQL Layer
------------         ---------
User class      →    users table
user.email      →    users.email column
user.posts      →    SELECT * FROM posts WHERE author_id = user.id

Relationships

One-to-Many (most common)

One user → many posts. Use back_populates to create a two-way link.

Python
# On User
posts: Mapped[list["Post"]] = relationship("Post", back_populates="author")

# On Post
author: Mapped["User"] = relationship("User", back_populates="posts")

# Usage
user.posts   # → list of Post objects
post.author  # → the User object
Many-to-Many (association table)
Python
from sqlalchemy import Column, ForeignKey, String, Table
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base

# Association table: one row per (post, tag) pair.
# The composite primary key prevents duplicate links.
post_tags = Table(
    "post_tags", Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)

class Tag(Base):
    __tablename__ = "tags"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    posts: Mapped[list["Post"]] = relationship("Post", secondary=post_tags, back_populates="tags")

# In Post model, add:
# tags: Mapped[list["Tag"]] = relationship("Tag", secondary=post_tags, back_populates="posts")
One-to-One
Python
# unique=True on the foreign-key column allows at most one row per user,
# which is what makes the link one-to-one at the DB level
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
Lazy vs Eager Loading (critical in async)

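In async SQLAlchemy, accessing user.posts when the relationship was not eagerly loaded triggers a lazy load. That load needs implicit I/O, which async sessions cannot do, so it raises MissingGreenlet. Always load relationships explicitly.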

Python
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

# selectinload → runs a second SELECT ... WHERE id IN (...) query. Best for collections (one-to-many).
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)

# joinedload → uses a JOIN in the same query. Best for a single related object (many-to-one).
stmt = select(Post).options(joinedload(Post.author)).where(Post.id == post_id)

Pydantic Schemas vs SQLAlchemy Models

These are two different things. Don't confuse them.

                 SQLAlchemy Model    Pydantic Schema
                 ----------------    ---------------
Lives in         models/             schemas/
Purpose          Talks to DB         Validates HTTP data
Inherits from    Base                BaseModel

Python
# app/schemas/user.py
from pydantic import BaseModel, EmailStr
from datetime import datetime

class UserCreate(BaseModel):     # what comes IN
    email: EmailStr
    username: str
    password: str

class UserResponse(BaseModel):   # what goes OUT
    id: int
    email: str
    username: str
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}  # allows reading SQLAlchemy objects directly

from_attributes = True lets Pydantic read SQLAlchemy model attributes instead of expecting a plain dict.
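
A short sketch of what from_attributes changes, assuming Pydantic v2. FakeUser is a hypothetical stand-in for a loaded SQLAlchemy object, and DictOnlyResponse is a hypothetical schema with the setting left off, so the example runs without a database.

```python
from datetime import datetime, timezone

from pydantic import BaseModel, ValidationError


class UserResponse(BaseModel):
    id: int
    username: str
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class DictOnlyResponse(BaseModel):  # hypothetical: from_attributes left off
    id: int
    username: str


class FakeUser:
    """Stand-in for a loaded SQLAlchemy User: plain attributes, not a dict."""

    def __init__(self) -> None:
        self.id = 1
        self.username = "ada"
        self.is_active = True
        self.created_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
        self.hashed_password = "not-for-clients"  # not declared on the schema


row = FakeUser()

# Reads row.id, row.username, ... and ignores attributes the schema does not declare.
payload = UserResponse.model_validate(row).model_dump()

# Without from_attributes, an arbitrary object is rejected.
try:
    DictOnlyResponse.model_validate(row)
    rejected = False
except ValidationError:
    rejected = True

print(payload, rejected)
```

Because only declared fields are copied, the response schema also acts as a filter: hashed_password never reaches the client.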


flush vs commit vs rollback

Plain text
db.add(obj)     →  staged in Python memory only
db.flush()      →  SQL sent to DB, inside the transaction (can still rollback)
db.commit()     →  transaction finalized, permanent, visible to everyone
db.rollback()   →  undo everything since last commit
db.refresh(obj) →  reload object from DB (get server-set values like IDs, timestamps)
The Google Doc Analogy
  • flush = typing on screen (not saved yet, browser crash = gone)
  • commit = clicking Save (permanent)
When to Use What
Situation                                                        Use
---------                                                        ---
Need auto-generated ID to create a related object           →    flush() first, then continue
End of a complete operation, everything succeeded           →    commit()
Something went wrong                                        →    rollback()
Need server-set values (created_at, id) after flush/commit  →    refresh(obj)
The Decision Rule
Plain text
"Do I need something back from the DB to continue my current operation?"

YES → flush(), use the value, commit() at the end
NO  → just commit()
Example: flush in action
Python
workspace = Workspace(name="My Team")
db.add(workspace)

await db.flush()   # ← DB assigns workspace.id now

settings = WorkspaceSettings(workspace_id=workspace.id, theme="dark")
db.add(settings)

await db.commit()  # both saved atomically

CRUD Operations (Async)

Python
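from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User
from app.schemas.user import UserCreate

# CREATE
async def create_user(db: AsyncSession, data: UserCreate) -> User:
    user = User(
        email=data.email,
        username=data.username,
        # hashed_password is NOT NULL on the model, so it must be set here.
        # hash_password() is a placeholder for your own hashing helper (not defined in these notes).
        hashed_password=hash_password(data.password),
    )
    db.add(user)
    await db.flush()        # sends the INSERT; the commit happens in get_db
    await db.refresh(user)  # loads server-set values (id, created_at)
    return user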

# READ one
async def get_user(db: AsyncSession, user_id: int) -> User | None:
    stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

# READ many with pagination
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 20) -> list[User]:
    stmt = select(User).offset(skip).limit(limit).order_by(User.created_at.desc())
    result = await db.execute(stmt)
    return list(result.scalars().all())

# UPDATE
async def update_user(db: AsyncSession, user_id: int, update_data: dict) -> User | None:
    user = await get_user(db, user_id)
    if not user:
        return None
    for key, value in update_data.items():
        setattr(user, key, value)
    await db.flush()
    await db.refresh(user)
    return user

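# DELETE
async def delete_user(db: AsyncSession, user_id: int) -> bool:
    user = await get_user(db, user_id)
    if not user:
        return False
    # Caveat: User.posts has no delete cascade, so if the user still has posts the
    # flush tries to set posts.author_id to NULL and fails on the NOT NULL column.
    # Delete the posts first, or add cascade="all, delete-orphan" to User.posts.
    await db.delete(user)
    return True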

Advanced Queries

Python
from sqlalchemy import select, and_, or_, func, desc, insert

# some_date, query, hashed_a, and hashed_b below are placeholders for values supplied by the caller

# AND / OR filtering
stmt = select(User).where(and_(User.is_active.is_(True), User.created_at > some_date))
stmt = select(User).where(or_(User.email == "a@b.com", User.username == "john"))

# Case-insensitive LIKE search (the value is sent as a bound parameter,
# but % and _ inside query still act as wildcards)
stmt = select(User).where(User.username.ilike(f"%{query}%"))

# Aggregation: count posts per user (inner join, so users with no posts are omitted)
stmt = (
    select(User.username, func.count(Post.id).label("post_count"))
    .join(Post, Post.author_id == User.id)
    .group_by(User.username)
    .order_by(desc("post_count"))
)

# Total count
total = await db.scalar(select(func.count()).select_from(User).where(User.is_active.is_(True)))

# Bulk insert: every NOT NULL column needs a value, including hashed_password
await db.execute(
    insert(User),
    [
        {"email": "a@b.com", "username": "a", "hashed_password": hashed_a},
        {"email": "b@c.com", "username": "b", "hashed_password": hashed_b},
    ],
)

Production Patterns

Reusable Timestamp Mixin
Python
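from datetime import datetime
from sqlalchemy import DateTime, func
from sqlalchemy.orm import Mapped, mapped_column

class TimestampMixin:
    # timezone=True matches the timestamp columns defined on User earlier
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

class User(TimestampMixin, Base):
    __tablename__ = "users"
    # ...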
Soft Deletes
Python
class User(Base):
    # ...
    deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)

# All queries: .where(User.deleted_at.is_(None))
Repository Pattern
Python
class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, user_id: int) -> User | None: ...
    async def create(self, data: UserCreate) -> User: ...

# In router:
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    return await repo.get_by_id(user_id)

Common Gotchas

Gotcha                                                        Fix
------                                                        ---
Forgetting await on DB calls                             →    execute, flush, commit, rollback, refresh, and delete are coroutines on AsyncSession: always await them (db.add() is a plain sync call)
Accessing user.posts in async without eager loading      →    Use selectinload or joinedload
N+1 query problem (looping + accessing relationships)    →    Eager load the relationships you need upfront (selectinload adds one query per relationship, joinedload adds a JOIN)
Sharing one session across concurrent requests           →    get_db creates one session per request; don't share it between tasks
Alembic not detecting a new model                        →    Import all models in alembic/env.py