AWS Database Blog

Building Python applications with SQLAlchemy and Aurora DSQL

Building Python applications with SQLAlchemy and Amazon Aurora DSQL means your object-relational mapper (ORM) handles query construction and your database handles scaling. Aurora DSQL provides a serverless, PostgreSQL-compatible database that automatically scales based on traffic and requires no capacity planning. It authenticates through AWS Identity and Access Management (IAM) instead of stored passwords. SQLAlchemy is one of the most widely used Python ORMs, with type-safe model definitions and a query builder that maps directly to SQL.

SQLAlchemy works with Aurora DSQL after three adjustments. You use universally unique identifiers (UUIDs) for primary keys (Aurora DSQL’s recommended approach for distributed workloads), define relationships using explicit primaryjoin with foreign() annotations instead of ForeignKey() constraints, and run the engine in AUTOCOMMIT mode to avoid unsupported SAVEPOINT operations.

In this post, you’ll build a working veterinary clinic command line interface (CLI) application that demonstrates production-ready patterns for connecting SQLAlchemy to Aurora DSQL. The patterns you implement (UUID primary keys, application-level relationships, and AUTOCOMMIT engine configuration) apply to other Python ORMs on Aurora DSQL. You’ll learn how to:

  • Define SQLAlchemy 2.x ORM models using Mapped[] type annotations and UUID primary keys compatible with Aurora DSQL
  • Manage table relationships using relationship() with primaryjoin and foreign() annotations instead of ForeignKey() constraints
  • Connect through IAM authentication using the Aurora DSQL Python Connector
  • Configure the SQLAlchemy engine in AUTOCOMMIT mode to work around unsupported SAVEPOINT operations
  • Run CRUD (create, read, update, delete) operations with eager loading against Aurora DSQL

Solution overview

You’ll create a Python CLI application that runs type-safe CRUD operations against Aurora DSQL through SQLAlchemy 2.x. You’ll use a veterinary clinic domain model: owners and pets (one-to-many), veterinarians and specialties (many-to-many).

The following diagram shows the architecture.

Architecture diagram showing a Python CLI application connecting through the Aurora DSQL Python Connector with IAM authentication to Amazon Aurora DSQL
Figure 1: SQLAlchemy application architecture with Amazon Aurora DSQL

When you run the app:

  1. The Aurora DSQL Python Connector establishes an IAM-authenticated connection using psycopg3, generating and refreshing short-lived tokens.
  2. SQLAlchemy creates the engine in AUTOCOMMIT mode, avoiding SAVEPOINT operations that Aurora DSQL does not support.
  3. SQLAlchemy’s ORM runs CRUD operations. The relationship() declarations with foreign() annotations handle table joins at the application level.

Key considerations

Aurora DSQL is a distributed, serverless database. Its architecture affects schema design choices such as primary key selection. The migration guide covers additional patterns.

  • AWS recommends UUIDs as the primary key type because they require no coordination across the distributed system, which means your inserts scale without bottlenecks. Aurora DSQL also supports sequences and identity columns (with CACHE specified) when you need integer identifiers.
  • Aurora DSQL does not support foreign key constraints. ForeignKey() cannot be used in SQLAlchemy column definitions. Without ForeignKey(), SQLAlchemy cannot auto-detect join conditions between tables. You define relationships using relationship() with explicit primaryjoin and foreign() annotations. The foreign() annotation tells SQLAlchemy which column is the referencing side of the join, replacing the role ForeignKey() normally plays. See Creating Custom Foreign Conditions in the SQLAlchemy documentation.
  • Aurora DSQL does not support SAVEPOINT, which SQLAlchemy’s psycopg dialect uses during engine initialization. Setting isolation_level="AUTOCOMMIT" and autocommit=True on the connection avoids this.
  • Aurora DSQL uses IAM authentication with time-limited tokens. The Aurora DSQL Python Connector handles token generation and refresh. See Authentication and authorization for Aurora DSQL.

Prerequisites

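To follow along with this tutorial, you’ll need an AWS account, the AWS CLI (used for the aws dsql commands), Python 3 with venv and pip, Git, and the IAM permissions described in the following section.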

IAM permissions configuration

Use the following IAM policy. ClusterSetup covers the one-time cluster creation step. DatabaseAccess is what your application uses at runtime.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ClusterSetup",
            "Effect": "Allow",
            "Action": [
                "dsql:CreateCluster",
                "dsql:GetCluster"
            ],
            "Resource": "arn:aws:dsql:<region>:<account-id>:cluster/*"
        },
        {
            "Sid": "DatabaseAccess",
            "Effect": "Allow",
            "Action": "dsql:DbConnectAdmin",
            "Resource": "arn:aws:dsql:<region>:<account-id>:cluster/<cluster-id>"
        }
    ]
}

Replace <region>, <account-id>, and <cluster-id> with your values. The cluster ID is available only after you create the cluster. Use dsql:DbConnect instead of dsql:DbConnectAdmin when connecting as a non-admin user.

Important: After creating your cluster, remove dsql:CreateCluster from your policy and make sure the DatabaseAccess statement names your specific cluster ARN rather than a wildcard. The ClusterSetup statement uses a wildcard because the cluster identifier isn’t known before creation.
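
The tightening step in the note above can be sketched in Python. This is an illustrative helper and not part of the sample repository. The function name runtime_policy and the Region, account ID, and cluster ID shown are placeholders chosen for this example.

```python
import json


def runtime_policy(region: str, account_id: str, cluster_id: str, admin: bool = True) -> dict:
    """Return a policy that allows only the connect action on one cluster ARN."""
    # dsql:DbConnectAdmin is for the admin user, dsql:DbConnect for other users.
    action = "dsql:DbConnectAdmin" if admin else "dsql:DbConnect"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DatabaseAccess",
                "Effect": "Allow",
                "Action": action,
                "Resource": f"arn:aws:dsql:{region}:{account_id}:cluster/{cluster_id}",
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(json.dumps(runtime_policy("us-east-1", "111122223333", "abc123def456"), indent=4))
```

Generating the document from the cluster ID keeps the wildcard out of the runtime policy. Attaching the result to a role or user is left to whichever IAM tooling you already use.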

Estimated time and costs

This tutorial takes 15 to 20 minutes. Running the sample generates minimal database activity.

Aurora DSQL charges for Distributed Processing Units (DPUs) and storage (GB-month). See Amazon Aurora DSQL Pricing. Delete your cluster after testing.

Solution walkthrough

To create an Aurora DSQL cluster

  1. Open a terminal and run:
    aws dsql create-cluster --region us-east-1
  2. Copy the identifier from the JSON response.
  3. Retrieve the cluster endpoint:
    aws dsql get-cluster --identifier <cluster-id> --region us-east-1
  4. Wait for the status field to show Active. The endpoint field is the hostname your application connects to (for example, abc123def456.dsql.us-east-1.on.aws).
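
If you prefer to script the wait in step 4, the following is a minimal polling sketch. It assumes a caller-supplied fetch_status function (hypothetical, for example a thin wrapper around the get-cluster call) that returns the current status string. The timeout and interval defaults are arbitrary choices for this example.

```python
import time
from typing import Callable


def wait_until_active(
    fetch_status: Callable[[], str],
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll fetch_status() until it reports an active cluster or the timeout passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        # Compare case-insensitively so "Active" and "ACTIVE" are both accepted.
        if status.upper() == "ACTIVE":
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"cluster still {status!r} after {timeout_s} seconds")
        sleep(interval_s)
```

The sleep parameter is injectable only so the loop can be exercised without real delays.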

To clone the sample and install dependencies

  1. Clone the repository:
    git clone https://github.com/aws-samples/aurora-dsql-samples.git
  2. Navigate to the SQLAlchemy sample:
    cd aurora-dsql-samples/python/sqlalchemy
  3. Create a virtual environment and install dependencies:
    python3 -m venv .venv
    source .venv/bin/activate
    pip install -r requirements.txt

Note: If you want to run the sample immediately, skip ahead to “To build and run the application”. The following sections walk through the key parts of the code (model definitions, connection setup, and the AUTOCOMMIT engine configuration) to explain the Aurora DSQL-specific adaptations made in this sample.

To define the SQLAlchemy models

In src/models.py, you define five tables using SQLAlchemy 2.x declarative models with Mapped[] type annotations. Most use UUID primary keys; the specialty table instead uses a String(80) (VARCHAR) primary key. None use ForeignKey() constraints.

The owner table uses a UUID primary key. Aurora DSQL generates the ID on insert via gen_random_uuid():

class Owner(Base):
    __tablename__ = "owner"

    id: Mapped[UUID] = mapped_column(
        Uuid, primary_key=True, server_default=text("gen_random_uuid()")
    )
    name: Mapped[str] = mapped_column(String(30))
    city: Mapped[str] = mapped_column(String(80))
    telephone: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)

The pet table stores an owner reference as a UUID column. No ForeignKey() is used:

class Pet(Base):
    __tablename__ = "pet"

    id: Mapped[UUID] = mapped_column(
        Uuid, primary_key=True, server_default=text("gen_random_uuid()")
    )
    name: Mapped[str] = mapped_column(String(30))
    birth_date: Mapped[date] = mapped_column(Date)
    owner_id: Mapped[Optional[UUID]] = mapped_column(Uuid, nullable=True)

Because Aurora DSQL does not support foreign key constraints, ForeignKey() cannot be used in column definitions. Relationships are instead declared after all classes exist using relationship() with explicit primaryjoin and foreign() annotations. The foreign() annotation tells SQLAlchemy which column is the referencing side of the join (see src/models.py). SQLAlchemy’s joinedload() then handles eager loading of related records in a single query.

To connect to Aurora DSQL

In src/dsql_engine.py, you create the SQLAlchemy engine using the Aurora DSQL Python Connector. The connector extends psycopg3 and handles IAM token generation:

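import os

import aurora_dsql_psycopg as dsql
from psycopg import sql
from sqlalchemy import create_engine, event

# Assumed for this excerpt: settings come from the environment variables
# exported later in this post. See src/dsql_engine.py for the sample's own handling.
ADMIN = "admin"
host = os.environ["CLUSTER_ENDPOINT"]
user = os.environ["CLUSTER_USER"]
schema = "public" if user == ADMIN else "myschema"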

engine = create_engine(
    "postgresql+psycopg://",
    creator=lambda: dsql.connect(host=host, user=user, autocommit=True),
    isolation_level="AUTOCOMMIT",
    pool_pre_ping=True,
    pool_size=5,
    max_overflow=0,
    pool_recycle=3300,  # Recycle before DSQL's 1-hour limit
)

@event.listens_for(engine, "connect")
def set_search_path(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute(
        sql.SQL("SET search_path TO {}").format(sql.Identifier(schema))
    )
    cursor.close()

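The creator parameter passes a callable that returns a psycopg3 connection with IAM authentication. isolation_level="AUTOCOMMIT" prevents SQLAlchemy from issuing SAVEPOINT statements during initialization. pool_recycle=3300 retires pooled connections after 55 minutes (3,300 seconds), before they reach Aurora DSQL’s 1-hour connection limit. pool_pre_ping=True tests each connection when it is checked out, and pool_size=5 with max_overflow=0 caps the pool at five connections.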

The listener uses psycopg.sql.Identifier() to safely quote the schema name. The schema value is derived from a fixed comparison (user == ADMIN), producing only “public” or “myschema”. Never use f-strings or string formatting to construct SQL (even for SET commands) because the interpolated value reaches PostgreSQL as raw SQL text, with no quoting or parameterization.

To build and run the application

  1. Set environment variables. Replace the endpoint with your cluster endpoint from the first step:
    export CLUSTER_USER="admin"
    export CLUSTER_ENDPOINT="your-cluster.dsql.us-east-1.on.aws"
  2. Run the example:
    python -m src.example
  3. Verify the output:
    Starting SQLAlchemy DSQL Example...
    Dropping existing tables (if any)...
    Creating tables...
    Creating owners...
    Created owner: John Doe (ID: a1b2c3d4-...)
    Created owner: Mary Major (ID: e5f6a7b8-...)
    Creating pets...
    Created pet: Pet1 (Owner: John Doe)
    Created pet: Pet2 (Owner: John Doe)
    Creating veterinary specialties...
    Created specialties: Exotic, Dogs, Cats
    Creating veterinarians...
    Created vet: Akua Mansa (Specialty: Exotic)
    Created vet: Carlos Salazar (Specialties: Cats, Dogs)
    Querying pet information...
    Querying owner information...
    Querying veterinarians with specialties...
    Cleaning up...
    Cleanup complete.
    Example completed successfully!
    Dropping tables...

    Note: If you see “Missing required environment variable CLUSTER_ENDPOINT”, check that your environment variables are set in the current shell session.
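
A fail-fast check like the one behind that message can be sketched as follows. This is an assumption about the general shape of such a check, not the sample’s actual code, and require_env is a hypothetical helper name.

```python
import os
from typing import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the named variable, or raise if it is unset or blank."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable {name}")
    return value
```

Calling require_env("CLUSTER_ENDPOINT") and require_env("CLUSTER_USER") at startup surfaces a missing setting before any connection attempt is made.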

To validate with tests

Run the pytest integration test:

pytest

A successful run produces:

========================= test session starts =========================
collected 1 item

test/test_example.py . [100%]

========================= 1 passed in XXs =========================

To handle write conflicts with retry logic

Aurora DSQL uses optimistic concurrency control (OCC) and returns SQLSTATE 40001 when two transactions conflict on the same row (OC000) or when a session’s cached schema is stale (OC001). In psycopg3, this surfaces as psycopg.errors.SerializationFailure. Both cases are safe to retry. The recommended pattern is exponential backoff with jitter so that concurrent retries are less likely to conflict with each other.

Add src/utils/retry.py to the project:

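# src/utils/retry.py
import random
import time
from typing import Callable, TypeVar

import psycopg.errors
from sqlalchemy.exc import DBAPIError

T = TypeVar("T")
BASE_DELAY = 0.05  # 50ms


def _is_occ_conflict(exc: Exception) -> bool:
    # SQLAlchemy wraps driver errors in DBAPIError and keeps the
    # original psycopg exception on .orig, so check both forms.
    orig = exc.orig if isinstance(exc, DBAPIError) else exc
    return isinstance(orig, psycopg.errors.SerializationFailure)


def with_retry(fn: Callable[[], T], max_retries: int = 3) -> T:
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except (psycopg.errors.SerializationFailure, DBAPIError) as exc:
            # Re-raise anything that is not an OCC conflict, and the
            # conflict itself once the retries are used up.
            if not _is_occ_conflict(exc) or attempt == max_retries:
                raise
            backoff = BASE_DELAY * (2 ** attempt)
            time.sleep(backoff + random.uniform(0, backoff))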

Wrap any database operation that might conflict in with_retry(). Because the engine runs in AUTOCOMMIT mode, each call to session.begin() starts an explicit transaction that Aurora DSQL evaluates at commit time:

from sqlalchemy.orm import Session
from src.dsql_engine import engine
from src.models import Pet
from src.utils.retry import with_retry

def update_pet_name(pet_id, new_name):
    def _txn():
        with Session(engine) as session:
            with session.begin():
                pet = session.get(Pet, pet_id)
                pet.name = new_name
    with_retry(_txn)

with_retry() retries up to three times with exponential backoff and random jitter, sleeping between 50 and 100 ms before the first retry and doubling that range for each later one. If all retries are exhausted, the last error is re-raised for the caller to handle.
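
To see the backoff schedule without a database, the following sketch replays the same loop against a stand-in exception. FakeConflict, backoff_bounds, retry_fake, and simulate are hypothetical names introduced for this illustration. The 50 ms base delay matches the BASE_DELAY used above.

```python
import random
import time
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")
BASE_DELAY = 0.05  # same 50 ms base as with_retry()


class FakeConflict(Exception):
    """Stand-in for a serialization failure; not a real driver exception."""


def backoff_bounds(attempt: int) -> Tuple[float, float]:
    """Smallest and largest sleep after failed attempt number `attempt` (0-based)."""
    backoff = BASE_DELAY * (2 ** attempt)
    return backoff, 2 * backoff


def retry_fake(
    fn: Callable[[], T],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Same loop shape as with_retry(), but catching the stand-in exception."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except FakeConflict:
            if attempt == max_retries:
                raise
            low, high = backoff_bounds(attempt)
            # Equivalent to backoff + uniform(0, backoff).
            sleep(random.uniform(low, high))
    raise AssertionError("unreachable")


def simulate(failures: int) -> Tuple[str, List[float]]:
    """Fail `failures` times, then succeed; return the result and the recorded sleeps."""
    calls = {"n": 0}
    sleeps: List[float] = []

    def flaky() -> str:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise FakeConflict()
        return "ok"

    # Record the delays instead of actually sleeping.
    return retry_fake(flaky, sleep=sleeps.append), sleeps
```

With the defaults, the three possible waits fall in the ranges 50 to 100 ms, 100 to 200 ms, and 200 to 400 ms, so a call that exhausts every retry spends at most about 0.7 seconds sleeping.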

Clean up

To delete the Aurora DSQL cluster

Using the AWS CLI:

aws dsql delete-cluster --identifier <cluster-id> --region us-east-1

Or in the Aurora DSQL console: select your cluster, choose Actions, then Delete.

To remove database tables without deleting the cluster

DROP TABLE IF EXISTS "specialty_to_vet";
DROP TABLE IF EXISTS "pet";
DROP TABLE IF EXISTS "owner";
DROP TABLE IF EXISTS "specialty";
DROP TABLE IF EXISTS "vet";

Conclusion

In this post, you learned how to connect SQLAlchemy to Amazon Aurora DSQL by making three targeted adaptations: using UUID primary keys with server_default=text("gen_random_uuid()"), defining table relationships using relationship() with primaryjoin and foreign() annotations instead of ForeignKey() constraints, and configuring the SQLAlchemy engine in AUTOCOMMIT mode to avoid unsupported SAVEPOINT operations. You also added OCC retry logic with exponential backoff and jitter to handle write conflicts in production. These patterns apply to other Python ORMs on Aurora DSQL.

To take this further, connect as a non-admin user with a custom schema to test multi-tenant patterns.

Get started by cloning the aurora-dsql-samples repository and deploying the sample against your own Aurora DSQL cluster. For more on Aurora DSQL, see the Aurora DSQL User Guide. To learn more about SQLAlchemy’s PostgreSQL support, visit the SQLAlchemy documentation.


About the authors

Dipen Patel

Dipen is a Technical Account Manager at Amazon Web Services with over 7 years of experience helping enterprise customers architect, optimize, and operate their workloads on AWS. He specializes in guiding customers through complex architectural decisions, cloud-native migrations, and operational best practices across a broad range of AWS services. Dipen is passionate about bridging the gap between emerging AWS capabilities and real-world customer outcomes.