Graph Streaming

Instead of waiting for the entire graph to finish, you can stream execution and receive a GraphEvent after each node completes. This is useful for progress reporting, real-time UIs, and debugging.

stream() and StreamMode

The stream() method on CompiledGraph returns a GraphStream -- a Pin<Box<dyn Stream>> that yields Result<GraphEvent<S>, SynapticError> values:

use synaptic::graph::{StateGraph, FnNode, MessageState, StreamMode, GraphEvent, END};
use synaptic::core::Message;
use futures::StreamExt;

let step_a = FnNode::new(|mut state: MessageState| async move {
    state.messages.push(Message::ai("Step A done"));
    Ok(state)
});

let step_b = FnNode::new(|mut state: MessageState| async move {
    state.messages.push(Message::ai("Step B done"));
    Ok(state)
});

let graph = StateGraph::new()
    .add_node("a", step_a)
    .add_node("b", step_b)
    .set_entry_point("a")
    .add_edge("a", "b")
    .add_edge("b", END)
    .compile()?;

let initial = MessageState::with_messages(vec![Message::human("Start")]);

let mut stream = graph.stream(initial, StreamMode::Values);
while let Some(event) = stream.next().await {
    let event: GraphEvent<MessageState> = event?;
    println!(
        "Node '{}' completed -- {} messages in state",
        event.node,
        event.state.messages.len()
    );
}
// Output:
//   Node 'a' completed -- 2 messages in state
//   Node 'b' completed -- 3 messages in state

GraphEvent

Each event contains:

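| Field | Type | Description |
|-------|------|-------------|
| node | String | The name of the node that just executed |
| state | S | The state snapshot for that node; what it contains depends on the stream mode |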

Stream Modes

The StreamMode enum controls what the state field contains:

| Mode | Behavior |
|------|----------|
| StreamMode::Values | Each event contains the full accumulated state after the node |
| StreamMode::Updates | Each event contains the pre-node state (useful for computing per-node deltas) |
| StreamMode::Messages | Same as Values; callers filter for AI messages in chat UIs |
| StreamMode::Debug | Same as Values; intended for detailed debug information |
| StreamMode::Custom | Events emitted via StreamWriter during node execution |
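To make the "per-node delta" idea concrete, here is a minimal sketch that compares a snapshot taken before a node with one taken after it. It assumes the state is an append-only list of messages and uses plain `String` values as stand-ins; `appended` is a hypothetical helper, not part of synaptic, and how you obtain the later snapshot depends on your graph.

```rust
/// Returns the messages present in `after` but not in `before`,
/// assuming messages are only ever appended (an assumption of this sketch).
fn appended<'a>(before: &[String], after: &'a [String]) -> &'a [String] {
    // If `after` is shorter than `before`, report no new messages
    // instead of panicking on an out-of-range slice.
    after.get(before.len()..).unwrap_or(&[])
}

fn main() {
    // Stand-in snapshots: the state before node "a" ran and after it ran.
    let before = vec!["Start".to_string()];
    let after = vec!["Start".to_string(), "Step A done".to_string()];

    let delta = appended(&before, &after);
    println!("node 'a' added: {:?}", delta);
}
```

If nodes can also remove or rewrite messages, a length-based suffix is no longer enough and a real diff over the two snapshots would be needed.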

Multi-Mode Streaming

You can request multiple stream modes simultaneously using stream_modes(). Each event is wrapped in a MultiGraphEvent tagged with its mode:

use synaptic::graph::{StreamMode, MultiGraphEvent};
use futures::StreamExt;

let mut stream = graph.stream_modes(
    initial_state,
    vec![StreamMode::Values, StreamMode::Updates],
);

while let Some(result) = stream.next().await {
    let event: MultiGraphEvent<MessageState> = result?;
    match event.mode {
        StreamMode::Values => {
            println!("Full state after '{}': {:?}", event.event.node, event.event.state);
        }
        StreamMode::Updates => {
            println!("State before '{}': {:?}", event.event.node, event.event.state);
        }
        _ => {}
    }
}

For each node execution, one event per requested mode is emitted. With two modes and three nodes, you get six events total.
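The event count above is simply nodes times modes. The sketch below models that with stand-in types (the `Mode` enum and `tagged_events` function are hypothetical, not synaptic's API). The order of modes within a single node is an assumption of this sketch; only the total count comes from the text above.

```rust
// Stand-in for the two modes requested in the example above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Values,
    Updates,
}

/// Produces one (mode, node) tag per requested mode for every node,
/// in node order. The per-node mode order is assumed, not documented.
fn tagged_events(nodes: &[&str], modes: &[Mode]) -> Vec<(Mode, String)> {
    let mut out = Vec::with_capacity(nodes.len() * modes.len());
    for node in nodes {
        for mode in modes {
            out.push((*mode, node.to_string()));
        }
    }
    out
}

fn main() {
    let events = tagged_events(&["a", "b", "c"], &[Mode::Values, Mode::Updates]);
    // Three nodes and two modes give six tagged events.
    println!("{} events", events.len());
    for (mode, node) in &events {
        println!("{:?} from '{}'", mode, node);
    }
}
```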

Streaming with Checkpoints

You can combine streaming with checkpointing using stream_with_config():

use synaptic::graph::{MemorySaver, CheckpointConfig, StreamMode};
use futures::StreamExt; // provides .next() on the stream
use std::sync::Arc;

let checkpointer = Arc::new(MemorySaver::new());
let graph = graph.with_checkpointer(checkpointer);

let config = CheckpointConfig::new("thread-1");

let mut stream = graph.stream_with_config(
    initial_state,
    StreamMode::Values,
    Some(config),
);

while let Some(event) = stream.next().await {
    let event = event?;
    println!("Node: {}", event.node);
}

Checkpoints are saved after each node during streaming, just as they are during invoke(). If the graph is interrupted (via interrupt_before or interrupt_after), the stream yields the interrupt error and terminates.

Error Handling

The stream yields Result values. If a node returns an error, the stream yields that error and terminates. Consuming code should handle both successful events and errors:

while let Some(result) = stream.next().await {
    match result {
        Ok(event) => println!("Node '{}' succeeded", event.node),
        Err(e) => {
            eprintln!("Graph error: {e}");
            break;
        }
    }
}
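When the caller wants both the progress made so far and the failure, the same stop-at-first-error pattern can be wrapped in a helper. The sketch below uses a plain iterator of `Result<String, String>` as a stand-in for the stream, so it runs with the standard library alone; `drain` is a hypothetical name, and the node names and error text are illustrative.

```rust
/// Consumes results in order and returns the nodes that completed plus
/// the first error, if any. Nothing after the first error is consumed.
fn drain<I>(results: I) -> (Vec<String>, Option<String>)
where
    I: IntoIterator<Item = Result<String, String>>,
{
    let mut completed = Vec::new();
    for result in results {
        match result {
            Ok(node) => completed.push(node),
            // Stop at the first error, mirroring a stream that terminates.
            Err(e) => return (completed, Some(e)),
        }
    }
    (completed, None)
}

fn main() {
    // Stand-in for a run where node "a" succeeds and node "b" fails.
    let results = vec![
        Ok("a".to_string()),
        Err("node 'b' failed".to_string()),
        Ok("c".to_string()),
    ];
    let (completed, error) = drain(results);
    println!("completed: {:?}, error: {:?}", completed, error);
}
```

With a real async stream, `TryStreamExt::try_collect` from the futures crate offers a similar short-circuit, though it returns only the error and discards the events collected before it.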