# Pipeline Engine
A generic, composable step runner for ordered and parallel data processing.
## What is the Pipeline Engine?
The Pipeline Engine is a lightweight, domain-agnostic framework for composing processing steps into pipelines. It provides contract validation, immutable context passing, and built-in concurrency control — all in ~300 lines of pure Python with no external dependencies beyond the standard library.
Everything composes from three primitives:

- Sequential — steps run one after another
- Branch — fork, run in parallel, join
- Nesting — a pipeline used as a step
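The three primitives can be sketched in miniature. This is a conceptual stand-in, not the engine's real API: here a step is any callable taking and returning a context dict, and the branch runs sequentially (true parallelism is elided for brevity).

```python
# Hypothetical minimal engine illustrating the three composition primitives.

def pipeline(*steps):
    """Sequential: run steps one after another, threading the context through."""
    def run(ctx):
        for step in steps:
            ctx = step(ctx)
        return ctx
    return run

def branch(*branches):
    """Branch: fork the same context into each branch, then merge outputs."""
    def run(ctx):
        outs = [b(ctx) for b in branches]   # parallelism elided for brevity
        merged = dict(ctx)
        for out in outs:
            merged.update(out)
        return merged
    return run

double = lambda ctx: {**ctx, "doubled": ctx["x"] * 2}
add_one = lambda ctx: {**ctx, "inc": ctx["x"] + 1}

seq = pipeline(double, add_one)      # Sequential
fork = branch(double, add_one)       # Branch (fork/join)
nested = pipeline(fork, double)      # Nesting: a composite used as a step
```

Because a pipeline and a branch expose the same callable shape as a single step, `nested` composes them freely — the same property the real engine gets from structural typing.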
Steps declare what data they read and write. The pipeline validates ordering at construction time — before any data flows — so wiring errors surface immediately, not at runtime.
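A rough sketch of what construction-time contract checking looks like — the class and field names here are illustrative, not the engine's actual implementation:

```python
# Hypothetical contract validation: each step declares which context fields
# it requires and provides; ordering is checked before any data flows.
class Step:
    requires: set = set()
    provides: set = set()

def validate(steps, initial_fields=frozenset()):
    available = set(initial_fields)
    for step in steps:
        missing = step.requires - available
        if missing:
            raise ValueError(f"{type(step).__name__} is missing {missing}")
        available |= step.provides

class Load(Step):
    provides = {"raw"}

class Parse(Step):
    requires = {"raw"}
    provides = {"parsed"}

validate([Load(), Parse()])    # OK: 'raw' is available before Parse needs it
# validate([Parse(), Load()]) # would raise immediately — a wiring error,
#                             # caught before any sample is processed
```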
## Core Principles
- Three primitives — sequential steps, parallel branches, and nested pipelines cover every composition pattern
- Contracts — steps declare `requires` and `provides` fields; the pipeline validates ordering at construction time
- Immutable context — steps receive a frozen context and return a new one via `.replace()`, making concurrent execution safe by default
- Declared concurrency — parallelism is configured on the step (`max_workers`, `async_boundary`), not the pipeline
- Per-sample error isolation — one failing sample never blocks others; every sample produces a result
- Observation hooks — `PipelineHook` lets external code observe step transitions without modifying data flow (progress streaming, metrics, logging)
- Cancellation — `CancellationToken` stops a running pipeline between steps; `cancel_token_var` makes the token readable inside steps for intra-step cancellation
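The immutable-context principle is the same pattern Python's frozen dataclasses provide. A minimal sketch, assuming a context shaped roughly like the engine's (the `sample`/`outputs` field names are illustrative):

```python
from dataclasses import dataclass, field, replace

# Sketch of the immutable-context idea: steps never mutate the context they
# receive; they derive a new one and return it.
@dataclass(frozen=True)
class StepContext:
    sample: dict
    outputs: dict = field(default_factory=dict)

def score_step(ctx: StepContext) -> StepContext:
    # replace() returns a new frozen instance; the original is untouched,
    # so concurrent readers of `ctx` are safe by default
    return replace(ctx, outputs={**ctx.outputs, "score": len(ctx.sample["text"])})

ctx = StepContext(sample={"text": "hello"})
new_ctx = score_step(ctx)
```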
## Architecture at a Glance
`Pipeline` and `Branch` both satisfy `StepProtocol` through structural typing — no inheritance required. This means a `Pipeline` can be used as a step inside another pipeline, and a `Branch` slots into any step position.
| Concept | What it is | Threading | Data flow |
|---|---|---|---|
| Step | Single unit of work | Sync internally | Receives and returns StepContext |
| Pipeline | Ordered chain of steps | workers=N across samples | Passes StepContext step-to-step |
| Branch | Parallel fork/join | One thread per branch | Copies context in, merges outputs |
| Nested Pipeline | Pipeline used as a step | Inherits parent threading | Same StepContext flow |
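Structural typing is what makes the table above compose: anything with the right method shape is a step. A sketch of how such a protocol might look — the `run()` signature here is assumed for illustration, not the engine's actual one:

```python
from typing import Protocol, runtime_checkable

# Hypothetical StepProtocol: satisfied by shape, not by inheritance.
@runtime_checkable
class StepProtocol(Protocol):
    def run(self, ctx: dict) -> dict: ...

class UpperStep:
    def run(self, ctx: dict) -> dict:
        return {**ctx, "text": ctx["text"].upper()}

class MiniPipeline:
    def __init__(self, *steps):
        self.steps = steps

    def run(self, ctx: dict) -> dict:   # a pipeline is itself a step
        for step in self.steps:
            ctx = step.run(ctx)
        return ctx

inner = MiniPipeline(UpperStep())
outer = MiniPipeline(inner, UpperStep())   # nested pipeline slots in as a step
```

Neither class inherits from `StepProtocol`, yet both satisfy it — which is exactly why a pipeline can nest inside another pipeline without any special casing.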
## Async Boundary — Background Processing
One of the engine's key features is the async boundary: a way to split a pipeline into foreground (fast return) and background (fire-and-forget) stages.
Mark any step with `async_boundary = True` — the pipeline returns results immediately after the foreground steps, while everything from the boundary onward continues in background threads. Use `pipe.wait_for_background()` when you need the final results.
This is critical for pipelines where early steps produce user-facing output quickly but later steps (analysis, logging, scoring) are slow and don't need to block the caller. See Execution Model for full details.
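The mechanics can be sketched with a thread pool. This is a conceptual stand-in for the boundary behavior — the function and variable names are illustrative, not the engine's API:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical async boundary: foreground steps run inline and return
# immediately; steps after the boundary continue on a background thread.
def run_with_boundary(foreground, background, ctx, executor):
    for step in foreground:
        ctx = step(ctx)

    def run_rest(c):
        for step in background:
            c = step(c)
        return c

    future = executor.submit(run_rest, ctx)
    return ctx, future          # caller gets foreground results right away

fast = lambda c: {**c, "fast": True}
slow = lambda c: {**c, "slow": True}   # imagine this takes seconds

with ThreadPoolExecutor(max_workers=1) as ex:
    fg_ctx, fut = run_with_boundary([fast], [slow], {}, ex)
    final = fut.result()        # analogous to pipe.wait_for_background()
```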
## When to Use

### Good fit
- Ordered multi-step processing with explicit data dependencies
- Parallel fork/join patterns (multiple independent operations on the same data)
- Fire-and-forget background processing with `async_boundary`
- Any pipeline where you want construction-time contract validation
### Not designed for
- DAG scheduling with complex dependency graphs
- Distributed computing across multiple machines
- Stream processing with backpressure
- ETL pipelines requiring a data catalog
## Installation
The pipeline engine is included in the project with no extra dependencies:
```python
from pipeline import Pipeline, Branch, StepContext, MergeStrategy, PipelineHook, CancellationToken
```
## Using the Pipeline Engine with ACE

If you're building ACE pipelines, see Composing Pipelines for ACE-specific steps and patterns. All pipeline classes are also importable from `ace` directly: `from ace import Pipeline, Branch, ...`
## What's Next
- Quick Start — Build and run your first pipeline in under 30 lines
- Core Concepts — Understand Step, Context, and the contract system
- Execution Model — Three types of async, workers, and background processing
- Branching & Parallelism — Parallel fork/join with merge strategies
- Error Handling — Per-sample isolation, `SampleResult`, and error types
- Building Custom Steps — Create your own steps with dependency injection
- API Reference — Complete signatures for all public classes