Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Each

This guide shows how to use RunnableEach to map a runnable over each element in a list.

Overview

RunnableEach wraps any BoxRunnable<I, O> and applies it to every element in a Vec<I>, producing a Vec<O>. It is the runnable equivalent of Iterator::map() -- process a batch of items through the same transformation.

Basic usage

use synaptic::runnables::{Runnable, RunnableEach, RunnableLambda};
use synaptic::core::RunnableConfig;

let upper = RunnableLambda::new(|s: String| async move {
    Ok(s.to_uppercase())
});

let each = RunnableEach::new(upper.boxed());

let config = RunnableConfig::default();
let result = each.invoke(
    vec!["hello".into(), "world".into()],
    &config,
).await?;

assert_eq!(result, vec!["HELLO", "WORLD"]);

Error propagation

If the inner runnable fails on any element, RunnableEach stops and returns that error immediately. Elements processed before the failure are discarded:

use synaptic::runnables::{Runnable, RunnableEach, RunnableLambda};
use synaptic::core::{RunnableConfig, SynapticError};

let must_be_short = RunnableLambda::new(|s: String| async move {
    if s.len() > 5 {
        Err(SynapticError::Other(format!("too long: {s}")))
    } else {
        Ok(s.to_uppercase())
    }
});

let each = RunnableEach::new(must_be_short.boxed());
let config = RunnableConfig::default();

let result = each.invoke(
    vec!["hi".into(), "toolong".into(), "ok".into()],
    &config,
).await;

assert!(result.is_err()); // fails on "toolong"

Empty input

An empty input vector produces an empty output vector:

use synaptic::runnables::{Runnable, RunnableEach, RunnableLambda};
use synaptic::core::RunnableConfig;

let identity = RunnableLambda::new(|s: String| async move { Ok(s) });
let each = RunnableEach::new(identity.boxed());

let config = RunnableConfig::default();
let result = each.invoke(vec![], &config).await?;
assert!(result.is_empty());

In a pipeline

RunnableEach implements Runnable<Vec<I>, Vec<O>>, so it composes with the pipe operator. A common pattern is to split input into parts, process each with RunnableEach, and then combine the results:

use synaptic::runnables::{Runnable, RunnableEach, RunnableLambda};

// Step 1: split a string into words
let split = RunnableLambda::new(|s: String| async move {
    Ok(s.split_whitespace().map(String::from).collect::<Vec<_>>())
});

// Step 2: process each word
let process = RunnableEach::new(
    RunnableLambda::new(|w: String| async move {
        Ok(w.to_uppercase())
    }).boxed()
);

// Step 3: join results
let join = RunnableLambda::new(|words: Vec<String>| async move {
    Ok(words.join(", "))
});

let chain = split.boxed() | process.boxed() | join.boxed();
// chain.invoke("hello world".to_string(), &config).await => Ok("HELLO, WORLD")

Type signature

pub struct RunnableEach<I: Send + 'static, O: Send + 'static> {
    inner: BoxRunnable<I, O>,
}

impl<I, O> Runnable<Vec<I>, Vec<O>> for RunnableEach<I, O> { ... }

Elements are processed sequentially in order. For concurrent processing, use RunnableParallel or the batch() method on a BoxRunnable instead.