Skip to content

ACE Architecture Design

Specification for rewriting the legacy ace/ module to use the pipeline engine.


Implementation Status

Implemented in ace_next/ (parallel to ace/ for easy rollback). The package is fully self-contained — all types are copied locally, zero imports from ace/.

Component Status Location
Core types (ACEStepContext, SkillbookView, ACESample) Done ace_next/context.py
Data types (Skill, Skillbook, UpdateBatch, outputs) Done ace_next/skill.py, skillbook.py, updates.py, outputs.py
Environments (Sample, TaskEnvironment, etc.) Done ace_next/environments.py
Protocols (AgentLike, ReflectorLike, etc.) Done ace_next/protocols/
Steps (all 10) Done ace_next/steps/
learning_tail() helper Done ace_next/steps/__init__.py
ACERunner base class Done ace_next/runners/base.py
TraceAnalyser Done ace_next/runners/trace_analyser.py
ACE runner Done ace_next/runners/ace.py
Implementations (Agent, Reflector, SkillManager) Done ace_next/implementations/
Deduplication (DeduplicationManager, SimilarityDetector) Done ace_next/deduplication/
Integration steps (BrowserExecuteStep, LangChainExecuteStep, ClaudeCodeExecuteStep) Done ace_next/integrations/
Integration runners (BrowserUse, LangChain, ClaudeCode) Done ace_next/runners/
Convenience from_model() on integration runners Done ace_next/runners/browser_use.py, langchain.py, claude_code.py
ACELiteLLM convenience wrapper Done ace_next/runners/litellm.py
LLM providers (LiteLLMClient, InstructorClient, LangChainLiteLLMClient, ClaudeCodeLLMClient) Done ace_next/providers/
Recursive Reflector Done ace_next/rr/ (SubRunner base in ace_next/core/)
MCP Server (optional) Done ace_next/integrations/mcp/

Goals

  1. Replace the monolithic ACEBase / OfflineACE / OnlineACE in adaptation.py with pipeline-based classes.
  2. Rename to match what each class actually does:
  3. TraceAnalyser: takes pre-recorded traces, outputs a skillbook. Replaces the concept of "offline" learning.
  4. ACE: the live adaptive pipeline. Replaces the concept of "online" learning. Also supports multi-epoch batch runs.
  5. Clean OOP: shared base class, composition over inheritance, pluggable steps.
  6. Unify the integration pattern — external frameworks produce raw trace objects (any type), TraceAnalyser passes them to the Reflector as-is.
  7. Maximise step granularity — each step does one thing so concerns are separated and each step is independently testable.

Naming Changes

Legacy New What it does
OfflineACE TraceAnalyser Analyse pre-recorded traces → evolve a skillbook
OnlineACE ACE Live execution → feedback → learning loop
ACEBase ACERunner Shared runner infrastructure (composition, not inheritance from Pipeline)
ACEStepResult Removed — use SampleResult from the pipeline engine Unified result type

Core Types

Sample (unchanged)

The existing Sample dataclass stays as-is. ACE uses it.

@dataclass
class Sample:
    question: str
    context: str = ""
    ground_truth: str | None = None
    metadata: dict = field(default_factory=dict)
    id: str | None = None

ACESample — protocol for step access

Steps access ctx.sample.question uniformly. A Protocol makes this duck typing explicit and type-safe:

class ACESample(Protocol):
    """Minimal interface that Sample satisfies."""

    @property
    def question(self) -> str: ...

    @property
    def context(self) -> str: ...

    @property
    def ground_truth(self) -> str | None: ...

    @property
    def metadata(self) -> dict: ...

ACEStepContext.sample is typed as ACESample. Sample satisfies it structurally. Mypy validates both sides: producers must provide the attributes, consumers can rely on them.

SkillbookView — read-only projection

The Skillbook is mutable — steps add, tag, and remove skills. Putting it directly on a frozen=True context would allow mutation through the reference (ctx.skillbook.tag_skill(...) succeeds even though ctx.skillbook = other fails). That breaks the immutability guarantee.

SkillbookView solves this. It wraps a Skillbook and exposes only read methods. Write methods don't exist on the class — calling them raises AttributeError at runtime and a type error at check time.

class SkillbookView:
    """Read-only projection of a Skillbook. Safe on a frozen context."""

    __slots__ = ("_sb",)

    def __init__(self, skillbook: Skillbook) -> None:
        self._sb = skillbook

    def as_prompt(self) -> str:
        return self._sb.as_prompt()

    def get_skill(self, skill_id: str) -> Skill | None:
        return self._sb.get_skill(skill_id)

    def skills(self, include_invalid: bool = False) -> list[Skill]:
        return self._sb.skills(include_invalid=include_invalid)

    def stats(self) -> dict[str, object]:
        return self._sb.stats()

    def __len__(self) -> int:
        return len(self._sb.skills())

    def __iter__(self):
        return iter(self._sb.skills())

    def __repr__(self) -> str:
        return f"SkillbookView({len(self)} skills)"

Enforcement: - Type checker — mypy/pyright flags ctx.skillbook.add_skill(...) because SkillbookView has no such method. - RuntimeAttributeError if someone calls a write method anyway. - Convention — the underlying _sb is underscore-prefixed. Accessing it is a deliberate violation, not an accident.

Steps that only read the skillbook (AgentStep, ReflectStep, UpdateStep, OpikStep) access ctx.skillbook — the view. Steps that write the skillbook (TagStep, ApplyStep, DeduplicateStep, CheckpointStep) receive the real Skillbook via constructor injection and use self.skillbook.

ACEStepContext

Subclass of the pipeline engine's StepContext. Carries all step-to-step data for the ACE pipeline. The pipeline engine only knows about sample and metadata; all ACE-specific fields live here.

@dataclass(frozen=True)
class ACEStepContext(StepContext):
    """Immutable context for the ACE pipeline.

    The skillbook field is a SkillbookView (read-only). Steps that need to
    write to the skillbook receive the real Skillbook via constructor injection.
    """

    sample: ACESample | None = None
    skillbook: SkillbookView | None = None
    trace: object | None = None
    agent_output: AgentOutput | None = None
    reflection: ReflectorOutput | None = None
    skill_manager_output: UpdateBatch | None = None
    epoch: int = 1
    total_epochs: int = 1
    step_index: int = 0
    total_steps: int | None = None
    global_sample_index: int = 0

The trace field holds the raw execution record from any external system — a browser-use AgentHistoryList, a LangChain result dict, a Claude Code transcript, or any arbitrary Python object. It has no enforced schema. The Reflector receives the raw trace and is responsible for making sense of it — this gives maximum flexibility for analysis without constraining trace format. Extraction helpers can be added later as an optional layer if needed.

What goes on the context vs what gets injected:

On the context Injected via constructor
Nature Step-to-step data + read-only dependencies Mutable shared state
Lifetime Per-sample (born in _build_context, dies after pipeline) Per-runner (created once, shared across samples)
Immutable? Yes — frozen fields, read-only views No — mutable by design
Examples agent_output, reflection, skillbook (view) skillbook (real), environment, dedup_manager
Validated by engine? Yes — requires/provides No — runtime error if missing

Protocols

Steps depend on protocols, not concrete classes. Each protocol defines the minimal interface a step needs. Concrete implementations satisfy them structurally — no inheritance required.

All protocols live in ace_next/protocols/ (one file per protocol, re-exported from __init__.py).

Protocol Method Used by Satisfied by
AgentLike generate(question, context, skillbook, reflection, **kwargs) → AgentOutput AgentStep Agent
ReflectorLike reflect(question, agent_output, skillbook, ground_truth, feedback, **kwargs) → ReflectorOutput ReflectStep Reflector
SkillManagerLike update_skills(reflection, skillbook, question_context, progress, **kwargs) → SkillManagerOutput UpdateStep SkillManager
DeduplicationManagerLike get_similarity_report(skillbook) → str \| None DeduplicateStep DeduplicationManager
LLMClientLike complete(prompt, **kwargs) → Any + complete_structured(prompt, response_model, **kwargs) → T Agent, Reflector, SkillManager Any LLM client with both methods

LLMClientLike

The implementations (Agent, Reflector, SkillManager) all depend on LLMClientLike — a protocol requiring two methods:

@runtime_checkable
class LLMClientLike(Protocol):
    def complete(self, prompt: str, **kwargs: Any) -> Any: ...
    def complete_structured(self, prompt: str, response_model: type[T], **kwargs: Any) -> T: ...

complete_structured returns a validated Pydantic model instance. This is the key capability — implementations call llm.complete_structured(prompt, AgentOutput) and get back a typed, validated object. Any LLM client that provides both methods satisfies the protocol: LiteLLMClient wrapped with Instructor, a custom OpenAI wrapper, or a mock for testing.

Design decision: The old ace/roles.py auto-wrapped LLM clients with Instructor if complete_structured was missing. In ace_next, this auto-wrapping is removed — callers must pass a pre-wrapped client (e.g. wrap_with_instructor(LiteLLMClient(...)) from ace_next.providers). This makes the requirement explicit.

Why protocols, not ABC

Protocols use structural typing (duck typing checked by mypy). A class satisfies a protocol if it has the right methods — no class Agent(AgentLike) inheritance needed. This means: - Users can pass any object with a matching method, not just subclasses. - Mocks satisfy protocols without ceremony. - Steps are decoupled from implementations at the type level, not just by convention.

Architecture Layers

The framework separates concerns into four layers:

Layer Location Responsibility Example
Protocols ace_next/protocols/ Interface contracts ReflectorLike.reflect()
Roles ace_next/implementations/ Business logic (LLM calls) Reflector, RRStep
Steps ace_next/steps/ Context plumbing (extract → call role → put back) ReflectStep
Runners ace_next/runners/ Orchestration (sample loop, epoch management) ACELiteLLM

Protocols define what a role must look like. Roles implement the logic. Steps adapt between the pipeline's context-based data flow and the role's parameter-based API. Runners compose steps into pipelines and iterate over inputs.

This means roles are interchangeable anywhere their protocol is expected:

# Simple single-pass reflection
ace = ACELiteLLM(llm, reflector=Reflector(llm))

# Deep recursive reflection (drop-in replacement)
ace = ACELiteLLM(llm, reflector=RRStep(llm, config=RRConfig(max_iterations=10)))

Both Reflector and RRStep satisfy ReflectorLike. The runner and pipeline don't know or care which one is in use.


Class Hierarchy

ACERunner (shared infrastructure: epoch loop, delegates to Pipeline.run())
├── TraceAnalyser       — [Reflect → Tag → Update → Apply]; input = any trace object
├── ACE                 — [Agent → Evaluate → Reflect → Tag → Update → Apply]; input = Sample + Environment
├── BrowserUse          — [BrowserExecute → BrowserToTrace → Reflect → Tag → Update → Apply]; input = task strings
├── LangChain           — [LangChainExecute → LangChainToTrace → Reflect → Tag → Update → Apply]; input = chain inputs
├── ClaudeCode          — [ClaudeCodeExecute → ClaudeCodeToTrace → Reflect → Tag → Update → Apply]; input = task strings
└── OpenClaw (script)   — [LoadTraces → OpenClawToTrace → Reflect → Tag → Update → Apply]; input = JSONL trace files

ACELiteLLM (standalone convenience wrapper — not an ACERunner subclass)
├── ask()               — direct Agent call, no pipeline
├── learn()             — delegates to lazy-init ACE runner
├── learn_from_traces() — delegates to lazy-init TraceAnalyser
└── learn_from_feedback()— runs learning_tail pipeline from last ask()

RRStep (SubRunner[ACEStepContext] — composable iterative step)
├── __call__()          — StepProtocol[ACEStepContext] entry; can be placed in any runner's pipeline
├── reflect()           — ReflectorLike entry; usable as drop-in reflector in runners
└── run_loop()          — SubRunner loop driver; inner Pipeline([LLMCall, ExtractCode, SandboxExec, CheckResult])

All runners compose a Pipeline rather than extending it. RRStep extends SubRunner[ACEStepContext] (from ace_next/core/sub_runner.py) and can be used as a step in any runner's pipeline — it is a black box that satisfies StepProtocol[ACEStepContext]. The pipeline is an implementation detail, not part of the public interface. Each subclass only overrides run() (public signature) and _build_context() (input mapping).

Integration runners (BrowserUse, LangChain, ClaudeCode) each provide two construction paths: from_roles() for pre-built role instances, and from_model() for auto-building roles from a model string. ACELiteLLM is a standalone class (not an ACERunner subclass) because it wraps two different runners and exposes a different API (ask, learn, learn_from_traces).

Each integration runner uses two steps before the learning tail: (1) an execute step that produces an integration-specific result type (e.g. BrowserResult), and (2) a ToTrace step that converts that result into the standardised trace dict the learning tail expects. This separation keeps framework-specific logic in the execute step and trace formatting in the converter — each is independently testable.


ACERunner — shared base

Encapsulates everything that TraceAnalyser, ACE, and integration runners have in common. The runner's only job is the epoch loop and Iterable validation. Per-sample iteration, error handling, background execution, and checkpoints are all delegated to Pipeline.run().

Subclasses only override run() (public signature) and _build_context() (input mapping).

class ACERunner:
    """Shared runner infrastructure for all ACE runners."""

    def __init__(
        self,
        pipeline: Pipeline,
        skillbook: Skillbook,
    ) -> None:
        self.pipeline = pipeline
        self.skillbook = skillbook

    def save(self, path: str) -> None:
        """Save the current skillbook to disk."""
        self.skillbook.save_to_file(path)

    def wait_for_background(self, timeout: float | None = None) -> None:
        """Block until all background learning tasks complete.

        Delegates to Pipeline.wait_for_background(). Call this after run(wait=False)
        before saving the skillbook or reading final results.
        """
        self.pipeline.wait_for_background(timeout)

    @property
    def learning_stats(self) -> dict:
        """Return background learning progress.

        Useful after run(wait=False) to monitor learning without blocking.
        Delegates to Pipeline.background_stats() to avoid reaching into
        pipeline internals.
        """
        return self.pipeline.background_stats()

Responsibilities

Concern Owner
Epoch loop + Iterable validation ACERunner._run()
Per-sample iteration + error isolation Pipeline.run()
Foreground/background split Pipeline.run() (via async_boundary)
Concurrent workers Pipeline.run(workers=N)
Checkpoints CheckpointStep (in the pipeline, configured at construction)
Background drain ACERunner.wait_for_background()Pipeline.wait_for_background()
Background monitoring ACERunner.learning_stats
Skillbook I/O save(path) on the runner (delegates to skillbook.save_to_file())

Each sample is independent — no state persists across samples. The skillbook is the only cross-sample coupling: read-only steps see it via ctx.skillbook (a SkillbookView), write steps mutate it via self.skillbook (the real Skillbook, injected at construction).

Eventual consistency: SkillbookView is a thin delegation wrapper, not a snapshot — it reads from the live Skillbook at call time. When background learning is active (async_boundary on ReflectStep), concurrent samples may observe partially-updated skillbook state. For example, Sample 2's ReflectStep might read the skillbook mid-mutation by Sample 1's ApplyStep. This is by design: steps see a best-effort view of the current skillbook rather than a point-in-time snapshot. The trade-off is acceptable because (1) the Reflector and SkillManager use the skillbook as LLM prompt context, where a few missing or extra skills have negligible impact on output quality, (2) serialising all skillbook reads would eliminate the concurrency benefit of max_workers > 1 on ReflectStep, and (3) write steps (TagStep, ApplyStep) already run with max_workers = 1, so writes are serialised — only reads interleave with writes. If stricter isolation is ever needed, SkillbookView can be changed to snapshot on construction (deep copy) without altering step code.

Generic run loop

Every subclass delegates to _run(). The only thing that varies per subclass is (1) the public run() signature and (2) the _build_context() method that maps input items to ACEStepContext.

def _run(
    self,
    items: Sequence | Iterable,
    *,
    epochs: int,
    wait: bool = True,
    **kwargs,
) -> list[SampleResult]:
    if epochs > 1 and not isinstance(items, Sequence):
        raise ValueError("Multi-epoch requires a Sequence, not a consumed Iterable.")

    results: list[SampleResult] = []
    n = len(items) if isinstance(items, Sequence) else None

    for epoch in range(1, epochs + 1):
        contexts = [
            self._build_context(item, epoch=epoch, total_epochs=epochs,
                                index=idx, total=n,
                                global_sample_index=(epoch - 1) * n + idx if n is not None else idx,
                                **kwargs)
            for idx, item in enumerate(items, start=1)
        ]
        epoch_results = self.pipeline.run(contexts)
        results.extend(epoch_results)

    if wait:
        self.pipeline.wait_for_background()
    return results

wait parameter: When wait=True (default), _run() blocks until all background learning completes before returning — results are fully populated. When wait=False, _run() returns immediately after the foreground steps finish. Background learning continues asynchronously. Use wait_for_background() to drain later, or learning_stats to monitor progress.

The runner builds fully-initialized ACEStepContext objects (epoch counters, pre-filled outputs for traces, etc.) and hands them to Pipeline.run(contexts). Construction IS initialization — from that point on contexts are frozen and the pipeline processes what it receives without wrapping or guessing. Pipeline.run() handles iteration, error isolation, foreground/background split, and concurrent workers. The runner only owns the epoch loop.


TraceAnalyser

Analyses pre-recorded traces without executing an agent. Runs the learning tail only. Accepts raw trace objects of any type — the raw trace is placed directly on ctx.trace and the Reflector is responsible for making sense of it.

When to use

  • You have execution logs from an external system (browser-use, LangChain, custom agent, human sessions).
  • You want to build or refine a skillbook from historical data.
  • You want to re-analyse the same data multiple times (multi-epoch) to extract deeper patterns.

Pipeline

[ReflectStep] → [TagStep] → [UpdateStep] → [ApplyStep]

No AgentStep, no EvaluateStep. The trace already contains the agent's output and the evaluation feedback.

Context building

TraceAnalyser places the raw trace directly on ctx.trace. No extraction, no conversion — the Reflector receives the trace as-is and has full freedom to analyze it however it sees fit.

def _build_context(self, raw_trace, *, epoch, total_epochs, index, total, global_sample_index) -> ACEStepContext:
    return ACEStepContext(
        skillbook=SkillbookView(self.skillbook),
        trace=raw_trace,                         # raw object, no enforced schema
        epoch=epoch,
        total_epochs=total_epochs,
        step_index=index,
        total_steps=total,
        global_sample_index=global_sample_index,
    )

The skillbook field is a SkillbookView — read-only steps access it from the context. Write steps (TagStep, ApplyStep) receive the real Skillbook via constructor injection. Each sample is independent — no state carries over from previous samples.

Interface

class TraceAnalyser(ACERunner):
    """Analyse pre-recorded traces to build a skillbook."""

    @classmethod
    def from_roles(cls, *, reflector, skill_manager, skillbook=None, **kwargs) -> "TraceAnalyser": ...

    def run(
        self,
        traces: Sequence[Any],
        epochs: int = 1,
    ) -> list[SampleResult]: ...

Note: no environment parameter, no converter. The raw trace goes straight onto the context. The Reflector is responsible for making sense of it — this gives maximum flexibility for analysis without constraining trace format. Extraction into structured fields can be added later as an optional step if needed. No checkpoint parameters — checkpoints are configured at construction time via the factory methods.

Multi-epoch semantics

Each epoch re-processes all traces with the current (evolving) skillbook. Early epochs extract obvious patterns; later epochs refine and consolidate.

Epoch 1:  trace₁ → trace₂ → ... → traceₙ   (skillbook grows)
Epoch 2:  trace₁ → trace₂ → ... → traceₙ   (skillbook refines)
Epoch 3:  trace₁ → trace₂ → ... → traceₙ   (diminishing returns)

Each sample is independent. The only thing that evolves across samples (and epochs) is the skillbook itself — visible as a read-only SkillbookView on the context, mutated by write steps via the real Skillbook (constructor-injected).

run() — delegates to _run()

def run(self, traces, epochs=1, *, wait=True):
    return self._run(traces, epochs=epochs, wait=wait)

No epoch loop, no per-sample iteration — _run() handles all of that.


ACE

The full live adaptive pipeline. An agent executes, the reflector analyses, the skill manager updates. Optionally evaluates against a TaskEnvironment for feedback-driven learning.

When to use

  • You are building a new agent from scratch.
  • You want closed-loop learning where the agent improves in real time.
  • Optionally: you have a TaskEnvironment that can evaluate outputs (provides richer feedback for the Reflector).

Pipeline

[AgentStep] → [EvaluateStep] → [ReflectStep] → [TagStep] → [UpdateStep] → [ApplyStep]

Context building

def _build_context(self, sample, *, epoch, total_epochs, index, total, global_sample_index, **_) -> ACEStepContext:
    return ACEStepContext(
        sample=sample,
        skillbook=SkillbookView(self.skillbook),
        epoch=epoch,
        total_epochs=total_epochs,
        step_index=index,
        total_steps=total,
        global_sample_index=global_sample_index,
    )

Each sample is independent. The skillbook field is a SkillbookView (read-only). Write steps receive the real Skillbook via constructor injection. The environment (if any) is injected into EvaluateStep at construction time — it does not appear on the context.

Interface

class ACE(ACERunner):
    """Live adaptive pipeline: Agent → Evaluate → Reflect → Tag → Update → Apply."""

    @classmethod
    def from_roles(cls, *, agent, reflector, skill_manager, environment=None, skillbook=None, **kwargs) -> "ACE": ...

    def run(
        self,
        samples: Sequence[Sample] | Iterable[Sample],
        epochs: int = 1,
        *,
        wait: bool = True,
    ) -> list[SampleResult]: ...

The environment is optional and provided at construction time, not at run() time. When provided, EvaluateStep uses it to generate feedback that enriches the trace. When omitted, the trace still contains the agent's output, question, context, and ground truth — the Reflector can learn from ground-truth comparison or from the agent's reasoning alone.

Single-pass vs multi-epoch

A single class handles both use cases. epochs=1 gives single-pass behaviour. epochs > 1 gives multi-epoch batch training.

# Single pass (was OnlineACE)
results = ace.run(samples, epochs=1)

# Multi-epoch batch (was OfflineACE)
results = ace.run(training_set, epochs=3)

# Fire-and-forget — agent results returned fast, learning continues in background
results = ace.run(samples, wait=False)

When samples is an Iterable (not Sequence), epochs must be 1 — you cannot replay a consumed iterable. _run() raises ValueError if epochs > 1 and samples is not a Sequence. Note: _run() materializes the full iterable into a list of contexts before passing them to Pipeline.run(). This is a deliberate simplification — see Potential Improvements.

run() — delegates to _run()

def run(self, samples, epochs=1, *, wait=True):
    return self._run(samples, epochs=epochs, wait=wait)

No instance state is modified — the runner stays reentrant.


Factory Methods

All runners provide a from_roles factory that takes pre-built role instances. Integration runners (BrowserUse, LangChain, ClaudeCode) also provide a from_model() factory that auto-builds roles from a model string (see High-Level Convenience API).

from_roles — explicit construction

# TraceAnalyser: bring your own roles
analyser = TraceAnalyser.from_roles(
    reflector=Reflector(llm, prompt_template=custom_prompt),
    skill_manager=SkillManager(llm),
    skillbook=existing_skillbook,
)

# ACE: bring your own roles
ace = ACE.from_roles(
    agent=Agent(llm),
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
    skillbook=existing_skillbook,
    dedup_manager=DeduplicationManager(DeduplicationConfig(similarity_threshold=0.85)),
)

Common parameters on from_roles

Parameter Default Description
skillbook Skillbook() Starting skillbook (empty if not provided)
dedup_manager None Appends a DeduplicateStep to the pipeline
dedup_interval 10 Deduplication frequency (samples between runs)
checkpoint_dir None Appends a CheckpointStep to the pipeline
checkpoint_interval 10 Checkpoint frequency (samples between saves)
extra_steps None Additional steps appended after the learning tail (e.g. OpikStep)

Checkpoint and deduplication are configured at construction time. The factory conditionally appends the corresponding steps to the pipeline tail. extra_steps are appended last — after dedup and checkpoint. Both classes follow the same pattern — ACE prepends its execute steps:

# TraceAnalyser — learning tail only
@classmethod
def from_roles(cls, *, reflector, skill_manager, skillbook=None,
               dedup_manager=None, dedup_interval=10,
               checkpoint_dir=None, checkpoint_interval=10,
               extra_steps=None):
    skillbook = skillbook or Skillbook()
    steps = learning_tail(
        reflector, skill_manager, skillbook,
        dedup_manager=dedup_manager, dedup_interval=dedup_interval,
        checkpoint_dir=checkpoint_dir, checkpoint_interval=checkpoint_interval,
    )
    if extra_steps:
        steps.extend(extra_steps)
    return cls(pipeline=Pipeline(steps), skillbook=skillbook)

# ACE — execute head + learning tail
@classmethod
def from_roles(cls, *, agent, reflector, skill_manager, environment=None,
               skillbook=None, dedup_manager=None, dedup_interval=10,
               checkpoint_dir=None, checkpoint_interval=10,
               extra_steps=None):
    skillbook = skillbook or Skillbook()
    steps = [
        AgentStep(agent),
        EvaluateStep(environment),
        *learning_tail(
            reflector, skill_manager, skillbook,
            dedup_manager=dedup_manager, dedup_interval=dedup_interval,
            checkpoint_dir=checkpoint_dir, checkpoint_interval=checkpoint_interval,
        ),
    ]
    if extra_steps:
        steps.extend(extra_steps)
    return cls(pipeline=Pipeline(steps), skillbook=skillbook)

Read-only steps (ReflectStep, UpdateStep) access the skillbook via ctx.skillbook (a SkillbookView). Write steps (TagStep, ApplyStep, DeduplicateStep, CheckpointStep) receive the real Skillbook via constructor.


Steps

Reusable step implementations live in ace_next/steps/. Each is a single class in a single file. All satisfy StepProtocol[ACEStepContext] from the pipeline engine. Each step does exactly one thing.

Design principle: steps are stateless. A step's __call__ is a pure function of its constructor arguments and the incoming ACEStepContext. No internal counters, no accumulated state between invocations. If a step needs run-scoped information (like a global sample index for interval logic), the runner computes it and places it on the context. This keeps steps predictable across multiple run() calls — behaviour depends only on what's in the context, not on invocation history.

Public API

All pipeline primitives, ACE steps, and context types are importable from ace_next:

# Pipeline engine
from ace_next import Pipeline, Branch, MergeStrategy, StepProtocol, SampleResult

# ACE context
from ace_next import ACEStepContext, SkillbookView

# Runner base class (for custom runners)
from ace_next import ACERunner

# Core steps
from ace_next import (
    AgentStep, EvaluateStep, ReflectStep, TagStep, UpdateStep, ApplyStep,
    DeduplicateStep, CheckpointStep, LoadTracesStep, ExportSkillbookMarkdownStep,
    ObservabilityStep, PersistStep, learning_tail,
)

Integration steps live in ace_next.integrations (they have framework-specific dependencies):

from ace_next.integrations.browser_use import BrowserExecuteStep, BrowserToTrace
from ace_next.integrations.langchain import LangChainExecuteStep, LangChainToTrace
from ace_next.integrations.claude_code import ClaudeCodeExecuteStep, ClaudeCodeToTrace
from ace_next.integrations.openclaw import OpenClawToTraceStep

Every runner also exposes a build_steps() classmethod that returns the step list it would compose internally, enabling users to inspect, modify, and recompose pipelines. See the Composing Pipelines guide.

Step Summary

Step Requires (context) Injected (constructor) Provides Side effects max_workers
AgentStep sample, skillbook agent agent_output None default (1)
EvaluateStep sample, agent_output environment (optional) trace None default (1)
ReflectStep trace, skillbook reflector reflection None (pure) 3; async_boundary = True
TagStep reflection skillbook (real) Tags skills on skillbook 1
UpdateStep reflection, skillbook skill_manager skill_manager_output None (pure) 1
ApplyStep skill_manager_output skillbook (real) Applies update batch to skillbook 1
DeduplicateStep global_sample_index manager (DeduplicationManagerLike), skillbook (real) Consolidates similar skills 1
CheckpointStep global_sample_index skillbook (real) Saves skillbook to disk 1
OpikStep skillbook project_name, tags Logs pipeline traces to Opik 1
RROpikStep reflection project_name, tags Logs RR REPL traces to Opik (hierarchical) 1
LoadTracesStep sample trace None (pure) default (1)
OpenClawToTraceStep trace trace None (pure) default (1)
PersistStep skillbook target_path Writes skillbook to CLAUDE.md or similar 1
ExportSkillbookMarkdownStep skillbook path, skillbook (real) Rewrites human-readable markdown file from skillbook 1

Requires vs Injected: Requires lists context fields read by the step — validated by the pipeline engine at construction time. The skillbook field on the context is a SkillbookView (read-only). Steps that write to the skillbook (TagStep, ApplyStep, DeduplicateStep, CheckpointStep) receive the real Skillbook via constructor injection — marked as "(real)" in the table. These injected dependencies are not tracked by requires/provides.

trace as the universal learning input: The learning tail's entry point (ReflectStep) requires only trace and skillbook from the context — subsequent steps in the tail chain off ReflectStep's output (reflection). In the standard ACE pipeline, EvaluateStep bundles the structured fields (sample, agent_output, and optionally environment feedback) into a trace dict. In TraceAnalyser, _build_context places the raw trace directly. In integrations, the execute step provides trace from its framework's native output. This means the learning tail is agnostic to trace format — ReflectStep passes ctx.trace to the Reflector, which is responsible for making sense of whatever it receives.

Steps with provides = — are pure side-effect steps (provides = frozenset()). They mutate shared state (skillbook) or write to external systems (disk, Opik) but add no new fields to the context. OpikStep is not included in learning_tail() — users append it explicitly to keep observability decoupled from core learning.

AgentStep

class AgentStep:
    requires = frozenset({"sample", "skillbook"})
    provides = frozenset({"agent_output"})

    def __init__(self, agent: AgentLike) -> None:
        self.agent = agent

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        agent_output = self.agent.generate(
            question=ctx.sample.question,
            context=ctx.sample.context,
            skillbook=ctx.skillbook,       # SkillbookView (read-only)
            sample=ctx.sample,
        )
        return ctx.replace(agent_output=agent_output)

Reads the skillbook via ctx.skillbook (a SkillbookView). No constructor injection needed — read-only access is sufficient.

EvaluateStep

class EvaluateStep:
    requires = frozenset({"sample", "agent_output"})
    provides = frozenset({"trace"})

    def __init__(self, environment: TaskEnvironment | None = None) -> None:
        self.environment = environment

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        trace = {
            "question": ctx.sample.question,
            "context": ctx.sample.context,
            "ground_truth": ctx.sample.ground_truth,
            "reasoning": ctx.agent_output.reasoning,
            "answer": ctx.agent_output.final_answer,
            "skill_ids": ctx.agent_output.skill_ids,
        }
        if self.environment:
            result = self.environment.evaluate(
                sample=ctx.sample, agent_output=ctx.agent_output,
            )
            trace["feedback"] = result.feedback
        return ctx.replace(trace=trace)

Bridges the execute head (typed ACE objects) to the learning tail (raw traces). Always bundles the structured fields into a trace dict. Optionally evaluates the agent output against a TaskEnvironment — when provided via constructor, the environment's feedback is included in the trace. When no environment is provided, the trace still contains the agent's output, question, context, and ground truth — the Reflector can learn from these directly. The TaskEnvironment is injected at construction time (not on the context) to keep the context free of per-runner dependencies. This also means different ACE instances can use different environments without changing the pipeline shape.

ReflectStep

class ReflectStep:
    requires = frozenset({"trace", "skillbook"})
    provides = frozenset({"reflection"})

    async_boundary = True
    max_workers = 3

    def __init__(self, reflector: ReflectorLike) -> None:
        self.reflector = reflector

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        trace = ctx.trace

        if isinstance(trace, dict):
            # Structured trace from EvaluateStep — extract known fields
            agent_output = AgentOutput(
                reasoning=trace.get("reasoning", ""),
                final_answer=trace.get("answer", ""),
                skill_ids=trace.get("skill_ids", []),
            )
            reflection = self.reflector.reflect(
                question=trace.get("question", ""),
                agent_output=agent_output,
                skillbook=ctx.skillbook,
                ground_truth=trace.get("ground_truth"),
                feedback=trace.get("feedback"),
            )
        else:
            # Raw trace from TraceAnalyser or integration — pass as-is
            reflection = self.reflector.reflect(
                question="",
                agent_output=AgentOutput(reasoning="", final_answer=""),
                skillbook=ctx.skillbook,
                trace=trace,
            )

        return ctx.replace(reflection=reflection)

Pure — produces a reflection object, no side effects. Handles two trace formats: (1) when ctx.trace is a dict (from EvaluateStep or a ToTrace converter), it extracts known fields and calls the Reflector's existing API with typed arguments; (2) when ctx.trace is any other object (from TraceAnalyser or integrations without a converter), it passes the raw trace via **kwargs — the Reflector accepts it for protocol compatibility but does not forward it to the LLM. Integration-specific traces should always go through a ToTrace converter step that produces the standardised dict. Declares async_boundary = True — everything from here onward runs in a background thread pool. This lets the execute head return fast while learning continues.

TagStep

class TagStep:
    requires = frozenset({"reflection"})
    provides = frozenset()

    max_workers = 1

    def __init__(self, skillbook: Skillbook) -> None:
        self.skillbook = skillbook

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        for tag in ctx.reflection.skill_tags:
            try:
                self.skillbook.tag_skill(tag.id, tag.tag)
            except ValueError:
                logger.warning("TagStep: skill_id %r not found, skipping tag %r", tag.id, tag.tag)
        return ctx

Side-effect step — tags skills on self.skillbook (the real Skillbook, injected via constructor — not the SkillbookView on the context). max_workers = 1 serialises skillbook writes. Hallucinated skill IDs from the Reflector are logged at WARNING level rather than silently swallowed — this provides a diagnostic signal without aborting the pipeline.

Separated from ReflectStep so that: - ReflectStep is a pure function (LLM call → reflection object) and can be tested without a skillbook. - TagStep can be tested with a mock reflection without an LLM.

UpdateStep

class UpdateStep:
    requires = frozenset({"reflection", "skillbook"})
    provides = frozenset({"skill_manager_output"})

    max_workers = 1

    def __init__(self, skill_manager: SkillManagerLike) -> None:
        self.skill_manager = skill_manager

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        output = self.skill_manager.update(
            reflection=ctx.reflection,     # ReflectorOutput
            skillbook=ctx.skillbook,       # SkillbookView (read-only)
        )
        return ctx.replace(skill_manager_output=output)

Pure — generates update operations from the ReflectorOutput and the current skillbook state. The Reflector has already done the heavy lifting of analysing the trace; the SkillManager turns those insights into concrete skillbook operations (ADD, UPDATE, TAG, REMOVE). Does not mutate the skillbook. max_workers = 1 because the skill manager reads the current skillbook state and concurrent calls would see stale data.

ApplyStep

class ApplyStep:
    requires = frozenset({"skill_manager_output"})
    provides = frozenset()

    max_workers = 1

    def __init__(self, skillbook: Skillbook) -> None:
        self.skillbook = skillbook

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        self.skillbook.apply_update(ctx.skill_manager_output)
        return ctx

Side-effect step — applies the update batch to self.skillbook (the real Skillbook, injected via constructor). Separated from UpdateStep so that: - UpdateStep can be tested without mutating a skillbook (check that correct operations are generated). - ApplyStep can be tested with a mock update batch (check that operations are applied correctly).

DeduplicateStep

class DeduplicateStep:
    requires = frozenset({"global_sample_index"})
    provides = frozenset()

    max_workers = 1

    def __init__(self, manager: DeduplicationManagerLike, skillbook: Skillbook, *, interval: int = 10) -> None:
        self.manager = manager
        self.skillbook = skillbook
        self.interval = interval

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        if ctx.global_sample_index % self.interval != 0:
            return ctx
        report = self.manager.get_similarity_report(self.skillbook)
        if report:
            logger.info("DeduplicateStep: similarity report at sample %d:\n%s",
                        ctx.global_sample_index, report)
        return ctx

Optional side-effect step — consolidates similar skills in self.skillbook (injected). Appended to the pipeline by factory methods when a DeduplicationManagerLike is provided. The step takes a protocol, not the concrete DeduplicationManager — this keeps ace_next decoupled from the deduplication implementation in ace/. Stateless — uses ctx.global_sample_index (computed by the runner) with a configurable interval (default 10) to skip most invocations. Deduplication involves O(n²) similarity comparisons across all skills, so running it on every sample would be expensive as the skillbook grows.

CheckpointStep

Optional tail step that periodically saves the skillbook to disk. Stateless — derives the checkpoint decision from context fields.

class CheckpointStep:
    requires = frozenset({"global_sample_index"})
    provides = frozenset()

    def __init__(self, directory: str | Path, skillbook: Skillbook, *, interval: int = 10) -> None:
        self.directory = Path(directory)
        self.skillbook = skillbook
        self.interval = interval

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        if ctx.global_sample_index % self.interval != 0:
            return ctx

        self.directory.mkdir(parents=True, exist_ok=True)
        self.skillbook.save_to_file(str(self.directory / f"checkpoint_{ctx.global_sample_index}.json"))
        self.skillbook.save_to_file(str(self.directory / "latest.json"))
        return ctx

Key points: - Stateless. Uses ctx.global_sample_index (computed by the runner) for interval logic. No internal counter, no reset needed. - provides is empty — it only writes to disk, does not modify the context. - Skillbook via constructor — saves self.skillbook, not a context field. - Placement: Appended after ApplyStep by the factory when checkpoint_dir is provided. When async_boundary is set, checkpoints happen in the background tail. - max_workers not set — inherits default of 1 from the pipeline engine, which is correct (disk writes should be serialised).

OpikStep

class OpikStep:
    requires = frozenset({"skillbook"})
    provides = frozenset()

    def __init__(
        self,
        project_name: str = "ace-framework",
        tags: list[str] | None = None,
    ) -> None: ...

Explicit, opt-in observability step — creates an Opik trace per sample with pipeline metadata, agent output, reflection insights, and skill manager operations. Does NOT register the LiteLLM callback — call register_opik_litellm_callback() separately if you also want per-LLM-call token/cost tracking. Two independent tracing modes: (1) pipeline step (this class) — client-agnostic, reads ACEStepContext fields; (2) LiteLLM callback (register_opik_litellm_callback) — LiteLLM-specific, registers OpikLogger on litellm.callbacks.

Only requires skillbook (always present). Reads other context fields (reflection, skill_manager_output, trace, agent_output) with guards — they may or may not be populated depending on pipeline shape. When used directly, gracefully degrades to a no-op when Opik is not installed or OPIK_DISABLED=true. When used via ACELiteLLM(opik=True), fails loudly — raises ImportError if the package is missing, RuntimeError if client init fails.

Passes OPIK_API_KEY, OPIK_WORKSPACE, and OPIK_URL_OVERRIDE explicitly from environment variables — does not depend on the global ~/.opik.config file.

Call flush() after the pipeline finishes to drain buffered traces before the process exits (the Opik client batches sends asynchronously).

Not wired into learning_tail(). Users append it via extra_steps on from_roles(), or manually after calling learning_tail():

from ace_next.steps import OpikStep, learning_tail

# Append to a custom pipeline
steps = [
    MyExecuteStep(agent),
    MyToTrace(),
    *learning_tail(reflector, skill_manager, skillbook),
    OpikStep(project_name="my-project"),
]

# Via from_roles() extra_steps parameter
ace = ACE.from_roles(agent=a, reflector=r, skill_manager=sm,
                     extra_steps=[OpikStep(project_name="my-project")])

# Via ACELiteLLM (explicit opt-in — enables both pipeline + LiteLLM tracing)
ace = ACELiteLLM.from_model("gpt-4o-mini", opik=True, opik_project="my-project")

# LLM-level token tracking only (no pipeline traces)
from ace_next import register_opik_litellm_callback
register_opik_litellm_callback()

RROpikStep

class RROpikStep:
    requires = frozenset({"reflection"})
    provides = frozenset()

    def __init__(
        self,
        project_name: str = "ace-rr",
        tags: list[str] | None = None,
    ) -> None: ...

Dedicated observability step for the Recursive Reflector. Reads ctx.reflection.raw["rr_trace"] — a dict populated by RRStep containing per-iteration REPL data and sub-agent call history — and creates a hierarchical Opik trace with child spans per iteration.

Follows the same patterns as OpikStep: explicit opt-in only, soft-imports opik, respects OPIK_DISABLED env var, gracefully degrades to a no-op. Located in ace_next/rr/opik.py (co-located with the RR module, not in ace_next/steps/).

Trace hierarchy:

rr_reflect (trace)
├── rr_iteration_0 (span)    ← code, stdout, stderr
├── rr_iteration_1 (span)
└── rr_iteration_2 (span)    ← FINAL called here

Sub-agent call history is attached to the parent trace metadata. Each iteration span logs the code sent to the sandbox, stdout/stderr output, and whether FINAL() was called.

Data flow: RRStep collects iteration data during run_loop() and enriches ReflectorOutput.raw["rr_trace"] after the loop completes. RROpikStep reads this data — observability is fully decoupled from business logic, matching the OpikStep philosophy.

from ace_next.rr import RRStep, RRConfig, RROpikStep

# Place RROpikStep after RRStep in your pipeline
steps = [
    ...,
    rr_step,
    RROpikStep(project_name="my-project"),
]

LoadTracesStep

class LoadTracesStep:
    requires = frozenset({"sample"})
    provides = frozenset({"trace"})

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        path = Path(ctx.sample)
        events: list[dict] = []
        for line in path.read_text().splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                continue
        return ctx.replace(trace=events)

Generic JSONL file loader — reads a file path from ctx.sample, parses each line as JSON, and populates ctx.trace with the parsed event dicts. Designed for integration runners that learn from pre-recorded traces on disk (e.g., OpenClaw session transcripts). Silently skips unparseable lines (blank lines, corrupted JSON). Returns an empty list for missing or empty files. Pure function — no constructor dependencies, no side effects.

Placement: Used as the first step in trace-file-based pipelines, before a framework-specific ToTrace converter:

steps = [
    LoadTracesStep(),
    OpenClawToTraceStep(),  # or any ToTrace converter
    *learning_tail(reflector, skill_manager, skillbook),
]

OpenClawToTraceStep

class OpenClawToTraceStep:
    requires = frozenset({"trace"})
    provides = frozenset({"trace"})

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        events = ctx.trace  # list[dict] from LoadTracesStep
        trace_dict = _events_to_trace(events)
        return ctx.replace(trace=trace_dict)

OpenClaw-specific trace converter — transforms raw JSONL events (loaded by LoadTracesStep) into the structured trace format expected by ReflectStep. Extracts user messages into question, assistant thinking/tool calls/responses into reasoning, last assistant text into answer, and a session summary into feedback. Follows the same pattern as BrowserToTrace, LangChainToTrace, and ClaudeCodeToTrace. Lives in ace_next/integrations/openclaw/to_trace.py.

Placement: Used after LoadTracesStep in OpenClaw pipelines. The examples/openclaw/kayba-ace/learn_from_traces.py script uses both steps standalone (not in a full Pipeline) and feeds results to TraceAnalyser.from_roles().

PersistStep

class PersistStep:
    requires = frozenset({"skillbook"})
    provides = frozenset()

    def __init__(self, target_path: str | Path, skillbook: Skillbook) -> None:
        self.target_path = Path(target_path)
        self.skillbook = skillbook

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        self.skillbook.save_to_file(str(self.target_path))
        return ctx

Integration-specific side-effect step — writes the current skillbook to an external file (e.g., CLAUDE.md for Claude Code). Used by ClaudeCode runner to persist learned strategies into the project's instruction file after each learning cycle. Unlike CheckpointStep (which saves the full skillbook JSON at intervals), PersistStep runs on every sample and writes in the target format expected by the integration. Receives the real Skillbook via constructor injection.

ExportSkillbookMarkdownStep

class ExportSkillbookMarkdownStep:
    requires = frozenset({"skillbook"})
    provides = frozenset()

    def __init__(self, path: str | Path, skillbook: Skillbook) -> None:
        self.path = Path(path)
        self.skillbook = skillbook

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        # Rewrites the markdown file from the current skillbook state
        ...
        return ctx

Exports the skillbook as a human-readable markdown file, grouped by section. The file is rewritten from scratch on every invocation so it always reflects the current skillbook state. Intended for use as an extra_step appended after the learning tail. Receives the real Skillbook via constructor injection.


Implementations

Concrete LLM-based implementations of the role protocols. Live in ace_next/implementations/ — fully self-contained with no imports from ace/.

Overview

Class Protocol Method Location
Agent AgentLike generate() implementations/agent.py
Reflector ReflectorLike reflect() implementations/reflector.py
RRStep ReflectorLike + StepProtocol[ACEStepContext] reflect() / __call__() rr/runner.py
SkillManager SkillManagerLike update_skills() implementations/skill_manager.py

RRStep has a different constructor pattern — see RR_DESIGN.md for the full Recursive Reflector architecture, configuration (RRConfig), sandbox API, sub-agent system, trace context, and observability (RROpikStep).

All three share the same constructor pattern:

def __init__(self, llm: LLMClientLike, prompt_template: str = DEFAULT_PROMPT, *, max_retries: int = 3) -> None:

The llm parameter must satisfy LLMClientLike — it must have both complete() and complete_structured(). No auto-wrapping with Instructor; callers pass pre-wrapped clients.

Agent

Produces answers using the current skillbook of strategies. Formats the prompt with the skillbook, reflection, question, and context, then calls llm.complete_structured(prompt, AgentOutput). After the LLM call, extracts cited skill IDs from the reasoning using extract_cited_skill_ids() (regex matching [section-00001] patterns).

agent = Agent(llm)
output = agent.generate(
    question="What is the capital of France?",
    context="Answer concisely",
    skillbook=skillbook,
)
# output.final_answer == "Paris"
# output.skill_ids == ["geography-00001"]  (extracted from reasoning)

Reflector

Analyzes agent outputs to extract lessons and improve strategies. Builds a skillbook excerpt from the agent's cited skill IDs (via make_skillbook_excerpt()), formats the prompt, and calls llm.complete_structured(prompt, ReflectorOutput).

SIMPLE mode only — single-pass reflection. For recursive multi-iteration reflection, use RRStep — it satisfies the same ReflectorLike protocol and can be used as a drop-in replacement in any runner. See RR_DESIGN.md for the full architecture.

reflector = Reflector(llm)
reflection = reflector.reflect(
    question="What is 2+2?",
    agent_output=agent_output,
    skillbook=skillbook,
    ground_truth="4",
    feedback="Correct!",
)
# reflection.key_insight, reflection.skill_tags, reflection.extracted_learnings

SkillManager

Transforms reflections into actionable skillbook updates. Serializes the ReflectorOutput into a JSON dict, formats the prompt with progress and skillbook stats, and calls llm.complete_structured(prompt, SkillManagerOutput).

No dedup integration — in ace_next, deduplication is handled by a separate DeduplicateStep in the pipeline. The SkillManager only produces SkillManagerOutput; it does not call a dedup manager itself.

sm = SkillManager(llm)
output = sm.update_skills(
    reflection=reflection_output,
    skillbook=skillbook,
    question_context="Math problem solving",
    progress="5/10 correct",
)
skillbook.apply_update(output.update)

Shared Helpers (implementations/helpers.py)

Function Purpose
extract_cited_skill_ids(text) Regex [section-00001] → deduplicated list of IDs
format_optional(value) Returns "(none)" for falsy values
make_skillbook_excerpt(skillbook, skill_ids) Builds [id] content lines for cited skills

Prompt Templates (implementations/prompts.py)

Self-contained copy of the v2.1 prompts from ace/prompts_v2_1.py. The {current_date} placeholder is filled at import time via datetime.now().strftime(...).

Constant Role
AGENT_PROMPT Agent prompt with strategic problem-solving protocol
REFLECTOR_PROMPT Reflector prompt with diagnostic analysis protocol
SKILL_MANAGER_PROMPT SkillManager prompt with atomic strategy creation
SKILLBOOK_USAGE_INSTRUCTIONS Shared text for skillbook usage guidance

Also exports wrap_skillbook_for_external_agent(skillbook) — the canonical function for injecting skillbook context into external agentic systems.


Deduplication

Skill deduplication subsystem. Lives in ace_next/deduplication/ — fully self-contained with no imports from ace/.

Overview

Class Role Location
SimilarityDetector Computes embeddings, detects similar pairs deduplication/detector.py
DeduplicationManager Coordinates detection and consolidation deduplication/manager.py
MergeOp, DeleteOp, KeepOp, UpdateOp Consolidation operation types deduplication/operations.py

SimilarityDetector

Computes embeddings and detects similar skill pairs using cosine similarity. Supports two embedding providers:

  • LiteLLM — uses litellm.embedding() for remote embedding models
  • sentence-transformers — uses a local SentenceTransformer model (lazy-loaded)

Feature detection uses inline importlib.import_module checks instead of ace.features. Cosine similarity has a numpy implementation with a pure-Python fallback.

Key methods: - ensure_embeddings(skillbook) — compute embeddings for all skills that lack one - detect_similar_pairs(skillbook, threshold) — find all pairs above the similarity threshold - Supports within_section_only mode (compare skills only within the same section) - Respects existing KEEP decisions via skillbook.has_keep_decision()

DeduplicationManager

Satisfies DeduplicationManagerLike protocol. Coordinates the full dedup workflow:

  1. get_similarity_report(skillbook) — ensures embeddings, detects similar pairs, generates a formatted report for the SkillManager prompt. Returns None if dedup is disabled or too few pairs found.
  2. parse_consolidation_operations(response_data) — parses consolidation_operations from SkillManager response JSON into typed operation objects.
  3. apply_operations(operations, skillbook) — applies consolidation operations to the skillbook.

Consolidation Operations

Four operation types, all dataclasses:

Operation Effect
MergeOp Combine skills — accumulate counters into keep_id, soft-delete others, update content
DeleteOp Soft-delete a redundant skill
KeepOp Store a SimilarityDecision so the pair is not flagged again
UpdateOp Refine a skill's content to differentiate it, clear its embedding

apply_consolidation_operations(operations, skillbook) dispatches each operation to the appropriate apply function.

Pipeline Integration

Deduplication runs as a separate DeduplicateStep in the pipeline, not inside the SkillManager role. The step is appended by factory methods when a DeduplicationManagerLike is provided:

ace = ACE.from_roles(
    agent=agent, reflector=reflector, skill_manager=skill_manager,
    dedup_manager=DeduplicationManager(DeduplicationConfig(similarity_threshold=0.85)),
    dedup_interval=10,
)
# Pipeline: Agent → Evaluate → Reflect → Tag → Update → Apply → Deduplicate

Integration Pattern

External frameworks (browser-use, LangChain, Claude Code) integrate via composable pipeline steps in ace_next/integrations/. Each integration provides three things:

  1. Result type — an integration-specific dataclass (e.g. BrowserResult, ClaudeCodeResult)
  2. Execute step — INJECT skillbook context + EXECUTE the framework, writes the result to ctx.trace
  3. ToTrace step — converts the integration-specific result into the standardised trace dict that ReflectStep expects

Runners in ace_next/runners/ compose these steps with learning_tail().

Core idea: execute → convert → learn

Each integration defines its own input/output format. A converter step acts as a compatibility layer between the integration-specific result and the learning tail's standardised trace dict.

MCP Server — a different pattern. The MCP integration does not add pipeline steps. It is a thin async layer over ACELiteLLM that exposes ACE as an MCP tool provider. MCP handlers validate requests, manage sessions, enforce safety guards, then delegate to ACELiteLLM methods (ask, learn, learn_from_feedback, save, load) via asyncio.to_thread(). See MCP Server docs for details.

Standard ACE:      [Agent → Evaluate]                          → [Reflect → Tag → Update → Apply]
                    ╰── execute (built-in) ──╯                    ╰──────── learn (shared) ──────╯
                         provides: trace (dict) ─────────────────► requires: trace

Browser-use:       [BrowserExecute] → [BrowserToTrace]         → [Reflect → Tag → Update → Apply]
                    ╰── execute ────╯   ╰── convert ──╯           ╰──────── learn (shared) ──────╯
                    provides: trace      rewrites trace             requires: trace
                    (BrowserResult)      (BrowserResult → dict)

TraceAnalyser:     [_build_context]                            → [Reflect → Tag → Update → Apply]
                    ╰── sets ctx.trace (raw object) ───────╯      ╰──────── learn (shared) ──────╯

The execute step writes an integration-specific result type to ctx.trace. The ToTrace step reads that result and rewrites ctx.trace with a standardised dict. The learning tail only ever sees the dict.

Two-step contract

Every integration provides two steps that chain together:

Step 1 — Execute step (INJECT + EXECUTE):

class SomeExecuteStep:
    requires = frozenset({"sample", "skillbook"})
    provides = frozenset({"trace"})

    def __init__(self, framework_client) -> None:
        self.framework_client = framework_client

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        task = ctx.sample                               # raw input (string, dict, etc.)
        enhanced = self._inject(task, ctx.skillbook)    # prepend skillbook context
        result = self.framework_client.run(enhanced)    # framework-specific execution
        return ctx.replace(trace=SomeResult(...))       # integration-specific result type

Step 2 — ToTrace step (convert to standardised dict):

class SomeToTrace:
    requires = frozenset({"trace"})
    provides = frozenset({"trace"})         # overwrites trace with standardised dict

    def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        r: SomeResult = ctx.trace
        trace = {
            "question": r.task,
            "reasoning": r.execution_trace,     # integration-specific formatting
            "answer": r.output,
            "skill_ids": r.cited_skill_ids,
            "feedback": f"Task {'succeeded' if r.success else 'failed'}",
            "ground_truth": None,
        }
        return ctx.replace(trace=trace)

The standardised trace dict keys match what ReflectStep expects: question, reasoning, answer, skill_ids, feedback, ground_truth.

Why two steps instead of one

Splitting execute from trace conversion gives three benefits:

  1. Independent testability — test the execute step with a mock framework without worrying about trace format; test the ToTrace step with a fixture result without running a real framework.
  2. Reusability — the execute step can be used standalone (without learning) to get framework-specific results. The ToTrace step can be swapped for a custom converter.
  3. Separation of concerns — framework interaction logic stays in the execute step; trace formatting stays in the converter. Neither knows about the other's internals.

Result types

Each integration defines its own result dataclass:

Integration Result type Key fields
Browser-use BrowserResult task, success, output, error, steps_count, duration_seconds, cited_skill_ids, chronological_steps, raw_history
Claude Code ClaudeCodeResult task, success, output, execution_trace, returncode, error
LangChain LangChainResult task, output, result_type (simple/agent/langgraph/error), success, error, intermediate_steps, messages, raw_result

Example — browser-use execute step

class BrowserExecuteStep:
    requires = frozenset({"sample", "skillbook"})
    provides = frozenset({"trace"})

    def __init__(self, browser_llm, browser=None, **agent_kwargs) -> None:
        self.browser_llm = browser_llm
        self.browser = browser
        self.agent_kwargs = agent_kwargs

    async def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
        task: str = ctx.sample      # raw task string, not a Sample object

        # INJECT — prepend skillbook context
        enhanced_task = self._inject(task, ctx.skillbook)

        # EXECUTE — run browser-use agent
        agent = Agent(task=enhanced_task, llm=self.browser_llm, **self.agent_kwargs)
        history = await agent.run()

        # Build integration-specific result
        result = BrowserResult(
            task=task, success=True, output=history.final_result(),
            steps_count=history.number_of_steps(),
            chronological_steps=..., raw_history=history,
        )
        return ctx.replace(trace=result)

Composing into a runner

Runners compose execute step + ToTrace step + learning tail:

class BrowserUse(ACERunner):
    """Browser-use agent with ACE learning pipeline."""

    @classmethod
    def from_roles(cls, *, browser_llm, reflector, skill_manager,
                   skillbook=None, **kwargs):
        skillbook = skillbook or Skillbook()
        steps = [
            BrowserExecuteStep(browser_llm),
            BrowserToTrace(),
            *learning_tail(reflector, skill_manager, skillbook, **kwargs),
        ]
        return cls(pipeline=Pipeline(steps), skillbook=skillbook)

    def run(self, tasks, epochs=1, *, wait=True):
        return self._run(tasks, epochs=epochs, wait=wait)

    def _build_context(self, task, *, epoch, total_epochs, index, total,
                       global_sample_index, **_):
        return ACEStepContext(
            sample=task,    # raw string — not wrapped in Sample
            skillbook=SkillbookView(self.skillbook),
            epoch=epoch, total_epochs=total_epochs,
            step_index=index, total_steps=total,
            global_sample_index=global_sample_index,
        )

The pattern is the same for every integration: subclass ACERunner, compose [ExecuteStep, ToTrace, *learning_tail()] in the factory, accept raw inputs (strings, dicts) in run(), and map them to ACEStepContext in _build_context(). No Sample wrapping — the raw input goes directly on ctx.sample.

learning_tail() — reusable learning steps

Every integration assembles the same [Reflect → Tag → Update → Apply] suffix with optional dedup and checkpoint steps. Rather than duplicating this wiring in every factory method, learning_tail() returns the standard step list:

# ace_next/steps/__init__.py

def learning_tail(
    reflector: ReflectorLike,
    skill_manager: SkillManagerLike,
    skillbook: Skillbook,
    *,
    dedup_manager: DeduplicationManagerLike | None = None,
    dedup_interval: int = 10,
    checkpoint_dir: str | Path | None = None,
    checkpoint_interval: int = 10,
) -> list[StepProtocol[ACEStepContext]]:
    """Return the standard ACE learning steps.

    Use this when building custom integrations that provide their own
    execute step(s) but want the standard learning pipeline.
    """
    steps: list[StepProtocol[ACEStepContext]] = [
        ReflectStep(reflector),
        TagStep(skillbook),
        UpdateStep(skill_manager),
        ApplyStep(skillbook),
    ]
    if dedup_manager:
        steps.append(DeduplicateStep(dedup_manager, skillbook, interval=dedup_interval))
    if checkpoint_dir:
        steps.append(CheckpointStep(checkpoint_dir, skillbook, interval=checkpoint_interval))
    return steps

Integration factories become shorter and less error-prone:

class BrowserUse(ACERunner):
    @classmethod
    def from_roles(cls, *, browser_llm, reflector, skill_manager, skillbook=None, **kwargs):
        skillbook = skillbook or Skillbook()
        steps = [
            BrowserExecuteStep(browser_llm),
            BrowserToTrace(),
            *learning_tail(reflector, skill_manager, skillbook, **kwargs),
        ]
        return cls(pipeline=Pipeline(steps), skillbook=skillbook)

Power users building fully custom pipelines can also use it:

from ace_next.steps import learning_tail

skillbook = Skillbook.load_from_file("expert.json")
steps = [
    MyCustomExecuteStep(my_agent),
    MyValidationStep(),  # custom step before learning
    *learning_tail(reflector, skill_manager, skillbook, dedup_manager=dedup),
]
runner = ACERunner(Pipeline(steps), skillbook)

TraceAnalyser — batch learning from recorded executions

Integrations also support offline learning. When an integration records execution history (browser-use AgentHistory, LangChain intermediate_steps, Claude Code transcripts), it feeds the raw objects directly to TraceAnalyser:

# Record browser executions
histories = [await agent.run(task) for task in tasks]

# Feed raw histories directly — Reflector analyses them as-is
analyser = TraceAnalyser.from_roles(
    reflector=Reflector(llm_client),
    skill_manager=SkillManager(llm_client),
)
analyser.run(histories, epochs=2)
analyser.save("browser_expert.json")

Live vs offline

Integration Runner TraceAnalyser
When Live execution Post-hoc analysis
Agent Framework runs it Already ran
Feedback Generated live Baked into trace
Use case Production deployment Historical batch learning, debugging

Both update the same skillbook. A common workflow: TraceAnalyser builds an initial skillbook from historical data, then an integration runner refines it during live deployment.


High-Level Convenience API

Integration runners provide two construction paths directly on the class — no separate wrapper classes needed:

  1. from_roles() — accepts pre-built role instances (Reflector, SkillManager, etc.)
  2. from_model() — accepts a model string and auto-builds roles internally

This keeps the API surface minimal: one class per integration, two ways to construct it.

# Explicit construction — bring your own roles
runner = BrowserUse.from_roles(
    browser_llm=browser_llm,
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
)

# Convenience construction — just specify the model
runner = BrowserUse.from_model(browser_llm, ace_model="gpt-4o-mini")

# Both return the same BrowserUse instance with the same API
results = runner.run(["Find top HN post", "Check weather in NYC"])
runner.save("browser_expert.json")

from_model() on integration runners

Each integration runner's from_model() builds a LiteLLMClient, wraps it in Reflector and SkillManager, and delegates to from_roles():

class BrowserUse(ACERunner):
    @classmethod
    def from_model(cls, browser_llm, *, ace_model="gpt-4o-mini",
                   ace_max_tokens=2048, ace_llm=None, **kwargs) -> BrowserUse:
        if ace_llm is None:
            from ..providers import LiteLLMClient
            ace_llm = LiteLLMClient(model=ace_model, max_tokens=ace_max_tokens)
        return cls.from_roles(
            browser_llm=browser_llm,
            reflector=Reflector(ace_llm),
            skill_manager=SkillManager(ace_llm),
            **kwargs,
        )

The same pattern applies to LangChain.from_model(runnable, ...) and ClaudeCode.from_model(working_dir=..., ...). Providers are imported lazily inside from_model() to avoid hard dependencies on litellm at import time.

Additional convenience on from_roles()

Integration runners also accept skillbook_path and dedup_config on from_roles() for common resolution patterns:

runner = BrowserUse.from_roles(
    browser_llm=browser_llm,
    reflector=reflector,
    skill_manager=skill_manager,
    skillbook_path="browser_expert.json",   # loads skillbook from file
    dedup_config=DeduplicationConfig(similarity_threshold=0.85),  # builds DeduplicationManager
)

Convenience lifecycle methods on runners

All integration runners provide:

  • get_strategies() -> str — formatted skillbook strategies for display
  • Backward-compat aliases: save_skillbook, load_skillbook, wait_for_learning

These are defined directly on the runner class, not on a separate wrapper.

ACELiteLLM — standalone convenience wrapper

ACELiteLLM is the only standalone wrapper class (not an ACERunner subclass). It exists because it wraps two different runners (ACE and TraceAnalyser) and exposes a fundamentally different API (ask, learn, learn_from_traces, learn_from_feedback).

class ACELiteLLM:
    def __init__(self, llm, *, skillbook=None, environment=None,
                 opik=False, opik_project="ace-framework", opik_tags=None, ...):
        self.agent = Agent(llm)
        self.reflector = Reflector(llm)
        self.skill_manager = SkillManager(llm)
        self._skillbook = skillbook or Skillbook()
        self.environment = environment
        self._ace: ACE | None = None          # lazy-init
        self._analyser: TraceAnalyser | None = None  # lazy-init

        # Opik observability (explicit opt-in)
        self._opik_step = None
        if opik:
            self._opik_step = OpikStep(project_name=opik_project, tags=opik_tags)
            register_opik_litellm_callback(project_name=opik_project)

    @classmethod
    def from_model(cls, model="gpt-4o-mini", *, max_tokens=2048,
                   temperature=0.0, opik=False, opik_project="ace-framework",
                   opik_tags=None, **kwargs) -> ACELiteLLM:
        """Build from a model string."""
        llm = LiteLLMClient(model=model, max_tokens=max_tokens, temperature=temperature)
        return cls(llm, opik=opik, opik_project=opik_project, opik_tags=opik_tags, **kwargs)

    def _get_extra_steps(self):
        """Return extra pipeline steps (e.g. OpikStep) or None."""
        if self._opik_step is not None:
            return [self._opik_step]
        return None

    def _get_ace(self, environment=None):
        """Return (or build) cached ACE runner. Passes extra_steps."""
        ...
        self._ace = ACE.from_roles(..., extra_steps=self._get_extra_steps())
        ...

    def _get_analyser(self):
        """Return (or build) cached TraceAnalyser. Passes extra_steps."""
        ...
        self._analyser = TraceAnalyser.from_roles(..., extra_steps=self._get_extra_steps())
        ...

    def ask(self, question, context="") -> str:
        """Direct Agent call — no pipeline. Stores interaction for learn_from_feedback()."""
        ...

    def learn(self, samples, environment=None, epochs=1, *, wait=True):
        """Delegate to lazy-init ACE runner."""
        return self._get_ace(environment).run(samples, epochs=epochs, wait=wait)

    def learn_from_traces(self, traces, epochs=1, *, wait=True):
        """Delegate to lazy-init TraceAnalyser."""
        return self._get_analyser().run(traces, epochs=epochs, wait=wait)

    def learn_from_feedback(self, feedback, ground_truth=None) -> bool:
        """Manual single-shot learning from last ask() call."""
        ...

    def load(self, path):
        """Load skillbook — invalidates cached runners (stale refs)."""
        self._skillbook = Skillbook.load_from_file(path)
        self._ace = None
        self._analyser = None

When opik=True, ACELiteLLM creates an OpikStep (pipeline-level per-sample tracing) and calls register_opik_litellm_callback() (LiteLLM per-call token/cost tracking). Both tracing modes are activated together because the runner knows it's LiteLLM-backed. The OpikStep is passed to the runners via extra_steps on from_roles().

Runners are cached and invalidated on load() (new skillbook object means stale references). Since runners are reentrant (no per-call instance state), caching is safe.

Why not separate wrapper classes

The original design had separate wrapper classes (ACEAgent, ACELangChain, ACEClaudeCode) that delegated to the corresponding runners. This was rejected because:

  • Two classes for one concept — users must understand both BrowserUse (the runner) and ACEAgent (the wrapper), and choose which to use.
  • Thin delegation — the wrappers only added from_model() and a few lifecycle helpers, all of which fit naturally on the runner itself.
  • No added value — the runner already manages the pipeline, skillbook, and epoch loop. Adding a wrapper just adds indirection.

The exception is ACELiteLLM, which is genuinely different: it wraps two runners, has ask() (direct Agent call, no pipeline), and has learn_from_feedback() (manual single-shot learning). These don't map to any single runner's API.


Directory Structure

ace_next/
  __init__.py               ← Public API re-exports
  context.py                ← ACEStepContext, SkillbookView, ACESample
  skill.py                  ← Skill, SimilarityDecision
  skillbook.py              ← Skillbook
  updates.py                ← UpdateOperation, UpdateBatch
  outputs.py                ← AgentOutput, ReflectorOutput, SkillManagerOutput, etc.
  environments.py           ← Sample, TaskEnvironment, SimpleEnvironment, EnvironmentResult
  protocols/                ← Role protocols (one file per protocol)
    __init__.py
    agent.py                ← AgentLike
    reflector.py            ← ReflectorLike
    skill_manager.py        ← SkillManagerLike
    deduplication.py        ← DeduplicationConfig, DeduplicationManagerLike
    llm.py                  ← LLMClientLike
  implementations/          ← Concrete LLM-based role implementations
    __init__.py             ← Exports Agent, Reflector, SkillManager
    agent.py                ← Agent (implements AgentLike)
    reflector.py            ← Reflector (implements ReflectorLike)
    skill_manager.py        ← SkillManager (implements SkillManagerLike)
    helpers.py              ← Shared utilities (extract_cited_skill_ids, etc.)
    prompts.py              ← Default v2.1 prompt templates
  deduplication/            ← Skill deduplication subsystem
    __init__.py             ← Exports DeduplicationManager, SimilarityDetector, etc.
    detector.py             ← SimilarityDetector (embeddings + cosine similarity)
    manager.py              ← DeduplicationManager (implements DeduplicationManagerLike)
    operations.py           ← ConsolidationOperation types + apply logic
    prompts.py              ← Similarity report generation
  steps/                    ← Pipeline steps (one file per class)
    __init__.py             ← learning_tail() helper
    agent.py                ← AgentStep
    evaluate.py             ← EvaluateStep
    reflect.py              ← ReflectStep
    tag.py                  ← TagStep
    update.py               ← UpdateStep
    apply.py                ← ApplyStep
    deduplicate.py          ← DeduplicateStep
    checkpoint.py           ← CheckpointStep
    observability.py        ← ObservabilityStep (logger.info)
    opik.py                 ← OpikStep
    load_traces.py          ← LoadTracesStep (generic JSONL loader)
    export_markdown.py      ← ExportSkillbookMarkdownStep
    persist.py              ← PersistStep
  runners/                    ← Runner classes (compose Pipeline, manage epoch loop)
    __init__.py               ← Re-exports ACERunner, TraceAnalyser, ACE, BrowserUse, LangChain, ClaudeCode, ACELiteLLM
    base.py                   ← ACERunner base class
    trace_analyser.py         ← TraceAnalyser (learning tail only)
    ace.py                    ← ACE (full adaptive pipeline)
    browser_use.py            ← BrowserUse runner (from_roles + from_model)
    langchain.py              ← LangChain runner (from_roles + from_model)
    claude_code.py            ← ClaudeCode runner (from_roles + from_model)
    litellm.py                ← ACELiteLLM convenience wrapper (ask + learn + learn_from_traces)
  integrations/               ← Integration steps (execute + result type + trace converter)
    __init__.py               ← Exports steps, result types, ToTrace converters, wrap_skillbook_context
    browser_use.py            ← BrowserExecuteStep, BrowserResult, BrowserToTrace
    langchain.py              ← LangChainExecuteStep, LangChainResult, LangChainToTrace
    claude_code.py            ← ClaudeCodeExecuteStep, ClaudeCodeResult, ClaudeCodeToTrace
    openclaw/
      __init__.py             ← Exports OpenClawToTraceStep
      to_trace.py             ← OpenClawToTraceStep (JSONL events → structured trace dict)
    mcp/                        ← Optional MCP server (install with ace-framework[mcp])
      __init__.py               ← Package marker
      server.py                 ← Server creation and CLI entrypoint (ace-mcp)
      config.py                 ← MCPServerConfig (pydantic-settings, env vars)
      registry.py               ← SessionRegistry (session lifecycle, TTL sweep)
      handlers.py               ← MCPHandlers (validation, safety, delegation to ACELiteLLM)
      adapters.py               ← MCP SDK glue (tool registration, schema inlining, error mapping)
      models.py                 ← Pydantic request/response models for all six tools
      errors.py                 ← ACEMCPError hierarchy and MCP error mapping
  providers/                  ← LLM client wrappers (not pipeline steps)
    __init__.py               ← Exports LiteLLMClient, InstructorClient, etc.
    litellm.py                ← LiteLLMClient, LiteLLMConfig, LLMResponse
    instructor.py             ← InstructorClient, wrap_with_instructor
    langchain.py              ← LangChainLiteLLMClient (optional: langchain-litellm)
    claude_code.py            ← ClaudeCodeLLMClient, ClaudeCodeLLMConfig (optional: claude CLI)

Each integration provides: (1) an execute step, (2) a result type, and (3) a ToTrace converter step. Runners in ace_next/runners/ compose these with learning_tail(). For offline analysis, raw trace objects are passed directly to TraceAnalyser.

What moves where

Old location New location Notes
ace/adaptation.py Deleted Replaced by ace_next/runners/
ace/async_learning.py Deleted Replaced by pipeline engine async_boundary
ace/environments.py ace_next/environments.py Sample, EnvironmentResult, TaskEnvironment, SimpleEnvironment (copied)
ace/roles.py (protocols) ace_next/protocols/ Protocols extracted from role classes
ace/roles.py (implementations) ace_next/implementations/ Concrete Agent, Reflector, SkillManager classes
ace/llm.py (interface) ace_next/protocols/llm.py LLMClientLike protocol
ace/prompts_v2_1.py ace_next/implementations/prompts.py v2.1 prompt templates (self-contained copy)
ace/deduplication/ ace_next/deduplication/ Full dedup subsystem (detector, manager, operations, prompts)
ace2/ Deleted Superseded by this design
New ace_next/steps/tag.py TagStep (split from ReflectStep)
New ace_next/steps/apply.py ApplyStep (split from UpdateStep)
New ace_next/steps/deduplicate.py DeduplicateStep (extracted from SkillManager)
New ace_next/steps/checkpoint.py CheckpointStep
New ace_next/steps/observability.py ObservabilityStep (logger.info)
New ace_next/steps/opik.py OpikStep (Opik trace logging)
New ace_next/rr/opik.py RROpikStep (hierarchical RR trace logging to Opik)
New ace_next/steps/persist.py PersistStep
New ace_next/runners/ ACERunner, TraceAnalyser, ACE, BrowserUse, LangChain, ClaudeCode
ace/integrations/browser_use.py ace_next/integrations/browser_use.py + ace_next/runners/browser_use.py Split into execute step + result type + ToTrace converter + runner
ace/integrations/langchain.py ace_next/integrations/langchain.py + ace_next/runners/langchain.py Split into execute step + result type + ToTrace converter + runner
ace/integrations/claude_code.py ace_next/integrations/claude_code.py + ace_next/runners/claude_code.py Split into execute step + result type + ToTrace converter + runner
ace/llm_providers/litellm_client.py ace_next/providers/litellm.py Self-contained: LiteLLMClient, LiteLLMConfig, LLMResponse (no ABC)
ace/llm_providers/instructor_client.py ace_next/providers/instructor.py Self-contained: InstructorClient, wrap_with_instructor
ace/llm_providers/langchain_client.py ace_next/providers/langchain.py Self-contained: LangChainLiteLLMClient (no ABC)
ace/llm_providers/claude_code_client.py ace_next/providers/claude_code.py Self-contained: ClaudeCodeLLMClient, ClaudeCodeLLMConfig (no ABC)

Async Behaviour

Both TraceAnalyser and ACE inherit async capabilities from the pipeline engine. No custom async machinery is needed.

ReflectStep as async boundary

ReflectStep.async_boundary = True means: when the pipeline processes a sample, everything before ReflectStep (Agent, Evaluate) runs in the foreground, and everything from ReflectStep onwards (Tag, Update, Apply, Deduplicate, Checkpoint) runs in a background thread pool.

sample 1:  [AgentStep] [EvaluateStep] ──fire──► [ReflectStep] [TagStep] [UpdateStep] [ApplyStep]  (background)
sample 2:  [AgentStep] [EvaluateStep] ──fire──► [ReflectStep] [TagStep] [UpdateStep] [ApplyStep]  (background)
                                 async_boundary

For TraceAnalyser, there is no AgentStep or EvaluateStep in the foreground. The boundary still applies — context building is foreground, the learning tail is background:

trace 1:  [build_context] ──fire──► [ReflectStep] [TagStep] [UpdateStep] [ApplyStep]  (background)
trace 2:  [build_context] ──fire──► [ReflectStep] [TagStep] [UpdateStep] [ApplyStep]  (background)

Controlling concurrency

Knob Where Effect
ReflectStep.max_workers = 3 Step class attribute Up to 3 reflections run in parallel
TagStep.max_workers = 1 Step class attribute Serialises skill tagging
UpdateStep.max_workers = 1 Step class attribute Serialises skill manager LLM calls
ApplyStep.max_workers = 1 Step class attribute Serialises skillbook writes
wait_for_background(timeout) Runner method Blocks until background threads drain

No custom AsyncLearningPipeline class, no manual thread management, no asyncio.create_task for background learning. The pipeline engine handles all of it.


Error Handling

Follows the pipeline engine's error model without additions.

Per-sample isolation: A failing sample does not abort the run. The pipeline catches the exception, records it in SampleResult.error and SampleResult.failed_at, and continues to the next sample.

Background failures: Captured and attached to SampleResult by the pipeline engine. The runner calls wait_for_background() at the end to ensure all results are complete.

No retry logic in the runner. Retries are the responsibility of individual steps (e.g., LLM call retries via tenacity in the role classes).


Usage Examples

TraceAnalyser — learn from browser-use history

from ace_next import TraceAnalyser, Reflector, SkillManager, LiteLLMClient, wrap_with_instructor

llm = wrap_with_instructor(LiteLLMClient(model="gpt-4o-mini"))

# Raw traces — plain dicts, no enforced schema
traces = [
    {
        "task": "Find the cheapest flight to Tokyo",
        "output": "$450 on ANA, departing March 15",
        "feedback": "Correct price found in 8 steps",
        "reasoning": "Step 1: Navigate to Google Flights...",
    },
    {
        "task": "Book a hotel in Shibuya",
        "output": "Failed: could not find checkout button",
        "feedback": "Task failed after 15 steps — checkout button was behind a cookie modal",
        "reasoning": "Step 1: Navigate to Booking.com...",
    },
]

# Analyse — raw traces go directly to the Reflector via ctx.trace
analyser = TraceAnalyser.from_roles(reflector=Reflector(llm), skill_manager=SkillManager(llm))
results = analyser.run(traces, epochs=2)
analyser.save("travel_agent.json")

ACE — live Q&A training

from ace_next import ACE, Sample, SimpleEnvironment, Agent, Reflector, SkillManager
from ace_next import LiteLLMClient, wrap_with_instructor

llm = wrap_with_instructor(LiteLLMClient(model="gpt-4o-mini"))

samples = [
    Sample(question="Capital of France?", ground_truth="Paris"),
    Sample(question="Largest ocean?", ground_truth="Pacific"),
]

# Environment provided at construction — EvaluateStep uses it to generate feedback
ace = ACE.from_roles(
    agent=Agent(llm),
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
    environment=SimpleEnvironment(),
)
results = ace.run(samples, epochs=3)
ace.save("geography.json")

ACE — without environment

# No environment — trace still contains agent output + ground truth
# The Reflector learns from ground-truth comparison directly
ace = ACE.from_roles(
    agent=Agent(llm),
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
)
results = ace.run(samples, epochs=3)

ACE — single-pass with iterable

# Any Iterable works with epochs=1 (consumed once, not replayed)
samples = load_samples_from_csv("eval_set.csv")  # returns a list or generator

ace = ACE.from_roles(agent=Agent(llm), reflector=Reflector(llm), skill_manager=SkillManager(llm))
results = ace.run(samples, epochs=1)

ACE — with checkpoints and deduplication

from ace_next import ACE, Agent, Reflector, SkillManager, SimpleEnvironment
from ace_next.deduplication import DeduplicationManager
from ace_next.protocols.deduplication import DeduplicationConfig

ace = ACE.from_roles(
    agent=Agent(llm),
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
    environment=SimpleEnvironment(),
    dedup_manager=DeduplicationManager(DeduplicationConfig(similarity_threshold=0.85)),
    checkpoint_dir="./checkpoints",
    checkpoint_interval=10,
)
# Pipeline: Agent → Evaluate → Reflect → Tag → Update → Apply → Deduplicate → Checkpoint
results = ace.run(samples, epochs=3)

Integration — browser-use runner

from ace_next import BrowserUse, Reflector, SkillManager, LiteLLMClient
from langchain_openai import ChatOpenAI

llm = LiteLLMClient(model="gpt-4o-mini")
browser_llm = ChatOpenAI(model="gpt-4o")

# Explicit construction — bring your own roles
runner = BrowserUse.from_roles(
    browser_llm=browser_llm,
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
)

# Or convenience construction — just specify the model
runner = BrowserUse.from_model(browser_llm, ace_model="gpt-4o-mini")

# Live execution + learning
results = runner.run(["Find top HN post", "Check weather in Tokyo"])
runner.save("browser_expert.json")

Integration — LangChain runner (from_model)

from ace_next import LangChain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

chain = ChatPromptTemplate.from_template("Answer: {input}") | ChatOpenAI(model="gpt-4o")

# One-liner construction
runner = LangChain.from_model(chain, ace_model="gpt-4o-mini")
results = runner.run([{"input": "What is ACE?"}, {"input": "Explain skillbooks"}])
runner.save("chain_expert.json")

Integration — Claude Code runner (from_model)

from ace_next import ClaudeCode

runner = ClaudeCode.from_model(working_dir="./my_project", ace_model="gpt-4o-mini")
results = runner.run(["Add unit tests for utils.py", "Refactor the auth module"])
runner.save("code_expert.json")

ACELiteLLM — conversational agent with learning

from ace_next import ACELiteLLM, SimpleEnvironment, Sample

ace = ACELiteLLM.from_model("gpt-4o-mini")

# Direct Q&A (no pipeline)
answer = ace.ask("What is the capital of France?")

# Batch learning (delegates to ACE runner)
samples = [
    Sample(question="Capital of France?", ground_truth="Paris"),
    Sample(question="Largest ocean?", ground_truth="Pacific"),
]
ace.learn(samples, environment=SimpleEnvironment(), epochs=3)

# Manual feedback learning from last ask()
ace.ask("What is 2+2?")
ace.learn_from_feedback("The answer should be 4", ground_truth="4")

ace.save("learned.json")

# With Opik observability (explicit opt-in)
ace = ACELiteLLM.from_model("gpt-4o-mini", opik=True, opik_project="my-project")

# With Recursive Reflector + Opik
from ace_next import RRStep, RRConfig, LiteLLMClient
llm = LiteLLMClient(model="gpt-4o-mini")
rr = RRStep(llm, config=RRConfig(max_iterations=10))
ace = ACELiteLLM(llm, reflector=rr, opik=True)

Fire-and-forget — get results while learning continues

ace = ACE.from_roles(
    agent=Agent(llm),
    reflector=Reflector(llm),
    skill_manager=SkillManager(llm),
)

# wait=False: returns after foreground steps (Agent + Evaluate)
# Background learning (Reflect → Tag → Update → Apply) continues
results = ace.run(samples, epochs=1, wait=False)

# Use agent outputs immediately
for r in results:
    print(r.output.agent_output.final_answer)

# Check learning progress
print(ace.learning_stats)
# {"active": 3, "completed": 12}

# Block when you need the skillbook finalised
ace.wait_for_background(timeout=60.0)
ace.save("learned.json")

Mixed workflow — batch then live

from ace_next import TraceAnalyser, ACE, Skillbook
from ace_next.implementations import Agent, Reflector, SkillManager

reflector = Reflector(llm)
skill_manager = SkillManager(llm)

# Phase 1: build skillbook from historical traces
skillbook = Skillbook()
analyser = TraceAnalyser.from_roles(
    reflector=reflector,
    skill_manager=skill_manager,
    skillbook=skillbook,
)
analyser.run(historical_traces, epochs=3)

# Phase 2: deploy with live learning (reuse the evolved skillbook)
ace = ACE.from_roles(
    agent=Agent(llm),
    reflector=reflector,
    skill_manager=skill_manager,
    skillbook=skillbook,
)
ace.run(live_samples, epochs=1)
ace.save("production.json")

Potential Improvements

Issues acknowledged but deferred from this version of the spec.

Streaming / lazy iteration: _run() eagerly materializes the full iterable into a list of ACEStepContext objects before passing them to Pipeline.run(). For a large generator with epochs=1, the entire input gets buffered into memory. True streaming would require the pipeline to accept an iterator and process items one-at-a-time (e.g., for ctx in contexts: pipeline.run_one(ctx)), or an async iterator pattern with asyncio.as_completed. This is a deliberate simplification — batch materialization keeps the epoch loop and error handling straightforward. Revisit if memory pressure from large single-pass runs becomes a real problem.

Builder API for custom pipelines (speculative): The current API offers two extremes: factory methods (from_roles) that hide the pipeline entirely, and manual Pipeline([...]) construction that requires understanding ACEStepContext, SkillbookView, step contracts, and ACERunner subclassing. Users who want to insert a custom step between Reflect and Update, or swap the execute head while keeping the learning tail, fall into a gap where neither approach serves them well.

One possible direction is a builder API that would bridge this gap:

from ace import ACEBuilder

# Start from a preset, customise from there
ace = (
    ACEBuilder(model="gpt-4o-mini")
    .execute(MyCustomExecuteStep(my_agent))     # replace the execute head
    .add_step(MyValidationStep(), after="reflect")  # insert custom step
    .deduplicate(similarity_threshold=0.85)
    .checkpoint("./checkpoints", interval=10)
    .build()
)
results = ace.run(samples, environment)

# Or build from the standard ACE preset and tweak
ace = (
    ACEBuilder.from_preset("ace", model="gpt-4o-mini")
    .add_step(MyLoggingStep(), after="apply")
    .build()
)

The builder would handle SkillbookView wiring, step ordering validation, and ACERunner construction internally. Users compose by name ("reflect", "apply") rather than by importing step classes.

This would only be worth pursuing when there is evidence of users building custom pipelines with learning_tail() and hitting friction with the manual wiring. The learning_tail() helper (see Integration Pattern section) covers the most common customisation — custom execute step + standard learning — without a builder. A builder adds value when users need fine-grained insertion points (between existing steps) or want to compose from presets without understanding the step internals. The main risk is that a builder mirrors the step list, adding a second construction path to document, test, and keep in sync. It can also hide the requires/provides contracts — when a validation step is inserted at the wrong position, the error comes from the pipeline engine (field missing) rather than the builder (wrong position name), making debugging indirect. Mitigate by having the builder validate the final step chain at build() time and surfacing clear errors.

LiteLLM proxy base URL support (LITELLM_API_BASE): The learn_from_traces.py script (and by extension ace-learn) does not pass api_base to LiteLLMClient, so users running a LiteLLM proxy server behind a custom URL cannot use it with the OpenClaw integration. LiteLLMClient already supports api_base as a constructor parameter — the script just needs to read os.getenv("LITELLM_API_BASE") and pass it through. Deferred because all current users connect directly to providers (Anthropic, Bedrock, OpenRouter) where only a model string and API key are needed. Revisit when a user needs proxy support.

Skillbook rollback and versioning: Currently the skillbook is mutated in place with no way to undo a bad update. If the LLM hallucinates a harmful skill or a batch degrades overall quality, the only recovery is restoring from a checkpoint file. A lightweight versioning mechanism — e.g., snapshotting skillbook state at epoch boundaries or before each ApplyStep, with a rollback(to_version) method — would enable automatic revert when a validation metric degrades, A/B comparison between skillbook versions, and safer experimentation with aggressive learning rates. This could live as a VersionedSkillbook wrapper or as an optional SnapshotStep inserted before ApplyStep. Deferred because the current checkpoint-to-disk approach covers the most common recovery scenario (resume after crash), and in-memory versioning adds memory overhead proportional to skillbook size times number of snapshots.


What Was Rejected and Why

Runner extends Pipeline: Making TraceAnalyser and ACE subclasses of Pipeline was considered. Rejected — the runner is not a pipeline. It owns the epoch loop. Composition (self.pipeline) keeps responsibilities separate.

Cross-sample state (reflection window): A rolling window of recent reflections that persists across samples was considered, with variants: on the runner, on StepContext, on step instances, via a shared mediator object. All rejected — each sample should be independent. The only cross-sample coupling is the skillbook itself, which evolves as samples are processed. Adding a reflection window complicates the model (reset between epochs, eventual consistency with background steps, ordering issues with concurrent workers) for marginal benefit.

Separate Online and Offline classes: Keeping two runner classes for single-pass and multi-epoch was considered. Rejected — the only difference is epochs=1 vs epochs > 1, which is a parameter, not a class distinction. ACE handles both. TraceAnalyser is a separate class because its input type is fundamentally different (raw traces vs Sample + Environment), not because of epoch count.

Structured Trace dataclass: A @dataclass Trace with typed fields (task, output, feedback, reasoning, etc.) was considered. Rejected — it imposes a schema on trace data that doesn't match reality. External frameworks produce wildly different trace shapes (browser-use AgentHistoryList, LangChain result dicts, Claude Code transcripts). Forcing them through a common dataclass means either losing information (fields that don't map) or adding catch-all metadata: dict buckets that defeat the purpose of typing. Instead, ctx.trace is object | None — the raw trace as-is. The Reflector receives it directly and is responsible for making sense of it. This gives maximum flexibility for analysis without constraining trace format. Extraction helpers (converter functions, typed intermediate representations) can be layered on later if needed.

Steps that accept both traces and samples: Making ReflectStep and UpdateStep polymorphic over input type was considered. Rejected — steps always receive StepContext with the same named fields. The runner (via _build_context) is responsible for building the context correctly. Steps do not need to know whether the data came from a raw trace or from live execution.

Observability in the runner: Keeping observability logic in ACERunner._track_observability_data() was considered. Rejected — it mixes concerns. A dedicated OpikStep is independently testable, optional, and composable. It is not wired into learning_tail() — users append it explicitly to avoid coupling observability into the core pipeline.

Custom AsyncLearningPipeline: The legacy ace/async_learning.py implements a manual thread pool with reflector and skill manager queues. Rejected — the pipeline engine's async_boundary and max_workers provide the same functionality with less code and consistent semantics.

Per-integration pipeline classes: Having each integration define its own pipeline class was considered. Rejected — every integration pipeline has the same learning tail; only the execute step differs. Separate pipeline classes duplicate the learning tail wiring and the runner infrastructure. Instead, integrations provide execute steps that compose into an ACERunner subclass, reusing the shared _run() loop and epoch logic.

Checkpoints in the runner: Having the runner own checkpoint logic (via run() parameters) was considered. Rejected — a CheckpointStep at the end of the pipeline tail keeps checkpointing within the pipeline formalism. Checkpoint configuration belongs at construction time (factory methods), not at call time (run()).

Mutable Skillbook directly on the context: Storing the real Skillbook as a field on ACEStepContext was the initial design. Rejected — StepContext is frozen, but Skillbook is a mutable object. Placing it on the context creates the illusion of immutability while allowing any step to mutate shared state through the reference. Instead, the context carries a SkillbookView (read-only projection) that exposes only read methods (as_prompt(), get_skill(), __len__). Write methods don't exist on the view — calling them raises AttributeError at runtime and a type error at check time. Steps that need to write (TagStep, ApplyStep, DeduplicateStep, CheckpointStep) receive the real Skillbook via constructor injection. This gives us both: pipeline engine validation (skillbook is in requires/provides) and true immutability enforcement on the context.

Combined Reflect+Tag and Update+Apply steps: Keeping ReflectStep as both reflection and tagging, and UpdateStep as both generation and application was considered. Rejected — each combination mixes a pure function (LLM call producing output) with a side effect (skillbook mutation). Splitting them means pure steps can be tested without a skillbook, side-effect steps can be tested without an LLM, and concerns are cleanly separated.

Instructor auto-wrapping in implementations: The old ace/roles.py auto-wrapped LLM clients with Instructor if complete_structured was missing (duck-typing check + fallback). This has since been updated: ace/roles.py now checks INSTRUCTOR_AVAILABLE and gracefully falls back to the raw LLM if the instructor package is not installed (it is an optional dependency via pip install ace-framework[instructor]). Rejected for ace_next — auto-wrapping masks what the implementation actually requires. In ace_next, LLMClientLike explicitly requires both complete() and complete_structured(). Callers wrap their LLM clients before passing them in (e.g. wrap_with_instructor(LiteLLMClient(...)) from ace_next.providers). This makes the requirement visible at the call site and keeps implementations dependency-free.

Recursive Reflector (initial rejection, now implemented): The old ace/reflector/ subsystem supports recursive mode where the Reflector iterates multiple times to deepen analysis. Initially rejected for ace_next due to complexity. Now implemented as RRStep in ace_next/rr/ — a SubRunner[ACEStepContext]-based step that runs an iterative REPL loop (LLM call → extract code → sandbox exec → check result). RRStep satisfies both StepProtocol[ACEStepContext] (composable in any pipeline) and ReflectorLike (usable as a drop-in reflector, e.g. ACELiteLLM(llm, reflector=rr)). Exported from ace_next as RRStep and RRConfig.

Observability decorator on implementations: The old ace/roles.py uses @maybe_track() decorators for Opik tracing on every role method. Rejected — OpikStep handles metrics at the pipeline level with full visibility into all context fields. Adding per-method decorators would double-count and create coupling between implementations and the observability system.

Deduplication inside SkillManager: The old ace/roles.py SkillManager integrates with DeduplicationManager directly — calling get_similarity_report() before the LLM call and apply_operations_from_response() after. Rejected for ace_next — deduplication is now a separate DeduplicateStep in the pipeline. This is cleaner separation: the SkillManager role only produces SkillManagerOutput, and deduplication runs at a configurable interval as an independent pipeline step. The step takes a DeduplicationManagerLike protocol, keeping it decoupled from the concrete implementation.

Shared ace_next/features.py module: Creating a centralized feature detection module (like ace/features.py) for optional dependency checks was considered. Rejected — the only code that needs feature detection is deduplication/detector.py, which uses a local _has(module) helper with importlib.import_module. A shared module would add a file for a single 4-line function. If more code needs feature detection in the future, the helper can be promoted to a shared location.

Separate wrapper classes for integration runners: Having separate convenience classes (ACEAgent, ACELangChain, ACEClaudeCode) that wrap the corresponding runners was the initial design. Each wrapper eagerly built a runner via from_roles() and delegated all calls to it. Rejected — the wrappers only added from_model() and a few lifecycle helpers (get_strategies(), backward-compat aliases), all of which fit naturally as methods on the runner class itself. Two classes for one concept forces users to understand both and choose which to use, while the runner already manages the pipeline, skillbook, and epoch loop. Instead, from_model() and convenience methods are defined directly on BrowserUse, LangChain, and ClaudeCode. The exception is ACELiteLLM, which is genuinely different: it wraps two runners (ACE and TraceAnalyser), has ask() (direct Agent call, no pipeline), and has learn_from_feedback() (manual single-shot learning). These don't map to any single runner's API.