Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Generator

This guide shows how to use RunnableGenerator to create a runnable from a streaming generator function.

Overview

RunnableGenerator wraps a function that produces a Stream of results. It bridges the gap between streaming generators and the Runnable trait:

  • invoke() collects the entire stream into a Vec<O>
  • stream() yields each item individually as it is produced

This is useful when you want a runnable that naturally produces output incrementally -- for example, tokenizers, chunkers, or any computation that yields partial results.

Basic usage

use synaptic::runnables::{Runnable, RunnableGenerator};
use synaptic::core::{RunnableConfig, SynapticError};

let gen = RunnableGenerator::new(|input: String| {
    async_stream::stream! {
        for word in input.split_whitespace() {
            yield Ok(word.to_uppercase());
        }
    }
});

let config = RunnableConfig::default();
let result = gen.invoke("hello world".to_string(), &config).await?;
assert_eq!(result, vec!["HELLO", "WORLD"]);

Streaming

The real power of RunnableGenerator is streaming. stream() yields each item as it is produced, without waiting for the generator to finish:

use futures::StreamExt;
use synaptic::runnables::{Runnable, RunnableGenerator};
use synaptic::core::RunnableConfig;

let gen = RunnableGenerator::new(|input: String| {
    async_stream::stream! {
        for ch in input.chars() {
            yield Ok(ch.to_string());
        }
    }
});

let config = RunnableConfig::default();
let mut stream = gen.stream("abc".to_string(), &config);

// Each item arrives individually wrapped in a Vec
while let Some(item) = stream.next().await {
    let chunk = item?;
    println!("{:?}", chunk); // ["a"], ["b"], ["c"]
}

Each streamed item is wrapped in Vec<O> to match the output type of invoke(). This means stream() yields Result<Vec<O>, SynapticError> where each Vec contains a single element.

Error handling

If the generator yields an Err, invoke() stops collecting and returns that error. stream() yields the error and continues to the next item:

use synaptic::runnables::RunnableGenerator;
use synaptic::core::SynapticError;

let gen = RunnableGenerator::new(|_input: String| {
    async_stream::stream! {
        yield Ok("first".to_string());
        yield Err(SynapticError::Other("oops".into()));
        yield Ok("third".to_string());
    }
});

// invoke() fails on the error:
// gen.invoke("x".to_string(), &config).await => Err(...)

// stream() yields all three items:
// Ok(["first"]), Err(...), Ok(["third"])

In a pipeline

RunnableGenerator implements Runnable<I, Vec<O>>, so it works with the pipe operator. Place it wherever you need streaming generation in a chain:

use synaptic::runnables::{Runnable, RunnableGenerator, RunnableLambda};

let tokenize = RunnableGenerator::new(|input: String| {
    async_stream::stream! {
        for token in input.split_whitespace() {
            yield Ok(token.to_string());
        }
    }
});

let count = RunnableLambda::new(|tokens: Vec<String>| async move {
    Ok(tokens.len())
});

let chain = tokenize.boxed() | count.boxed();

// chain.invoke("one two three".to_string(), &config).await => Ok(3)

Type signature

pub struct RunnableGenerator<I: Send + 'static, O: Send + 'static> { ... }

impl<I, O> Runnable<I, Vec<O>> for RunnableGenerator<I, O> { ... }

The constructor accepts any function Fn(I) -> S where S: Stream<Item = Result<O, SynapticError>> + Send + 'static. The async_stream::stream! macro is the most ergonomic way to produce such a stream.