Streaming

LLM responses can take seconds to generate. Without streaming, the user sees nothing until the entire response is complete. Streaming delivers tokens as they are produced, reducing perceived latency and enabling real-time UIs. This page explains how streaming works across Synaptic's layers -- from individual model calls through LCEL chains to graph execution.

Model-Level Streaming

The ChatModel trait provides two methods:

#[async_trait]
pub trait ChatModel: Send + Sync {
    async fn chat(&self, request: ChatRequest) -> Result<ChatResponse, SynapticError>;

    fn stream_chat(&self, request: ChatRequest) -> ChatStream<'_>;
}

chat() waits for the complete response. stream_chat() returns a ChatStream immediately:

pub type ChatStream<'a> =
    Pin<Box<dyn Stream<Item = Result<AIMessageChunk, SynapticError>> + Send + 'a>>;

This is a pinned, boxed, async stream of AIMessageChunk values. Each chunk contains a fragment of the response -- typically a few tokens of text, part of a tool call, or usage information.

Default Implementation

The stream_chat() method has a default implementation that wraps chat() as a single-chunk stream. If a model adapter does not implement true streaming, it falls back to this behavior -- the caller still gets a stream, but it contains only one chunk (the complete response). This means code that consumes a ChatStream works with any model, whether or not it supports true streaming.
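The fallback can be pictured with a simplified, synchronous analogue. The types and functions below are stand-ins for illustration, not Synaptic's actual implementation: a blocking `chat` call is wrapped as a one-item iterator, so consumers can always iterate uniformly whether or not the source truly streams.

```rust
// Hypothetical stand-in for a response chunk.
#[derive(Debug, Clone, PartialEq)]
struct Chunk {
    content: String,
}

// Stand-in for a complete, non-streaming chat call.
fn chat(prompt: &str) -> Chunk {
    Chunk { content: format!("echo: {prompt}") }
}

// The "default implementation" idea: wrap the complete response in a
// single-element iterator, mirroring the single-chunk stream fallback.
fn stream_chat(prompt: &str) -> impl Iterator<Item = Chunk> {
    std::iter::once(chat(prompt))
}

fn main() {
    // Consumers loop the same way whether the source truly streams or not.
    for chunk in stream_chat("hi") {
        print!("{}", chunk.content);
    }
    println!();
}
```

The real trait does this with an async `ChatStream`, but the shape is the same: one complete response delivered through the streaming interface.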

Consuming a Stream

use futures::StreamExt;
use std::io::Write;

let mut stream = model.stream_chat(request);

while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    print!("{}", chunk.content);   // print tokens as they arrive
    std::io::stdout().flush()?;    // flush so each token appears immediately
}

AIMessageChunk Merging

Streaming produces many chunks that must be assembled into a complete message. AIMessageChunk supports the + and += operators:

let mut accumulated = AIMessageChunk::default();

while let Some(chunk) = stream.next().await {
    accumulated += chunk?;
}

let complete_message: Message = accumulated.into_message();

The merge rules:

  • content: Concatenated via push_str. Each chunk's content fragment is appended to the accumulated string.
  • tool_calls: Extended. Chunks may carry partial or complete tool call objects.
  • tool_call_chunks: Extended. Raw partial tool call data from the provider.
  • invalid_tool_calls: Extended.
  • id: The first non-None value wins. Subsequent chunks do not overwrite the ID.
  • usage: Summed field-by-field. If both sides have usage data, input_tokens, output_tokens, and total_tokens are added together. If only one side has usage, it is preserved.

After accumulation, into_message() converts the chunk into a Message::AI with the complete content and tool calls.
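The merge rules above can be sketched with a minimal standalone `AddAssign` implementation. The structs here are deliberately simplified stand-ins (the real `AIMessageChunk` also carries `tool_calls`, `tool_call_chunks`, and `invalid_tool_calls`), but the content, id, and usage rules match the list above.

```rust
use std::ops::AddAssign;

// Simplified stand-ins for illustration only.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct Usage {
    input_tokens: u32,
    output_tokens: u32,
    total_tokens: u32,
}

#[derive(Debug, Clone, Default, PartialEq)]
struct Chunk {
    content: String,
    id: Option<String>,
    usage: Option<Usage>,
}

impl AddAssign for Chunk {
    fn add_assign(&mut self, rhs: Chunk) {
        // content: concatenated via push_str.
        self.content.push_str(&rhs.content);
        // id: first non-None value wins; later chunks never overwrite it.
        if self.id.is_none() {
            self.id = rhs.id;
        }
        // usage: summed field-by-field; if only one side has usage, keep it.
        self.usage = match (self.usage, rhs.usage) {
            (Some(a), Some(b)) => Some(Usage {
                input_tokens: a.input_tokens + b.input_tokens,
                output_tokens: a.output_tokens + b.output_tokens,
                total_tokens: a.total_tokens + b.total_tokens,
            }),
            (a, b) => a.or(b),
        };
    }
}

fn main() {
    let mut acc = Chunk::default();
    acc += Chunk { content: "Hel".into(), id: Some("run-1".into()), usage: None };
    acc += Chunk {
        content: "lo".into(),
        id: Some("ignored".into()),
        usage: Some(Usage { input_tokens: 5, output_tokens: 2, total_tokens: 7 }),
    };
    assert_eq!(acc.content, "Hello");
    assert_eq!(acc.id.as_deref(), Some("run-1")); // first id preserved
}
```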

LCEL Streaming

The Runnable trait includes a stream() method:

fn stream<'a>(&'a self, input: I, config: &'a RunnableConfig) -> RunnableOutputStream<'a, O>;

The default implementation wraps invoke() as a single-item stream, similar to the model-level default. Components that support true streaming override this method.

Streaming Through Chains

When you call stream() on a BoxRunnable chain (e.g., prompt | model | parser), the behavior is:

  1. Intermediate steps run their invoke() method and pass the result forward.
  2. The final component in the chain streams its output.

This means in a prompt | model | parser chain, the prompt template runs synchronously, the model truly streams, and the parser processes each chunk as it arrives (if it supports streaming) or waits for the complete output (if it does not).

let chain = prompt_template.boxed() | model_runnable.boxed() | parser.boxed();

let mut stream = chain.stream(input, &config);
while let Some(item) = stream.next().await {
    let output = item?;
    // Process each streamed output
}

RunnableGenerator

For producing custom streams, RunnableGenerator wraps an async function that returns a stream:

let generator = RunnableGenerator::new(|input: String, _config| {
    Box::pin(async_stream::stream! {
        for word in input.split_whitespace() {
            yield Ok(word.to_string());
        }
    })
});

This is useful when you need to inject a streaming source into an LCEL chain that is not a model.

Graph Streaming

Graph execution can also stream, yielding events after each node completes:

use synaptic::graph::StreamMode;

let mut stream = graph.stream(initial_state, StreamMode::Values);

while let Some(event) = stream.next().await {
    let event = event?;
    println!("Node '{}' completed. Messages: {}", event.node, event.state.messages.len());
}

StreamMode

| Mode | Yields | Use Case |
|---|---|---|
| Values | Full state after each node | When you need the complete picture at each step |
| Updates | Post-node state snapshot | When you want to observe what each node changed |

GraphEvent

pub struct GraphEvent<S> {
    pub node: String,
    pub state: S,
}

Each event tells you which node just executed and what the state looks like. For a ReAct agent, you would see alternating "agent" and "tools" events, with messages accumulating in the state.
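That alternation is easy to see with a hand-built event sequence. The `GraphEvent` shape comes from the struct above; the state is reduced to a message count, and the events are fabricated for illustration rather than produced by a real graph run.

```rust
// Struct shape mirrors GraphEvent<S> above.
#[derive(Debug, Clone)]
pub struct GraphEvent<S> {
    pub node: String,
    pub state: S,
}

// Minimal stand-in state: just the number of messages so far.
fn summarize(events: &[GraphEvent<usize>]) -> Vec<String> {
    events
        .iter()
        .map(|e| format!("{} -> {} messages", e.node, e.state))
        .collect()
}

fn main() {
    // A ReAct-style run alternates "agent" and "tools" nodes,
    // with messages accumulating in the state.
    let events = vec![
        GraphEvent { node: "agent".into(), state: 2 },
        GraphEvent { node: "tools".into(), state: 3 },
        GraphEvent { node: "agent".into(), state: 4 },
    ];
    for line in summarize(&events) {
        println!("{line}");
    }
}
```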

Streaming Output

While model-level and LCEL streaming give you raw chunks, many applications need structured callbacks for tokens, tool calls, errors, and completion metadata. The StreamingOutput trait provides this higher-level interface.

ToolDisplayMeta

Rich display metadata attached to tool calls, useful for rendering tool invocations in a CLI or UI:

use synaptic::core::ToolDisplayMeta;

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ToolDisplayMeta {
    pub emoji: String,    // e.g. "⚡" "📖" "✏️"
    pub label: String,    // e.g. "Execute" "Read File"
    pub verb: String,     // e.g. "executing" "reading"
    pub detail: String,   // e.g. "ls -la ~/..." "src/main.rs"
}

ToolCallInfo

Information about a tool call in progress. The display field carries optional rich metadata for rendering:

use synaptic::core::ToolCallInfo;

pub struct ToolCallInfo {
    pub name: String,
    pub id: String,
    pub args: String,
    pub display: Option<ToolDisplayMeta>,
}

CompletionMeta

Metadata about a completed LLM request, delivered alongside the full response text:

use synaptic::core::CompletionMeta;

pub struct CompletionMeta {
    pub input_tokens: u32,
    pub output_tokens: u32,
    pub duration_ms: u64,
    pub request_id: Option<String>,
}

The StreamingOutput Trait

The trait defines seven lifecycle methods. The first four are the primary interface; the last three have default no-op implementations for optional use:

use synaptic::core::StreamingOutput;

#[async_trait]
pub trait StreamingOutput: Send + Sync {
    /// Called for each text token as it arrives.
    async fn on_token(&self, token: &str);

    /// Called when the model initiates a tool call.
    async fn on_tool_call(&self, info: &ToolCallInfo);

    /// Called when the full response is complete.
    async fn on_complete(&self, full_response: &str, meta: Option<&CompletionMeta>);

    /// Called when an error occurs during streaming.
    async fn on_error(&self, error: &str);

    /// Called when the model emits extended thinking / reasoning content.
    async fn on_reasoning(&self, _content: &str) {}

    /// Called when a tool finishes and returns its result.
    async fn on_tool_result(&self, _name: &str, _content: &str) {}

    /// Periodic heartbeat fired for long-running requests (>15 s).
    async fn on_heartbeat(&self) {}
}

| Method | When it fires | Typical use |
|---|---|---|
| on_token | Each text token arrives | Print to terminal, update UI |
| on_tool_call | Model initiates a tool call | Show spinner, log tool name |
| on_complete | Full response assembled | Record usage, stop spinner |
| on_error | Streaming error occurs | Display error, retry logic |
| on_reasoning | Extended thinking content | Show chain-of-thought |
| on_tool_result | Tool execution finishes | Display tool output |
| on_heartbeat | Every ~15 s during long calls | Keep connection alive, update UI |
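An implementation typically overrides the primary methods and leaves the optional hooks on their defaults. The sketch below is a synchronous stand-in for the trait (the real one is async via #[async_trait]) with a token collector; `StreamingOutputLike` and `Collector` are names invented for this example.

```rust
use std::cell::RefCell;

// Synchronous stand-in for the StreamingOutput trait; names mirror
// the methods above but this is not Synaptic's actual trait.
trait StreamingOutputLike {
    fn on_token(&self, token: &str);
    fn on_complete(&self, full_response: &str);
    fn on_error(&self, error: &str);
    // Optional hooks with default no-op bodies, like the real trait.
    fn on_reasoning(&self, _content: &str) {}
    fn on_heartbeat(&self) {}
}

// A collector that records tokens; a real impl might print to a terminal.
struct Collector {
    tokens: RefCell<Vec<String>>,
}

impl StreamingOutputLike for Collector {
    fn on_token(&self, token: &str) {
        self.tokens.borrow_mut().push(token.to_string());
    }
    fn on_complete(&self, full_response: &str) {
        println!("done: {full_response}");
    }
    fn on_error(&self, error: &str) {
        eprintln!("error: {error}");
    }
    // on_reasoning / on_heartbeat fall back to the default no-ops.
}

fn main() {
    let out = Collector { tokens: RefCell::new(Vec::new()) };
    for t in ["Hel", "lo"] {
        out.on_token(t);
    }
    out.on_complete(&out.tokens.borrow().concat());
}
```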

RunContext Integration

StreamingOutput is passed through RunContext as an Arc<dyn Any>, making it available to middleware and inner components without tight coupling:

use std::sync::Arc;
use synaptic::core::{RunContext, StreamingOutput};

let ctx = RunContext::default()
    .with_streaming_output(Arc::new(my_streaming_impl));

// Recover inside middleware or a node:
if let Some(output) = ctx.streaming_output::<Arc<dyn StreamingOutput>>() {
    output.on_token("hello").await;
}

This pattern keeps StreamingOutput decoupled from the core request/response path -- middleware that does not need streaming simply ignores it, while components that do can retrieve and call it.
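The type-erasure trick behind this can be shown with std alone. In the sketch below (hypothetical `Sink` trait and helper names, not Synaptic's API), the concrete value stored inside the `Arc<dyn Any>` slot is itself an `Arc<dyn Sink>`, which is what the downcast recovers.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical trait standing in for StreamingOutput.
trait Sink: Send + Sync {
    fn emit(&self, token: &str) -> String;
}

struct Printer;
impl Sink for Printer {
    fn emit(&self, token: &str) -> String {
        format!("token: {token}")
    }
}

// Type-erased slot, like RunContext holding Arc<dyn Any>: the stored
// concrete type is Arc<dyn Sink>, recovered later by downcasting.
fn store(sink: Arc<dyn Sink>) -> Arc<dyn Any + Send + Sync> {
    Arc::new(sink)
}

fn recover(slot: &Arc<dyn Any + Send + Sync>) -> Option<Arc<dyn Sink>> {
    slot.downcast_ref::<Arc<dyn Sink>>().cloned()
}

fn main() {
    let slot = store(Arc::new(Printer));
    if let Some(sink) = recover(&slot) {
        println!("{}", sink.emit("hello")); // token: hello
    }
}
```

Components that do not know about the slot's contents simply never downcast it, which is what keeps the coupling loose.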

When to Use Streaming

Use model-level streaming when you need token-by-token output for a chat UI or when you want to show partial results to the user as they are generated.

Use LCEL streaming when you have a chain of operations and want the final output to stream. The intermediate steps run synchronously, but the user sees the final result incrementally.

Use graph streaming when you have a multi-step workflow and want to observe progress. Each node completion is an event, giving you visibility into the graph's execution.

Streaming and Error Handling

Streams can yield errors at any point. A network failure mid-stream, a malformed chunk from the provider, or a graph node failure all produce Err items in the stream. Consumers should handle errors on each next() call:

while let Some(result) = stream.next().await {
    match result {
        Ok(chunk) => process(chunk),
        Err(e) => {
            eprintln!("Stream error: {e}");
            break;
        }
    }
}

There is no automatic retry at the stream level. If a stream fails mid-way, the consumer decides how to handle it -- retry the entire call, return a partial result, or propagate the error. For automatic retries, wrap the model in a RetryChatModel before streaming, which retries the entire request on failure.
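The retry-the-whole-request idea can be sketched generically. This is not `RetryChatModel`'s actual API, just a minimal loop showing the shape: the entire fallible call is repeated, never an individual chunk.

```rust
// Generic retry sketch (hypothetical helper, not RetryChatModel's API):
// repeat a fallible call up to max_attempts times, returning the first
// success or the last error.
fn with_retries<T, E>(
    max_attempts: u32,
    mut call: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match call() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => continue, // a real wrapper would back off here
        }
    }
}

fn main() {
    let mut fails_left = 2;
    let result = with_retries(3, || {
        if fails_left > 0 {
            fails_left -= 1;
            Err("transient")
        } else {
            Ok("response")
        }
    });
    assert_eq!(result, Ok("response"));
}
```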

See Also