Pipeline Engine
A generic, composable step runner for ordered and parallel data processing.
What is the Pipeline Engine?
The Pipeline Engine is a lightweight, domain-agnostic framework for composing processing steps into pipelines. It provides contract validation, immutable context passing, and built-in concurrency control — all in ~300 lines of pure Python with no external dependencies beyond the standard library.
Everything composes from three primitives:
Sequential — steps run one after another:
```mermaid
graph LR
A1[Step A] --> B1[Step B] --> C1[Step C]
```
Branch — fork, run in parallel, join:
```mermaid
graph LR
A2[Step A] --> B2[Step B] & C2[Step C] --> D2[Step D]
```
Nesting — a pipeline used as a step:
```mermaid
graph LR
A3[Step A] --> P3[[Inner Pipeline]] --> D3[Step D]
```
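The three primitives can be illustrated with two small combinators. This is a minimal sketch, not the engine's implementation: it assumes a context is a plain `dict` and a step is any callable from context to context, and the names `sequence` and `branch`, as well as the merge rule (later branches win on key clashes), are hypothetical. It builds the A → B & C → D graph from the Branch diagram; because each combinator returns an ordinary step, the results nest inside one another.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

# Hypothetical simplification: a context is a plain dict and a step is any
# callable that takes a context and returns a new one.
Step = Callable[[dict], dict]


def sequence(*steps: Step) -> Step:
    """Sequential: feed each step's output into the next step."""
    def run(ctx: dict) -> dict:
        for step in steps:
            ctx = step(ctx)
        return ctx
    return run


def branch(*paths: Step) -> Step:
    """Branch: run every path on the same input in its own thread, then merge."""
    def run(ctx: dict) -> dict:
        with ThreadPoolExecutor(max_workers=len(paths)) as pool:
            outputs = list(pool.map(lambda path: path(ctx), paths))
        merged = dict(ctx)
        for out in outputs:  # assumed merge rule: later paths win on key clashes
            merged.update(out)
        return merged
    return run


def step_a(ctx): return {**ctx, "text": ctx["sample"].strip()}
def step_b(ctx): return {**ctx, "upper": ctx["text"].upper()}
def step_c(ctx): return {**ctx, "length": len(ctx["text"])}
def step_d(ctx): return {**ctx, "summary": f"{ctx['upper']} ({ctx['length']})"}


# A --> B & C --> D; branch(...) returns a plain step, so it nests inside sequence(...)
pipeline = sequence(step_a, branch(step_b, step_c), step_d)
print(pipeline({"sample": "  hello  "})["summary"])  # HELLO (5)
```

The sketch is only safe to parallelize because every step returns a new dict instead of mutating its input; the engine enforces the same discipline with its frozen context.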
Steps declare which context fields they read and write. The pipeline checks the step order against these declarations when it is constructed, before any data flows, so wiring errors surface immediately rather than partway through a run.
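The construction-time check can be sketched as a single pass over the steps that tracks which fields are available so far. The `DeclaredStep` class and `validate_order` function are hypothetical, and the assumption that only `sample` exists before the first step is ours; the engine's actual validation and error messages may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DeclaredStep:
    """Hypothetical step reduced to its contract: the fields it reads and writes."""
    name: str
    requires: frozenset[str]
    provides: frozenset[str]


def validate_order(steps, initial=frozenset({"sample"})):
    """Walk the steps in order and fail fast on the first unmet requirement."""
    available = set(initial)  # assumption: only the raw sample exists up front
    for index, step in enumerate(steps):
        missing = step.requires - available
        if missing:
            raise ValueError(
                f"step {index} ({step.name}) requires {sorted(missing)}, "
                "but no earlier step provides it"
            )
        available |= step.provides
    return available


tokenize = DeclaredStep("tokenize", frozenset({"sample"}), frozenset({"tokens"}))
count = DeclaredStep("count", frozenset({"tokens"}), frozenset({"n_tokens"}))

print(sorted(validate_order([tokenize, count])))  # ['n_tokens', 'sample', 'tokens']
try:
    validate_order([count, tokenize])  # wrong order: nothing has produced "tokens" yet
except ValueError as exc:
    message = str(exc)
    print(message)
```

Because the check only reads the declarations, it runs before any sample is processed, which is what lets a mis-ordered pipeline fail at construction.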
Core Principles
- Three primitives: Sequential steps, parallel branches, and nested pipelines cover every composition pattern the engine supports
- Contracts: Steps declare `requires` and `provides` fields; the pipeline validates ordering at construction time
- Immutable context: Steps receive a frozen context and return a new one via `.replace()`, making concurrent execution safe by default
- Declared concurrency: Parallelism is configured on the step (`max_workers`, `async_boundary`), not on the pipeline
- Per-sample error isolation: One failing sample never blocks the others; every sample produces a result
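To show what a frozen context with `.replace()` looks like in plain Python, here is an illustrative `StepContext` built from a frozen dataclass and `types.MappingProxyType`, using the two fields from the architecture diagram below. It is a sketch under assumptions, not the engine's class; in particular, storing step outputs in `metadata` is a guess.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from dataclasses import replace as dataclass_replace
from types import MappingProxyType
from typing import Any, Mapping


@dataclass(frozen=True)
class StepContext:
    """Illustrative stand-in using the two fields shown in the diagram."""
    sample: str
    metadata: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Copy, then wrap in a read-only view; frozen dataclasses need
        # object.__setattr__ to adjust a field during initialization.
        object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))

    def replace(self, **changes: Any) -> "StepContext":
        """Return a new context with some fields changed; self is left untouched."""
        return dataclass_replace(self, **changes)


ctx = StepContext(sample="hello")
updated = ctx.replace(metadata={**ctx.metadata, "tokens": ["hello"]})

print("tokens" in ctx.metadata, "tokens" in updated.metadata)  # False True
try:
    ctx.sample = "changed"  # frozen dataclass rejects attribute assignment
except FrozenInstanceError:
    print("fields are read-only")
try:
    updated.metadata["tokens"] = []  # mappingproxy rejects item assignment
except TypeError:
    print("metadata is read-only")
```

`MappingProxyType` is a shallow, read-only view: the list stored under `"tokens"` can still be appended to in place, so steps should treat nested values as read-only too.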
Architecture at a Glance
```mermaid
classDiagram
    class StepProtocol {
        <<protocol>>
        +requires: set[str]
        +provides: set[str]
        +__call__(ctx: StepContext) StepContext
    }
    class Pipeline {
        +then(step) Pipeline
        +branch(*pipelines) Pipeline
        +run(samples) list~SampleResult~
        +run_async(samples) list~SampleResult~
    }
    class Branch {
        +merge: MergeStrategy
    }
    class YourStep {
        +requires: set[str]
        +provides: set[str]
        +__call__(ctx) StepContext
    }
    class StepContext {
        <<frozen dataclass>>
        +sample: str
        +metadata: MappingProxyType
        +replace(**kw) StepContext
    }
    class SampleResult {
        <<dataclass>>
        +context: StepContext
        +error: Exception?
        +ok: bool
    }
    StepProtocol <|.. Pipeline : satisfies
    StepProtocol <|.. Branch : satisfies
    StepProtocol <|.. YourStep : satisfies
    Pipeline *-- "1..*" StepProtocol : contains steps
    Branch *-- "2..*" Pipeline : contains pipelines
    StepProtocol ..> StepContext : receives & returns
    Pipeline ..> SampleResult : produces
```
Pipeline and Branch both satisfy StepProtocol through structural typing — no inheritance required. This means a Pipeline can be used as a step inside another pipeline, and a Branch slots into any step position.
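Structural typing here means `typing.Protocol`: any object with matching `requires`, `provides`, and `__call__` members counts as a step. The sketch is illustrative only; it uses a plain `dict` in place of `StepContext`, and `Greet`, `Shout`, and `MiniPipeline` are hypothetical. The way `MiniPipeline` derives its own contract (the inputs its inner steps need from outside, plus the union of what they provide) is one reasonable rule, not necessarily the engine's.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class StepProtocol(Protocol):
    """Members from the architecture diagram; a dict stands in for StepContext."""
    requires: set[str]
    provides: set[str]

    def __call__(self, ctx: dict) -> dict: ...


class Greet:  # no base class: having the right members is enough
    requires = {"name"}
    provides = {"greeting"}

    def __call__(self, ctx: dict) -> dict:
        return {**ctx, "greeting": f"Hello, {ctx['name']}"}


class Shout:
    requires = {"greeting"}
    provides = {"shout"}

    def __call__(self, ctx: dict) -> dict:
        return {**ctx, "shout": ctx["greeting"].upper() + "!"}


class MiniPipeline:
    """Toy pipeline; because it has the same shape as a step, it nests."""

    def __init__(self, *steps: StepProtocol) -> None:
        self.steps = steps
        # Derived contract: what the inner chain needs from outside, and what it adds.
        self.requires: set[str] = set()
        self.provides: set[str] = set()
        for step in steps:
            self.requires |= step.requires - self.provides
            self.provides |= step.provides

    def __call__(self, ctx: dict) -> dict:
        for step in self.steps:
            ctx = step(ctx)
        return ctx


inner = MiniPipeline(Greet(), Shout())
outer = MiniPipeline(inner)  # a pipeline used as a step in another pipeline

print(isinstance(inner, StepProtocol))  # True
print(outer({"name": "Ada"})["shout"])  # HELLO, ADA!
```

Note that `isinstance` against a runtime-checkable protocol only confirms the members exist; it does not check their types or call signatures.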
| Concept | What it is | Threading | Data flow |
|---|---|---|---|
| Step | Single unit of work | Sync internally | Receives and returns StepContext |
| Pipeline | Ordered chain of steps | `workers=N` across samples | Passes StepContext step-to-step |
| Branch | Parallel fork/join | One thread per branch | Copies context in, merges outputs |
| Nested Pipeline | Pipeline used as a step | Inherits parent threading | Same StepContext flow |
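Running a pipeline with `workers=N` across samples, together with per-sample error isolation, can be sketched as a thread pool that wraps each sample in its own `try`/`except`. `run_samples` and this `SampleResult` are hypothetical stand-ins: the fields mirror the diagram, but keeping the partially processed context on failure is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional


@dataclass
class SampleResult:
    """Hypothetical mirror of the diagram's SampleResult; context is a dict here."""
    context: dict
    error: Optional[Exception] = None

    @property
    def ok(self) -> bool:
        return self.error is None


def run_samples(samples, steps, workers=4):
    """Run all steps on each sample across a thread pool; never re-raise."""

    def run_one(sample) -> SampleResult:
        ctx = {"sample": sample}
        try:
            for step in steps:
                ctx = step(ctx)
            return SampleResult(ctx)
        except Exception as exc:  # isolate the failure to this sample only
            return SampleResult(ctx, exc)  # ctx holds the last successful output

    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(run_one, samples))  # one result per sample, input order


def parse(ctx: dict) -> dict:
    return {**ctx, "value": int(ctx["sample"])}


def invert(ctx: dict) -> dict:
    return {**ctx, "inverse": 1 / ctx["value"]}


results = run_samples(["4", "oops", "0", "2"], [parse, invert], workers=2)
print([r.ok for r in results])  # [True, False, False, True]
for r in results:
    if not r.ok:
        print(r.context["sample"], type(r.error).__name__)
```

Catching inside `run_one` rather than around `pool.map` is the design choice that matters: an exception escaping a worker would surface when its result is collected, aborting the whole batch instead of just one sample.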
Async Boundary: Background Processing
One of the engine's key features is the async boundary: a way to split a pipeline into foreground (fast return) and background (fire-and-forget) stages.
```mermaid
graph LR
S1["Step A"] --> S2["Step B"] --> AB{{"async_boundary"}} --> S3["Step C<br/><small>background</small>"] --> S4["Step D<br/><small>background</small>"]
style S1 fill:#6366f1,stroke:#4f46e5,color:#fff
style S2 fill:#6366f1,stroke:#4f46e5,color:#fff
style AB fill:#f59e0b,stroke:#d97706,color:#000
style S3 fill:#3b82f6,stroke:#2563eb,color:#fff
style S4 fill:#3b82f6,stroke:#2563eb,color:#fff
```
Mark any step with `async_boundary = True`: the pipeline returns results immediately after the foreground steps, while everything from the boundary onward continues in background threads. Call `wait_for_background()` on the pipeline when you need the final results.
This is critical for pipelines where early steps produce user-facing output quickly but later steps (analysis, logging, scoring) are slow and don't need to block the caller. See Execution Model for full details.
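A rough model of the async boundary: foreground steps run inline and their results are returned at once, while each sample's remaining steps continue on a background thread until `wait_for_background()` joins them. `BoundaryPipeline`, its two step-list arguments, one thread per sample, and `wait_for_background()` returning the final contexts are all assumptions for illustration; the engine instead marks the boundary with `async_boundary = True` on a step.

```python
import threading
import time


class BoundaryPipeline:
    """Hypothetical pipeline split into foreground and background step lists."""

    def __init__(self, foreground, background):
        self.foreground = foreground  # steps before the boundary
        self.background = background  # the marked step and everything after it
        self._threads: list[threading.Thread] = []
        self._final: list[dict] = []
        self._lock = threading.Lock()

    def run(self, samples):
        early = []
        for sample in samples:
            ctx = {"sample": sample}
            for step in self.foreground:
                ctx = step(ctx)
            early.append(ctx)
            # Hand the rest of this sample's work to a background thread.
            thread = threading.Thread(target=self._finish, args=(ctx,))
            thread.start()
            self._threads.append(thread)
        return early  # returned without waiting for background steps

    def _finish(self, ctx):
        for step in self.background:
            ctx = step(ctx)  # each step returns a new dict, so `early` is untouched
        with self._lock:
            self._final.append(ctx)

    def wait_for_background(self):
        for thread in self._threads:
            thread.join()
        self._threads.clear()
        with self._lock:
            return list(self._final)


def draft(ctx):
    return {**ctx, "reply": ctx["sample"].title()}


def score(ctx):
    time.sleep(0.1)  # stands in for slow analysis, logging, or scoring
    return {**ctx, "score": len(ctx["reply"])}


pipe = BoundaryPipeline(foreground=[draft], background=[score])
quick = pipe.run(["hi there", "bye"])
print([c["reply"] for c in quick])  # ['Hi There', 'Bye']
final = sorted(pipe.wait_for_background(), key=lambda c: c["sample"])
print([c["score"] for c in final])  # [3, 8]
```

Background threads finish in no guaranteed order, which is why the final contexts are sorted before use.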
When to Use
Good fit
- Ordered multi-step processing with explicit data dependencies
- Parallel fork/join patterns (multiple independent operations on the same data)
- Fire-and-forget background processing with `async_boundary`
- Any pipeline where you want construction-time contract validation
Not designed for
- DAG scheduling with complex dependency graphs
- Distributed computing across multiple machines
- Stream processing with backpressure
- ETL pipelines requiring a data catalog
Installation
The pipeline engine is included in the project and needs no extra dependencies.
What's Next
- Quick Start — Build and run your first pipeline in under 30 lines
- Core Concepts — Understand Step, Context, and the contract system
- Execution Model — Three types of async, workers, and background processing
- Branching & Parallelism — Parallel fork/join with merge strategies
- Error Handling — Per-sample isolation, SampleResult, and error types
- Building Custom Steps — Create your own steps with dependency injection
- API Reference — Complete signatures for all public classes