Transactions

Ensure data integrity with ACID transactions, proper error handling, and isolation levels.

ACID Compliance

Full transaction guarantees

Auto Rollback

Errors trigger rollback

Isolation Levels

Control concurrency behavior

Savepoints

Nested transaction support

Transaction Basics

Transactions ensure that a series of operations either all succeed or all fail together:

import { db } from '@/lib/db'
import { accounts, transactions } from '@/lib/db/schema'
import { eq, sql } from 'drizzle-orm'

// Basic transaction with automatic rollback on error
await db.transaction(async (tx) => {
  // Debit from source account
  await tx.update(accounts)
    .set({ balance: sql`${accounts.balance} - 100` })
    .where(eq(accounts.id, sourceAccountId))

  // Credit to destination account
  await tx.update(accounts)
    .set({ balance: sql`${accounts.balance} + 100` })
    .where(eq(accounts.id, destAccountId))

  // Record the transaction
  await tx.insert(transactions).values({
    fromAccountId: sourceAccountId,
    toAccountId: destAccountId,
    amount: 100,
  })
})
// If any operation fails, ALL changes are rolled back

Automatic Commit & Rollback

Drizzle automatically commits if the transaction function completes successfully, and rolls back if any error is thrown.
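
The commit-or-rollback behavior described above can be pictured as a thin wrapper around `BEGIN`, `COMMIT`, and `ROLLBACK`. The following is a minimal conceptual sketch, not Drizzle's actual implementation: `SqlClient`, `runInTransaction`, and `createRecordingClient` are hypothetical names introduced for illustration, and the client only records statements instead of talking to a database.

```typescript
// Hypothetical minimal client: anything that can run a SQL string.
interface SqlClient {
  query(text: string): Promise<void>
}

// Conceptual model of a callback-style transaction helper:
// commit when the callback resolves, roll back when it throws.
async function runInTransaction<T>(
  client: SqlClient,
  fn: (client: SqlClient) => Promise<T>,
): Promise<T> {
  await client.query('BEGIN')
  try {
    const result = await fn(client)
    await client.query('COMMIT')
    return result
  } catch (error) {
    await client.query('ROLLBACK')
    throw error // the caller still sees the original error
  }
}

// In-memory stand-in that records statements (illustration only).
function createRecordingClient(): SqlClient & { log: string[] } {
  const log: string[] = []
  return {
    log,
    async query(text: string) {
      log.push(text)
    },
  }
}
```

Because the error is re-thrown after the rollback, code that calls the transaction still needs its own `try`/`catch` if it wants to recover.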

Returning Values

Return values from transactions for use outside the transaction:

import { db } from '@/lib/db'
import { users, profiles } from '@/lib/db/schema'

// Return created records
const result = await db.transaction(async (tx) => {
  const [user] = await tx.insert(users).values({
    email: 'user@example.com',
    name: 'John Doe',
  }).returning()

  const [profile] = await tx.insert(profiles).values({
    userId: user.id,
    bio: 'Hello world!',
  }).returning()

  return { user, profile }
})

console.log('Created user:', result.user.id)
console.log('Created profile:', result.profile.id)

Manual Rollback

Explicitly roll back a transaction based on business logic by throwing an error inside the callback:

import { db } from '@/lib/db'
import { accounts } from '@/lib/db/schema'
import { eq, sql } from 'drizzle-orm'

async function transferMoney(fromId: string, toId: string, amount: number) {
  return await db.transaction(async (tx) => {
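    // Check source account balance, locking the row (SELECT ... FOR UPDATE)
    // so a concurrent transfer cannot change it before the update below
    const [source] = await tx.select()
      .from(accounts)
      .where(eq(accounts.id, fromId))
      .for('update')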

    if (source.balance < amount) {
      // Throw to trigger rollback
      throw new Error('Insufficient funds')
    }

    // Proceed with transfer
    await tx.update(accounts)
      .set({ balance: source.balance - amount })
      .where(eq(accounts.id, fromId))

    await tx.update(accounts)
      .set({ balance: sql`${accounts.balance} + ${amount}` })
      .where(eq(accounts.id, toId))

    return { success: true, newBalance: source.balance - amount }
  })
}

// Usage with error handling
try {
  const result = await transferMoney(fromId, toId, 100)
  console.log('Transfer successful:', result.newBalance)
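} catch (error) {
  // `error` is typed as unknown in strict TypeScript, so narrow it first
  const message = error instanceof Error ? error.message : String(error)
  console.error('Transfer failed:', message)
}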

Isolation Levels

Control how transactions interact with concurrent transactions:

Property           Type     Description
read uncommitted   string   Can see uncommitted changes from other transactions (dirty reads)
read committed     string   Default. Only sees committed changes. May see different data on re-read
repeatable read    string   Same query returns same results within transaction. Prevents non-repeatable reads
serializable       string   Strictest. Transactions appear to run sequentially. May cause serialization failures

import { db } from '@/lib/db'
import { products } from '@/lib/db/schema'
import { eq } from 'drizzle-orm'

// Set isolation level for a transaction
await db.transaction(async (tx) => {
  // Your queries here run with serializable isolation
  const inventory = await tx.select()
    .from(products)
    .where(eq(products.id, productId))

  if (inventory[0].stock < quantity) {
    throw new Error('Out of stock')
  }

  await tx.update(products)
    .set({ stock: inventory[0].stock - quantity })
    .where(eq(products.id, productId))
}, {
  isolationLevel: 'serializable',
})

// Read committed (default) - good for most cases
await db.transaction(async (tx) => {
  // Queries
}, {
  isolationLevel: 'read committed',
})

// Repeatable read - same query returns same data
await db.transaction(async (tx) => {
  // Queries
}, {
  isolationLevel: 'repeatable read',
})
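
For orientation, each isolation level corresponds to a transaction mode that PostgreSQL accepts on `BEGIN`. The sketch below shows that mapping with a hypothetical helper, `beginStatement`, introduced here for illustration. It is not necessarily the exact SQL Drizzle sends.

```typescript
type IsolationLevel =
  | 'read uncommitted'
  | 'read committed'
  | 'repeatable read'
  | 'serializable'

// Builds the PostgreSQL statement that opens a transaction at a given level.
// Omitting the level leaves the server's configured default in effect.
function beginStatement(level?: IsolationLevel): string {
  return level ? `BEGIN ISOLATION LEVEL ${level.toUpperCase()}` : 'BEGIN'
}
```

Typing the level as a string union means a misspelled value is rejected at compile time rather than failing at the database.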

Serialization Failures

With serializable isolation, concurrent transactions may fail with serialization errors. Always implement retry logic for these cases.

Handling Serialization Failures

Implement retry logic for serializable transactions:

import { db } from '@/lib/db'
import { products } from '@/lib/db/schema'
import { eq } from 'drizzle-orm'

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  delay = 100
): Promise<T> {
  let lastError: Error | null = null

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn()
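    } catch (error) {
      lastError = error as Error

      // Check if it's a serialization failure (PostgreSQL error code 40001).
      // `error` is unknown in strict TypeScript, so read `code` defensively.
      const code = (error as { code?: string } | null)?.code
      if (code === '40001') {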
        console.log(`Retry attempt ${attempt + 1} after serialization failure`)
        await new Promise(r => setTimeout(r, delay * Math.pow(2, attempt)))
        continue
      }

      // Re-throw non-serialization errors immediately
      throw error
    }
  }

  throw lastError
}

// Usage
const result = await withRetry(() =>
  db.transaction(async (tx) => {
    // Critical inventory operation
    const [product] = await tx.select()
      .from(products)
      .where(eq(products.id, productId))

    if (product.stock < 1) {
      throw new Error('Out of stock')
    }

    return await tx.update(products)
      .set({ stock: product.stock - 1 })
      .where(eq(products.id, productId))
      .returning()
  }, { isolationLevel: 'serializable' })
)

Nested Transactions (Savepoints)

Use savepoints for partial rollbacks within a transaction:

import { db } from '@/lib/db'
import { orders, orderItems, inventory } from '@/lib/db/schema'
import { eq } from 'drizzle-orm'

await db.transaction(async (tx) => {
  // Create the order
  const [order] = await tx.insert(orders).values({
    userId,
    status: 'pending',
  }).returning()

  // Process each item with individual savepoints
  for (const item of cartItems) {
    try {
      // Nested transaction creates a savepoint
      await tx.transaction(async (nested) => {
        // Check and reserve inventory
        const [inv] = await nested.select()
          .from(inventory)
          .where(eq(inventory.productId, item.productId))

        if (inv.available < item.quantity) {
          throw new Error(`Insufficient inventory for ${item.productId}`)
        }

        await nested.update(inventory)
          .set({ available: inv.available - item.quantity })
          .where(eq(inventory.productId, item.productId))

        await nested.insert(orderItems).values({
          orderId: order.id,
          productId: item.productId,
          quantity: item.quantity,
          price: item.price,
        })
      })
    } catch (error) {
      // Savepoint rolled back, but outer transaction continues
      const message = error instanceof Error ? error.message : String(error)
      console.log(`Skipping item: ${message}`)
      // Could add to a "failed items" list
    }
  }

  // Mark the order as confirmed
  await tx.update(orders)
    .set({ status: 'confirmed' })
    .where(eq(orders.id, order.id))
})

Savepoint Behavior

Nested transactions in Drizzle use PostgreSQL savepoints. If the inner transaction fails, only changes since the savepoint are rolled back. The outer transaction can continue.
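
The savepoint behavior described above can be sketched with the three SQL statements involved: `SAVEPOINT`, `RELEASE SAVEPOINT`, and `ROLLBACK TO SAVEPOINT`. This is a conceptual model only, not Drizzle's internals. `SqlClient` and `withSavepoint` are hypothetical names introduced for illustration.

```typescript
// Hypothetical minimal client: anything that can run a SQL string.
interface SqlClient {
  query(text: string): Promise<void>
}

// Conceptual model of a nested transaction. Work done inside `fn` is undone
// on error, while earlier work in the outer transaction is kept.
// `name` must be a trusted identifier, since it is interpolated into SQL.
async function withSavepoint<T>(
  client: SqlClient,
  name: string,
  fn: () => Promise<T>,
): Promise<T> {
  await client.query(`SAVEPOINT ${name}`)
  try {
    const result = await fn()
    await client.query(`RELEASE SAVEPOINT ${name}`)
    return result
  } catch (error) {
    // Undo only the changes made since the savepoint was created
    await client.query(`ROLLBACK TO SAVEPOINT ${name}`)
    throw error // the outer transaction decides whether to continue
  }
}
```

The outer transaction stays open after a savepoint rollback, which is why catching the error in the loop lets the remaining items proceed.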

Error Handling Patterns

Best practices for handling transaction errors:

import { db } from '@/lib/db'
import { accounts } from '@/lib/db/schema'
import { and, eq } from 'drizzle-orm'

// Custom error types for transaction failures
class InsufficientFundsError extends Error {
  constructor(public available: number, public required: number) {
    super(`Insufficient funds: need ${required}, have ${available}`)
    this.name = 'InsufficientFundsError'
  }
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'ConcurrencyError'
  }
}

// Transaction with typed error handling
async function processPayment(userId: string, amount: number) {
  try {
    return await db.transaction(async (tx) => {
      const [account] = await tx.select()
        .from(accounts)
        .where(eq(accounts.userId, userId))

      if (!account) {
        throw new Error('Account not found')
      }

      if (account.balance < amount) {
        throw new InsufficientFundsError(account.balance, amount)
      }

      // Version check for optimistic locking
      const updated = await tx.update(accounts)
        .set({
          balance: account.balance - amount,
          version: account.version + 1,
        })
        .where(and(
          eq(accounts.userId, userId),
          eq(accounts.version, account.version)
        ))
        .returning()

      if (updated.length === 0) {
        throw new ConcurrencyError('Account was modified by another transaction')
      }

      return updated[0]
    })
  } catch (error) {
    if (error instanceof InsufficientFundsError) {
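      // List the fields explicitly: spreading an Error copies only its
      // enumerable own properties, which does not include `message`
      return {
        success: false,
        reason: 'insufficient_funds',
        available: error.available,
        required: error.required,
      }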
    }
    if (error instanceof ConcurrencyError) {
      return { success: false, reason: 'concurrency', message: error.message }
    }
    throw error // Re-throw unexpected errors
  }
}
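
The version check in `processPayment` is a form of optimistic locking: the update only applies if the row still carries the version that was read. The sketch below isolates that idea using an in-memory object in place of a table. `AccountRow`, `AccountTable`, and `updateIfUnchanged` are hypothetical names used only for illustration.

```typescript
interface AccountRow {
  id: string
  balance: number
  version: number
}

// In-memory stand-in for an accounts table (illustration only).
type AccountTable = Record<string, AccountRow>

// Mirrors `UPDATE ... WHERE id = ? AND version = ?`: the write applies only
// if nobody has bumped the version since `read` was loaded.
function updateIfUnchanged(
  table: AccountTable,
  read: AccountRow,
  newBalance: number,
): AccountRow | undefined {
  const current = table[read.id]
  if (!current || current.version !== read.version) {
    return undefined // zero rows matched: a concurrent writer got there first
  }
  const next: AccountRow = {
    id: current.id,
    balance: newBalance,
    version: current.version + 1,
  }
  table[read.id] = next
  return next
}
```

When two callers read version 1 and both try to write, only the first succeeds. The second gets `undefined`, which corresponds to the `updated.length === 0` branch that throws `ConcurrencyError`.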

Best Practices

Follow these guidelines for reliable transactions:

Keep Transactions Short

Long-running transactions hold locks and can cause contention. Do expensive computations before starting the transaction.

Implement Retry Logic

Always retry on transient failures like deadlocks and serialization errors. Use exponential backoff, ideally with random jitter, so that clients do not all retry at the same moment (the thundering herd problem).

Use Appropriate Isolation

Start with read committed (default). Only use serializable when you need to prevent all anomalies.

No External Side Effects

Don't send emails, call external APIs, or write files inside transactions. These can't be rolled back if the transaction fails.

Side Effects Pattern
// BAD: Side effect inside transaction
await db.transaction(async (tx) => {
  await tx.insert(orders).values(order)
  await sendEmail(user.email, 'Order confirmed') // Can't rollback!
})

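// GOOD: Side effect after transaction
// (use a new name for the result; `const order = ...` would shadow the
// `order` input that the callback reads)
const [createdOrder] = await db.transaction(async (tx) => {
  return await tx.insert(orders).values(order).returning()
})

// Only send email after successful commit
await sendEmail(user.email, 'Order confirmed')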
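
For the retry guideline above, the two decisions are which errors to retry and how long to wait. The sketch below is one possible shape, assuming the driver exposes the PostgreSQL SQLSTATE as a string `code` property on the error (as node-postgres does). Some setups may wrap the driver error, in which case the code would need to be read from the wrapped cause instead. `isRetryable` and `backoffDelay` are hypothetical helper names.

```typescript
// PostgreSQL SQLSTATE codes for transient conflicts:
// 40001 = serialization_failure, 40P01 = deadlock_detected
const RETRYABLE_CODES = ['40001', '40P01']

function isRetryable(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false
  const code = (error as { code?: unknown }).code
  return typeof code === 'string' && RETRYABLE_CODES.indexOf(code) !== -1
}

// Exponential backoff with "full jitter": pick a random delay between 0 and
// the exponential ceiling so concurrent clients do not retry in lockstep.
// `random` is injectable so the function can be tested deterministically.
function backoffDelay(
  attempt: number,
  baseMs = 100,
  capMs = 5000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt))
  return Math.floor(random() * ceiling)
}
```

A retry loop would call `isRetryable(error)` in its `catch` block and wait `backoffDelay(attempt)` milliseconds before the next attempt, re-throwing anything that is not retryable.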