Composing Custom Pipelines¶
ACE is built on a composable pipeline engine. Every runner (ACE, BrowserUse,
LangChain, ClaudeCode, TraceAnalyser) is a thin wrapper around a Pipeline
made of steps. You can compose your own pipelines by mixing and matching these
steps — or writing custom ones.
Three Levels of ACE¶
| Level | Pattern | Control |
|---|---|---|
| Zero-config | ACELiteLLM.from_model("gpt-4o-mini") |
Roles + pipeline auto-created |
| Role customisation | ACE.from_roles(agent=..., reflector=..., ...) |
Custom roles, pipeline auto-composed |
| Pipeline composition | Pipeline([AgentStep(...), ...]) |
Full control over step ordering |
This guide covers Level 3 — composing pipelines directly.
Anatomy of an ACE Pipeline¶
Every ACE pipeline is a sequence of steps, each with a requires/provides
contract that declares what context fields it reads and writes:
AgentStep ─────> EvaluateStep ─────> ReflectStep ─────> TagStep ─────> UpdateStep ─────> ApplyStep
provides: provides: provides: (metadata) provides: (mutates
agent_output trace reflection skill_manager skillbook)
_output
The pipeline validates these contracts at construction time — if a step requires a field that no earlier step provides, you'll get an error immediately.
Composing from Steps¶
All pipeline classes and ACE steps are importable from ace_next:
from ace_next import (
# Pipeline engine
Pipeline, Branch, MergeStrategy, StepProtocol, SampleResult,
# ACE context
ACEStepContext, SkillbookView,
# Roles
Agent, Reflector, SkillManager,
# Steps
AgentStep, EvaluateStep, learning_tail,
# Types
LiteLLMClient, Sample, Skillbook, SimpleEnvironment,
)
llm = LiteLLMClient(model="gpt-4o-mini")
skillbook = Skillbook()
pipe = Pipeline([
AgentStep(Agent(llm)),
EvaluateStep(SimpleEnvironment()),
*learning_tail(Reflector(llm), SkillManager(llm), skillbook),
])
Using learning_tail()¶
The learning_tail() helper returns the standard learning step sequence:
from ace_next import learning_tail, Reflector, SkillManager, Skillbook
steps = learning_tail(
Reflector(llm),
SkillManager(llm),
Skillbook(),
dedup_manager=my_dedup_manager, # optional
checkpoint_dir="/tmp/checkpoints", # optional
)
# Returns: [ReflectStep, TagStep, UpdateStep, ApplyStep,
# DeduplicateStep, CheckpointStep]
Use it when building custom integrations that provide their own execute step but want the standard learning pipeline.
Inspecting Runner Presets with build_steps()¶
Every runner has a build_steps() classmethod that returns the step list it
would use internally. This lets you inspect, modify, and recompose:
from ace_next import ACE, Pipeline, ACERunner, Skillbook
# Get the default steps
steps = ACE.build_steps(
agent=my_agent,
reflector=my_reflector,
skill_manager=my_skill_manager,
environment=my_env,
)
# Insert a custom step after EvaluateStep
steps.insert(2, MyLoggingStep())
# Build your own pipeline and runner
skillbook = Skillbook()
pipe = Pipeline(steps)
runner = ACERunner(pipeline=pipe, skillbook=skillbook)
results = runner.run(samples)
All runners support build_steps(): ACE, BrowserUse, ClaudeCode,
LangChain, and TraceAnalyser.
Writing Custom Steps¶
A step is any object satisfying StepProtocol — no base class needed:
from ace_next import ACEStepContext
class MyLoggingStep:
requires = frozenset({"agent_output"})
provides = frozenset()
def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
print(f"Agent answered: {ctx.agent_output.final_answer}")
return ctx
Key rules:
requires: frozenset of context field names this step readsprovides: frozenset of context field names this step writes__call__: receives and returnsACEStepContext(usectx.replace(...)for updates)- Steps should be stateless — no internal counters
Mixing Integrations¶
You can compose steps from different integrations into one pipeline. For example, combining a browser-use execute step with custom learning:
from ace_next import Pipeline, learning_tail, Reflector, SkillManager, Skillbook
from ace_next.integrations.browser_use import BrowserExecuteStep, BrowserToTrace
skillbook = Skillbook()
pipe = Pipeline([
BrowserExecuteStep(browser_llm),
BrowserToTrace(),
MyCustomFilterStep(), # your custom step
*learning_tail(Reflector(llm), SkillManager(llm), skillbook),
])
Integration steps live in ace_next.integrations since they have
framework-specific dependencies.
Running the Pipeline¶
With a runner¶
The simplest way to run a custom pipeline is through ACERunner:
from ace_next import ACERunner, Sample, Skillbook
runner = ACERunner(pipeline=pipe, skillbook=skillbook)
results = runner.run(
[Sample(question="What is 2+2?", ground_truth="4")],
epochs=1,
)
Directly¶
You can also run the pipeline directly by constructing contexts yourself:
from ace_next import Pipeline, ACEStepContext, SkillbookView, Sample, Skillbook
ctx = ACEStepContext(
sample=Sample(question="What is 2+2?", ground_truth="4"),
skillbook=SkillbookView(skillbook),
)
results = pipe.run([ctx])
pipe.wait_for_background() # wait for async learning steps
Branching (Parallel Steps)¶
The pipeline engine supports parallel branches for steps that can run concurrently:
from ace_next import Pipeline, Branch, MergeStrategy
pipe = Pipeline([
AgentStep(agent),
Branch(
[EvaluateStep(env_a), EvaluateStep(env_b)],
merge=MergeStrategy.LAST,
),
*learning_tail(reflector, skill_manager, skillbook),
])
See the Pipeline Engine docs for full branching and merge strategy details.
Using RRStep (Recursive Reflector)¶
RRStep satisfies both StepProtocol and ReflectorLike, so it can be used
in two ways:
As a drop-in reflector replacement¶
Pass it anywhere a Reflector is expected:
from ace_next import ACELiteLLM, LiteLLMClient
from ace_next.rr import RRStep, RRConfig
llm = LiteLLMClient(model="gpt-4o-mini")
ace = ACELiteLLM(llm, reflector=RRStep(llm, config=RRConfig(max_iterations=10)))
As a pipeline step¶
Place it directly in a pipeline (it provides reflection):
from ace_next import Pipeline, learning_tail, SkillManager, Skillbook, LiteLLMClient
from ace_next.rr import RRStep, RRConfig, RROpikStep
llm = LiteLLMClient(model="gpt-4o-mini")
skillbook = Skillbook()
rr = RRStep(llm, config=RRConfig(max_iterations=15))
pipe = Pipeline([
MyExecuteStep(),
MyToTrace(),
rr, # replaces ReflectStep — provides "reflection"
*learning_tail(None, SkillManager(llm), skillbook)[1:], # skip ReflectStep
RROpikStep(project_name="my-project"), # optional observability
])
With a separate sub-agent model¶
Route sub-agent calls to a smaller/faster model:
from ace_next.rr import RRStep, RRConfig
main_llm = LiteLLMClient(model="gpt-4o")
fast_llm = LiteLLMClient(model="gpt-4o-mini")
rr = RRStep(
main_llm,
config=RRConfig(max_iterations=20, max_llm_calls=40),
subagent_llm=fast_llm,
)
See RR_DESIGN.md for the full architecture, sandbox API, configuration reference, and trace schema.
Available Steps¶
All steps are importable from ace_next:
| Step | Purpose |
|---|---|
AgentStep |
Execute Agent role |
EvaluateStep |
Run TaskEnvironment evaluation |
ReflectStep |
Run Reflector role (async boundary) |
TagStep |
Tag skills for update |
UpdateStep |
Run SkillManager to generate updates |
ApplyStep |
Apply updates to skillbook |
DeduplicateStep |
Merge near-duplicate skills |
CheckpointStep |
Save skillbook to disk |
LoadTracesStep |
Load JSONL trace files |
ExportSkillbookMarkdownStep |
Export skillbook as markdown |
ObservabilityStep |
Generic observability hook |
PersistStep |
Persist step output |
OpikStep |
Log traces to Opik |
RRStep |
Recursive Reflector |
Integration steps (in ace_next.integrations):
| Step | Integration |
|---|---|
BrowserExecuteStep / BrowserToTrace |
browser-use |
LangChainExecuteStep / LangChainToTrace |
LangChain |
ClaudeCodeExecuteStep / ClaudeCodeToTrace |
Claude Code |
OpenClawToTraceStep |
OpenClaw |