Mentu

EventBus

The EventBus is mentu's local real-time event system. Every execution event (step started, step completed, sequence finished, gate awaiting review) is broadcast over a Unix socket. External tools connect to this socket to monitor, react to, or control execution.

The engine writes events during execution. Anything that can read a Unix socket can consume them.

Architecture

SequenceRunner / LoopRunner
  -> execution events
EventForwarder (sink pipeline: filter, namespace, dedup)
  -> JSON-RPC over Unix socket
~/.mentu/mentu-local.sock
  -> fanout to all subscribers
mentud-toolbar, eventbus-n8n-bridge, your integration

The EventForwarder sits between the execution engine and the socket. It implements a composable sink pipeline modeled after containerd's event-exchange architecture: filter, namespace isolation, dedup, then write. Sinks are composed at init time, not per-event, so there is zero dynamic dispatch overhead during execution.

Text events are batched (200ms flush interval) to avoid flooding the socket with per-character deltas.
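
The batching idea can be pictured with a short sketch. This is not mentu's implementation: `TextBatcher` and its `emit` callback are hypothetical names, and only the 200ms interval comes from the text above.

```typescript
// Hypothetical sketch of interval-based text batching (not mentu's actual code).
type Emit = (content: string) => void;

class TextBatcher {
  private pending = "";
  private timer: ReturnType<typeof setTimeout> | null = null;
  private emit: Emit;
  private intervalMs: number;

  constructor(emit: Emit, intervalMs: number = 200) {
    this.emit = emit;
    this.intervalMs = intervalMs;
  }

  // Accumulate a delta and schedule one flush per interval,
  // instead of one socket write per delta.
  push(delta: string): void {
    this.pending += delta;
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.intervalMs);
    }
  }

  // Emit everything accumulated so far as a single batch.
  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.length === 0) return;
    const content = this.pending;
    this.pending = "";
    this.emit(content);
  }
}
```

In this sketch, many `push` calls inside one interval produce a single `emit` call, which is the effect a batched text.delta event would have on the socket.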

Socket Protocol

  • Transport: Unix domain socket at ~/.mentu/mentu-local.sock
  • Protocol: JSON-RPC 2.0 over newline-delimited JSON
  • Direction: Server pushes events to connected clients
  • Subscription: events.subscribe { types: ["*"] } to receive all events
  • Connection: Non-blocking. If the socket does not exist, the engine logs and skips. Never crashes or blocks startup.
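
Newline-delimited JSON means each message is one JSON document followed by a newline, and a reader has to tolerate messages split across reads. A minimal sketch of that framing follows; `encodeFrame` and `createFrameDecoder` are illustrative names, not mentu APIs.

```typescript
// Sketch of newline-delimited JSON framing; function names are illustrative.
function encodeFrame(message: object): string {
  // JSON.stringify without an indent argument never emits a raw newline,
  // so "\n" is a safe message delimiter.
  return JSON.stringify(message) + "\n";
}

function createFrameDecoder(): (chunk: string) => unknown[] {
  let buffer = "";
  return (chunk: string): unknown[] => {
    buffer += chunk;
    const lines = buffer.split("\n");
    // The last element is an incomplete line (or ""); keep it for the next chunk.
    buffer = lines.pop() ?? "";
    const messages: unknown[] = [];
    for (const line of lines) {
      if (line.trim() === "") continue;
      try {
        messages.push(JSON.parse(line));
      } catch {
        // Skip malformed lines instead of failing the whole stream.
      }
    }
    return messages;
  };
}
```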

A subscription request looks like:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "events.subscribe",
  "params": { "types": ["tool.start", "phase.complete"] }
}

Namespace-scoped subscription (receive only events from a specific pipeline):

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "events.subscribe",
  "params": { "types": ["*"], "namespace": "my-pipeline" }
}
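
Both request shapes above can be produced by one small helper. The sketch below assumes nothing beyond the fields shown in the two examples; `buildSubscribe` is a hypothetical name.

```typescript
// Shape of the subscription request, taken from the two examples above.
interface SubscribeRequest {
  jsonrpc: "2.0";
  id: number;
  method: "events.subscribe";
  params: { types: string[]; namespace?: string };
}

// Hypothetical helper: builds a subscribe request, adding the namespace
// field only when a pipeline name is given.
function buildSubscribe(
  id: number,
  types: string[] = ["*"],
  namespace?: string
): SubscribeRequest {
  const params: SubscribeRequest["params"] = { types };
  if (namespace !== undefined) params.namespace = namespace;
  return { jsonrpc: "2.0", id, method: "events.subscribe", params };
}
```

For example, `buildSubscribe(1, ["*"], "my-pipeline")` yields the namespace-scoped request shown above; serialize it with `JSON.stringify` and append a newline before writing it to the socket.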

Event Types

| Event | When | Payload |
| --- | --- | --- |
| session.init | Agent session begins | { session_id, model, pipeline } |
| tool.start | Tool invocation begins | { name, summary, phase, pipeline } |
| text.delta | Batched text output | { content, phase, pipeline } |
| phase.complete | Step finishes | { phase, turns, cost, duration_s, pipeline } |
| step.skipped | Step skipped | { phase, pipeline } |
| step.backgrounded | Step moved to background | { phase, pipeline } |
| error | Step error | { message, phase, pipeline } |
| rate.limited | API rate limit hit | { phase, pipeline } |
| execution.paused | Execution paused | { phase, pipeline } |
| execution.resumed | Execution resumed | { phase, pipeline } |
| gate.await | Gate waiting for human review | { label, trust, findings, status } |
| sequence.start | Sequence begins | { sequence_name, total_steps, from_step } |
| sequence.complete | Sequence finishes | { sequence_name, ok_count, warn_count, cost, duration_seconds, success } |
| fleet.start | Fleet begins | { fleet_name, sequence_count, total_steps } |
| fleet.complete | Fleet finishes | { fleet_name, total_ok, total_fail, total_cost, duration_seconds } |
| fork.start | Fork begins | { label, branches, strategies } |
| fork.branch | Fork branch completes | { label, branch, strategy, status, trust } |
| fork.collapse | Fork winner selected | { label, winner, strategy, trust } |
| recipe.committed | Recipe passes constitutional gate | { name, approved, confidence, commitmentId } |
| lifecycle.update | Per-step lifecycle telemetry | { recipe, phase, trust_mean, trust_stddev, contradictions, cost_usd, steps_completed, steps_total } |
| kill.all | Kill all running steps | { pipeline } |

Every event payload includes a type field with the event name and an optional pipeline field identifying the source pipeline namespace.
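
Because every payload carries a type field, a consumer can branch on it. The sketch below models three of the events as a TypeScript discriminated union. The field names come from the payload listing above, but the field types (string versus number) and the fallback to "global" when pipeline is absent are assumptions, and `describeEvent` is a hypothetical helper.

```typescript
// Three event payloads modeled as a discriminated union.
// Field names follow the payload listing; field types are assumed.
type MentuEvent =
  | { type: "tool.start"; name: string; summary: string; phase: string; pipeline?: string }
  | { type: "phase.complete"; phase: string; turns: number; cost: number; duration_s: number; pipeline?: string }
  | { type: "error"; message: string; phase: string; pipeline?: string };

// Hypothetical helper: turns an event into a one-line summary.
function describeEvent(event: MentuEvent): string {
  const ns = event.pipeline ?? "global"; // assumed fallback when pipeline is absent
  switch (event.type) {
    case "tool.start":
      return `[${ns}] ${event.phase}: ${event.name} (${event.summary})`;
    case "phase.complete":
      return `[${ns}] ${event.phase} done in ${event.duration_s}s, cost ${event.cost}`;
    case "error":
      return `[${ns}] ${event.phase} failed: ${event.message}`;
  }
}
```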

Event Envelope

Every event is wrapped in an EventEnvelope before dispatch:

| Field | Type | Description |
| --- | --- | --- |
| topic | String | Event type (e.g. tool.start) |
| namespace | String | Pipeline name or "global" |
| timestamp | Date | When the event was created |
| source | String | Origin component (e.g. "engine", "looprunner") |
| payload | Object | Original event parameters |
| method | String | JSON-RPC method (default "agent.event") |
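
Expressed as a TypeScript type, the envelope might look like the sketch below. The interface mirrors the fields listed above. The `wrapEvent` helper is hypothetical, and deriving the namespace from the payload's pipeline field is an assumption about how the two relate.

```typescript
// Mirrors the envelope fields described above.
interface EventEnvelope {
  topic: string;
  namespace: string;
  timestamp: Date;
  source: string;
  payload: Record<string, unknown>;
  method: string;
}

// Hypothetical helper: wraps raw event parameters in an envelope.
// Assumption: namespace is taken from payload.pipeline, else "global".
function wrapEvent(
  topic: string,
  payload: Record<string, unknown>,
  source: string = "engine"
): EventEnvelope {
  const pipeline = payload.pipeline;
  return {
    topic,
    namespace: typeof pipeline === "string" ? pipeline : "global",
    timestamp: new Date(),
    source,
    payload,
    method: "agent.event", // the documented default method
  };
}
```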

Sink Pipeline

The EventForwarder uses a composable sink chain. Each sink receives an envelope, optionally transforms or filters it, and passes it to the next sink.

| Sink | Purpose |
| --- | --- |
| FilterSink | Drops events whose topic is not in the allowed set |
| NamespaceSink | Drops events from other pipeline namespaces |
| DedupSink | Sliding-window duplicate detection (5s default) |
| EventForwarder | Terminal sink. Serializes to JSON-RPC and writes to socket |

Sinks compose at init time:

FilterSink -> NamespaceSink -> DedupSink -> EventForwarder (socket write)

This is the containerd pattern: per-subscriber filtering applied before buffering, reducing processing overhead for selective subscribers.
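
The composition can be sketched with plain functions, each wrapping the next sink. This is an illustration of the pattern, not mentu's code: the function names, the "*" wildcard handling, and the choice of dedup key (namespace, topic, and serialized payload) are all assumptions.

```typescript
interface SinkEnvelope {
  topic: string;
  namespace: string;
  timestamp: Date;
  payload: Record<string, unknown>;
}
type Sink = (envelope: SinkEnvelope) => void;

// Passes an envelope on only if its topic is allowed ("*" allows everything).
function filterSink(allowed: Set<string>, next: Sink): Sink {
  return (e) => {
    if (allowed.has("*") || allowed.has(e.topic)) next(e);
  };
}

// Passes an envelope on only if it belongs to the given namespace.
function namespaceSink(namespace: string, next: Sink): Sink {
  return (e) => {
    if (e.namespace === namespace) next(e);
  };
}

// Drops an envelope if an identical one was seen within the window.
function dedupSink(windowMs: number, next: Sink): Sink {
  const seen = new Map<string, number>(); // dedup key -> time first seen (ms)
  return (e) => {
    const now = e.timestamp.getTime();
    // Forget entries that have slid out of the window.
    seen.forEach((at, key) => {
      if (now - at > windowMs) seen.delete(key);
    });
    const key = `${e.namespace}|${e.topic}|${JSON.stringify(e.payload)}`;
    if (seen.has(key)) return;
    seen.set(key, now);
    next(e);
  };
}

// Composed once at init time; the terminal sink would write to the socket.
function buildChain(topics: string[], namespace: string, terminal: Sink): Sink {
  return filterSink(new Set(topics), namespaceSink(namespace, dedupSink(5000, terminal)));
}
```

Because the chain is built once, handling an event is a fixed sequence of function calls with no per-event pipeline construction.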

Control Signals

The ControlSignalBus allows external tools to control execution. Signals converge from two paths: terminal keyboard input and the EventBus socket.

| Signal | Effect |
| --- | --- |
| pause | Pause execution after current step |
| resume | Resume paused execution |
| togglePause | Toggle pause state |
| skip | Skip the current step (also resumes if paused) |
| background | Background the current step |
| killAll | Kill all running steps and sequences |

Subscribers (LoopRunner, SequenceRunner) consume signals via two methods:

  • waitIfPaused() blocks the caller while the bus is paused
  • consumeSkip() returns true exactly once per skip signal, then resets
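
The semantics of these two methods can be sketched as follows. This is a simplified stand-in, not the actual ControlSignalBus: the class name `ControlSignals` is hypothetical, only the pause, resume, togglePause, and skip signals are modeled, and "blocking" is represented by awaiting a promise.

```typescript
// Simplified, hypothetical stand-in for the pause and skip semantics.
class ControlSignals {
  private paused = false;
  private skipRequested = false;
  private waiters: Array<() => void> = [];

  get isPaused(): boolean {
    return this.paused;
  }

  pause(): void {
    this.paused = true;
  }

  resume(): void {
    this.paused = false;
    // Wake every caller currently parked in waitIfPaused().
    const waiters = this.waiters;
    this.waiters = [];
    for (const wake of waiters) wake();
  }

  togglePause(): void {
    if (this.paused) this.resume();
    else this.pause();
  }

  // Per the signal listing above, skip also resumes if paused.
  skip(): void {
    this.skipRequested = true;
    this.resume();
  }

  // Resolves immediately when not paused; otherwise resolves on resume.
  waitIfPaused(): Promise<void> {
    if (!this.paused) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.push(() => resolve());
    });
  }

  // Returns true exactly once per skip signal, then resets.
  consumeSkip(): boolean {
    const requested = this.skipRequested;
    this.skipRequested = false;
    return requested;
  }
}
```

A runner built on this sketch would `await bus.waitIfPaused()` before each step and check `bus.consumeSkip()` to decide whether to abandon the current one.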

EventBus vs Webhooks

| | EventBus | Webhooks |
| --- | --- | --- |
| Transport | Unix socket (local) | HTTPS (cloud) |
| Latency | Microseconds | Seconds |
| Scope | Same machine only | Any endpoint |
| Protocol | JSON-RPC 2.0 stream | HMAC-signed HTTP POST |
| Direction | Push to connected clients | POST to registered URLs |
| Use case | Real-time monitoring, toolbars, dashboards | Cross-system automation, notifications |

Use the EventBus when your integration runs on the same machine. Use webhooks when you need to reach external services.

Building an Integration

Connect to the Unix socket, subscribe to event types, and process incoming JSON-RPC notifications.

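import { connect } from "node:net";
import { homedir } from "node:os";
import { join } from "node:path";

const socket = connect({ path: join(homedir(), ".mentu", "mentu-local.sock") });
socket.setEncoding("utf8"); // decode multi-byte characters split across chunks

// Without an error handler, a missing socket would crash the process
socket.on("error", (err) => console.error("EventBus socket error:", err.message));

// Subscribe to all events
socket.write(JSON.stringify({
  jsonrpc: "2.0",
  id: 1,
  method: "events.subscribe",
  params: { types: ["*"] }
}) + "\n");

// Process newline-delimited JSON-RPC notifications
let buffer = "";
socket.on("data", (data) => {
  buffer += data.toString();
  const lines = buffer.split("\n");
  buffer = lines.pop() || ""; // keep the trailing partial line for the next chunk

  for (const line of lines) {
    if (!line) continue;

    let event;
    try {
      event = JSON.parse(line);
    } catch {
      continue; // skip malformed lines rather than crash
    }
    const { type } = event.params || {};

    // Forward to n8n, Slack, or any HTTP endpoint
    fetch("https://your-n8n.example.com/webhook/mentu", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(event),
    }).catch((err) => console.error(`Failed to forward ${type}:`, err));
  }
});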

A production-ready example lives at ~/.mentu/scripts/eventbus-n8n-bridge.ts. It includes reconnection backoff, event filtering, and structured forwarding to n8n webhooks.
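
The bridge script's own backoff policy is not reproduced here. As a rough illustration of what reconnection backoff usually means, the sketch below computes a capped exponential delay. The function name `backoffDelay` and the 500ms base and 30s cap are assumptions, not values taken from the script.

```typescript
// Hypothetical capped exponential backoff: 500ms, 1s, 2s, ... up to 30s.
// The base and cap are illustrative values only.
function backoffDelay(attempt: number, baseMs: number = 500, maxMs: number = 30000): number {
  const delay = baseMs * Math.pow(2, Math.max(0, attempt));
  return Math.min(delay, maxMs);
}
```

A client would typically wait `backoffDelay(attempt)` milliseconds before each reconnect attempt and reset `attempt` to 0 after a successful connection.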

Subscribers

Built-in subscribers:

  • EventForwarder -- serializes execution events through the sink pipeline and writes them to the Unix socket
  • mentud-toolbar -- macOS menu bar showing current step, progress, and cost
  • ControlSignalBus -- receives pause, resume, skip, background, and kill signals from external tools
  • EventBusSubscriber -- client-side actor that connects to the socket with topic-based filtering, namespace isolation, and sliding-window dedup
© 2026 Mentu.