Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Graph Checkpointers

By default, you can use [StoreCheckpointer] with an [InMemoryStore] backend, which stores graph state only in-process memory. This means state is lost when the process restarts — not suitable for production.

Synaptic provides four persistent checkpointer backends:

BackendCrateBest For
Redissynaptic-store (feature redis)Low-latency, optional TTL expiry
PostgreSQLsynaptic-store (feature postgres)Relational workloads, ACID guarantees
SQLitesynaptic-store (feature sqlite)Single-machine, no external service
MongoDBsynaptic-store (feature mongodb)Distributed, document-oriented

Setup

Add the relevant crate to Cargo.toml:

# Redis checkpointer
[dependencies]
synaptic = { version = "0.4", features = ["agent", "redis"] }

# PostgreSQL checkpointer
synaptic = { version = "0.4", features = ["agent", "postgres"] }

# SQLite checkpointer (no external service required)
synaptic = { version = "0.4", features = ["agent", "sqlite"] }

# MongoDB checkpointer
synaptic = { version = "0.4", features = ["agent", "mongodb"] }

Redis Checkpointer

Quick start

use synaptic::store::redis::{RedisCheckpointer, RedisCheckpointerConfig};
use synaptic::graph::{create_react_agent, MessageState};
use std::sync::Arc;

// Connect to Redis
let checkpointer = RedisCheckpointer::from_url("redis://127.0.0.1/").await?;

// Build the graph with the persistent checkpointer
let graph = create_react_agent(model, tools)?
    .with_checkpointer(Arc::new(checkpointer));

// Run with a thread ID for persistence
let state = MessageState { messages: vec![Message::human("Hello")] };
let config = RunnableConfig::default().with_metadata("thread_id", "user-123");
let result = graph.invoke_with_config(state, config).await?;

Configuration

use synaptic::store::redis::RedisCheckpointerConfig;

let config = RedisCheckpointerConfig::new("redis://127.0.0.1/")
    .with_ttl(86400)          // Expire checkpoints after 24 hours
    .with_prefix("myapp");    // Custom key prefix (default: "synaptic")

let checkpointer = RedisCheckpointer::new(config).await?;

Configuration reference

FieldTypeDefaultDescription
urlStringrequiredRedis connection URL
ttlOption<u64>NoneTTL in seconds for checkpoint keys
prefixString"synaptic"Key prefix for all checkpoint keys

Key scheme

Redis stores checkpoints using the following keys:

  • Checkpoint data: {prefix}:checkpoint:{thread_id}:{checkpoint_id} — JSON-serialized Checkpoint
  • Thread index: {prefix}:idx:{thread_id} — Redis LIST of checkpoint IDs in chronological order

PostgreSQL Checkpointer

Quick start

use sqlx::postgres::PgPoolOptions;
use synaptic::store::postgres::PgCheckpointer;
use synaptic::graph::{create_react_agent, MessageState};
use std::sync::Arc;

// Create a connection pool
let pool = PgPoolOptions::new()
    .max_connections(5)
    .connect("postgres://user:pass@localhost/mydb")
    .await?;

// Create and initialize the checkpointer (creates table if not exists)
let checkpointer = PgCheckpointer::new(pool);
checkpointer.initialize().await?;

// Build the graph
let graph = create_react_agent(model, tools)?
    .with_checkpointer(Arc::new(checkpointer));

Schema

initialize() creates the following table if it does not exist:

CREATE TABLE IF NOT EXISTS synaptic_checkpoints (
    thread_id     TEXT        NOT NULL,
    checkpoint_id TEXT        NOT NULL,
    state         JSONB       NOT NULL,
    next_node     TEXT,
    parent_id     TEXT,
    metadata      JSONB       NOT NULL DEFAULT '{}',
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (thread_id, checkpoint_id)
);

Custom table name

let checkpointer = PgCheckpointer::new(pool)
    .with_table("my_custom_checkpoints");
checkpointer.initialize().await?;

SQLite Checkpointer

The SqliteCheckpointer stores checkpoints in a local SQLite database. It requires no external service and is ideal for single-machine deployments, CLI tools, and development.

Quick start

use synaptic::store::sqlite::SqliteCheckpointer;
use synaptic::graph::{create_react_agent, MessageState};
use std::sync::Arc;

// File-based (persists across restarts)
let checkpointer = SqliteCheckpointer::new("/var/lib/myapp/checkpoints.db")?;

// Build the graph
let graph = create_react_agent(model, tools)?
    .with_checkpointer(Arc::new(checkpointer));

let state = MessageState { messages: vec![Message::human("Hello")] };
let config = RunnableConfig::default().with_metadata("thread_id", "user-123");
let result = graph.invoke_with_config(state, config).await?;

In-memory mode (for testing)

use synaptic::store::sqlite::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::in_memory()?;

Schema

SqliteCheckpointer::new() automatically creates two tables:

-- Checkpoint state storage
CREATE TABLE IF NOT EXISTS synaptic_checkpoints (
    thread_id     TEXT    NOT NULL,
    checkpoint_id TEXT    NOT NULL,
    state         TEXT    NOT NULL,  -- JSON-serialized Checkpoint
    created_at    INTEGER NOT NULL,  -- Unix timestamp
    PRIMARY KEY (thread_id, checkpoint_id)
);

-- Ordered index for latest/list queries
CREATE TABLE IF NOT EXISTS synaptic_checkpoint_idx (
    thread_id     TEXT    NOT NULL,
    checkpoint_id TEXT    NOT NULL,
    seq           INTEGER NOT NULL,  -- Monotonically increasing per thread
    PRIMARY KEY (thread_id, checkpoint_id)
);

Notes

  • Uses rusqlite with the bundled feature — no external libsqlite3 required.
  • Async operations use tokio::task::spawn_blocking to avoid blocking the runtime.
  • PUT is idempotent: re-inserting the same checkpoint_id replaces data but does not add a duplicate index entry.

MongoDB Checkpointer

The MongoCheckpointer stores checkpoints in MongoDB, suitable for distributed deployments where multiple processes share state.

Quick start

use synaptic::store::mongodb::MongoCheckpointer;
use synaptic::graph::{create_react_agent, MessageState};
use std::sync::Arc;

let client = mongodb::Client::with_uri_str("mongodb://localhost:27017").await?;
let db = client.database("myapp");
let checkpointer = MongoCheckpointer::new(&db, "graph_checkpoints").await?;

let graph = create_react_agent(model, tools)?
    .with_checkpointer(Arc::new(checkpointer));

let state = MessageState { messages: vec![Message::human("Hello")] };
let config = RunnableConfig::default().with_metadata("thread_id", "user-123");
let result = graph.invoke_with_config(state, config).await?;

Document schema

Each checkpoint is stored as a MongoDB document:

{
  "thread_id":     "user-123",
  "checkpoint_id": "18f4a2b1-0001",
  "seq":           0,
  "state":         "{...serialized Checkpoint JSON...}",
  "created_at":    { "$date": "2026-02-22T00:00:00Z" }
}

Two indexes are created automatically:

  • Unique index on (thread_id, checkpoint_id) — ensures idempotent puts.
  • Compound index on (thread_id, seq) — used for ordered list() and latest get().

Notes

  • Compatible with MongoDB Atlas and self-hosted MongoDB 5.0+.
  • put() uses upsert semantics: re-inserting the same checkpoint ID is safe.
  • get() without a checkpoint_id returns the document with the highest seq.

Human-in-the-loop with persistence

Persistent checkpointers enable stateful human-in-the-loop workflows:

use synaptic::graph::{StateGraph, MessageState, StreamMode};
use synaptic::store::sqlite::SqliteCheckpointer;
use std::sync::Arc;

let checkpointer = Arc::new(SqliteCheckpointer::new("/var/lib/myapp/checkpoints.db")?);

// Compile graph with interrupt before "human_review" node
let graph = builder
    .interrupt_before(vec!["human_review"])
    .compile_with_checkpointer(checkpointer)?;

// First invocation — graph pauses before "human_review"
let config = RunnableConfig::default().with_metadata("thread_id", "session-42");
let result = graph.invoke_with_config(initial_state, config.clone()).await?;

// Inject human feedback and resume
let updated = graph.update_state(config.clone(), feedback_state).await?;
let final_result = graph.invoke_with_config(updated, config).await?;

Time-travel debugging

Retrieve any historical checkpoint by ID for debugging or replaying:

use synaptic::graph::{CheckpointConfig, Checkpointer};

let config = CheckpointConfig::with_checkpoint_id("thread-123", "specific-checkpoint-id");
if let Some(checkpoint) = checkpointer.get(&config).await? {
    println!("State at checkpoint: {:?}", checkpoint.state);
}

// List all checkpoints for a thread
let all = checkpointer.list(&CheckpointConfig::new("thread-123")).await?;
println!("Total checkpoints: {}", all.len());

Comparison

CheckpointerPersistenceExternal DepTTLDistributed
StoreCheckpointerNo (in-process)NoneNoNo
SqliteCheckpointerYes (file)NoneNoNo
RedisCheckpointerYesRedisYesYes
PgCheckpointerYesPostgreSQLNoYes
MongoCheckpointerYesMongoDBNoYes

Error handling

use synaptic::core::SynapticError;

match checkpointer.get(&config).await {
    Ok(Some(cp)) => println!("Loaded checkpoint: {}", cp.id),
    Ok(None) => println!("No checkpoint found"),
    Err(SynapticError::Store(msg)) => eprintln!("Storage error: {msg}"),
    Err(e) => return Err(e.into()),
}