Error Handling¶
The pipeline engine guarantees that every sample produces a SampleResult — nothing is dropped silently. One failing sample never blocks others. Retry logic is the responsibility of individual steps, not the pipeline.
SampleResult¶
Every sample that enters run() produces exactly one SampleResult:
@dataclass
class SampleResult:
sample: Any # the original input
output: StepContext | None # final context (None if failed)
error: Exception | None # the exception (None if succeeded)
failed_at: str | None # step class name where error occurred
cause: Exception | None = None # inner exception for BranchError
| Field | On success | On failure |
|---|---|---|
sample |
original input | original input |
output |
final StepContext |
None |
error |
None |
the exception |
failed_at |
None |
class name of the failing step (e.g. "Tokenize") |
cause |
None |
inner exception when failed_at == "Branch" |
Background steps
For steps after an async_boundary, output and error may still be None when run() returns. Call pipe.wait_for_background() to block until all background work completes and results are finalized.
Construction-time errors¶
These are caught before any data flows — they surface immediately when you build the pipeline.
PipelineOrderError¶
Raised when a step requires a field that is produced by a later step in the pipeline:
from pipeline import Pipeline
from pipeline.errors import PipelineOrderError
class Uppercase:
requires = frozenset({"tokens"})
provides = frozenset({"upper_tokens"})
def __call__(self, ctx): ...
class Tokenize:
requires = frozenset()
provides = frozenset({"tokens"})
def __call__(self, ctx): ...
try:
Pipeline().then(Uppercase()).then(Tokenize())
except PipelineOrderError as e:
print(e)
# Uppercase requires {"tokens"} but it is provided by a later step
Tip
PipelineOrderError is always a bug — reorder your steps. Fields not produced by any step in the pipeline are treated as external inputs and do not trigger this error.
PipelineConfigError¶
Raised for invalid pipeline wiring:
Multiple async boundaries:
from pipeline.errors import PipelineConfigError
class StepA:
requires = frozenset()
provides = frozenset({"a"})
async_boundary = True
def __call__(self, ctx): ...
class StepB:
requires = frozenset({"a"})
provides = frozenset({"b"})
async_boundary = True
def __call__(self, ctx): ...
try:
Pipeline().then(StepA()).then(StepB())
except PipelineConfigError:
print("Only one async_boundary per pipeline is allowed")
Async boundary inside a Branch child:
try:
Pipeline().branch(
Pipeline().then(StepA()), # async_boundary = True inside branch
)
except PipelineConfigError:
print("async_boundary inside Branch children is not allowed")
Runtime errors¶
Foreground failures¶
When a step before the async_boundary (or in a pipeline with no boundary) raises an exception, the pipeline catches it per-sample and records it in the SampleResult:
class Boom:
requires = frozenset()
provides = frozenset()
def __call__(self, ctx):
raise RuntimeError(f"Failed on {ctx.sample!r}")
pipe = Pipeline().then(Tokenize()).then(Boom())
results = pipe.run([
StepContext(sample="good"),
StepContext(sample="also good"),
])
for r in results:
if r.error:
print(f"Sample '{r.sample}' failed at {r.failed_at}: {r.error}")
else:
print(f"Sample '{r.sample}' succeeded")
Sample 'good' failed at Boom: Failed on 'good'
Sample 'also good' failed at Boom: Failed on 'also good'
Each sample is processed independently — one failure does not prevent others from running.
Background failures¶
When a step after the async_boundary raises, the caller has already moved on. The exception is captured and attached to the SampleResult in-place:
pipe = Pipeline().then(Tokenize()).then(BrokenBackgroundStep())
results = pipe.run(samples)
# results returned immediately — background still running
pipe.wait_for_background(timeout=10.0)
# Now check for background failures
for r in results:
if r.error:
print(f"Background failure at {r.failed_at}: {r.error}")
BranchError¶
When one or more branch pipelines fail, a BranchError is raised with the full list of failures:
from pipeline.errors import BranchError
results = pipe.run(contexts)
for r in results:
if isinstance(r.error, BranchError):
print(f"{len(r.error.failures)} branch(es) failed:")
for f in r.error.failures:
print(f" {type(f).__name__}: {f}")
elif r.error:
print(f"Step failure at {r.failed_at}: {r.error}")
All branches run to completion before BranchError is raised — no branch is cancelled when another fails. The SampleResult.cause field carries the inner exception from the failing branch.
Inspecting results¶
The standard pattern after run():
results = pipe.run(contexts)
pipe.wait_for_background() # if using async_boundary
succeeded = [r for r in results if r.error is None]
failed = [r for r in results if r.error is not None]
print(f"{len(succeeded)} succeeded, {len(failed)} failed")
for r in failed:
print(f" Sample: {r.sample}")
print(f" Failed at: {r.failed_at}")
print(f" Error: {r.error}")
Background monitoring¶
wait_for_background()¶
Blocks until all background tasks complete:
# Wait indefinitely
pipe.wait_for_background()
# Wait with timeout — raises TimeoutError if not done
pipe.wait_for_background(timeout=30.0)
Completed threads are removed from the tracking list after this call.
background_stats()¶
Returns a snapshot of background task progress. Thread-safe — can be called from any thread while the pipeline is running: