EventBus
The EventBus is mentu's local real-time event system. Every execution event -- step started, step completed, sequence finished, gate awaiting review -- is broadcast over a Unix socket. External tools connect to this socket to monitor, react, or control execution.
The engine writes events during execution. Anything that can read a Unix socket can consume them.
Architecture
The EventForwarder sits between the execution engine and the socket. It implements a composable sink pipeline modeled after containerd's event-exchange architecture: filter, namespace isolation, dedup, then write. Sinks are composed at init time, not per-event, so there is zero dynamic dispatch overhead during execution.
Text events are batched (200ms flush interval) to avoid flooding the socket with per-character deltas.
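The batching described above can be pictured with a small sketch. The `TextBatcher` class, its `push` and `flush` methods, and the `emit` callback are hypothetical names chosen for illustration, not mentu's API; only the 200 ms flush interval comes from the text.

```typescript
// Hypothetical sketch: coalesce per-character text deltas into one event per flush interval.
type Emit = (content: string) => void;

class TextBatcher {
  private pending = "";
  private timer: ReturnType<typeof setTimeout> | null = null;
  private readonly emit: Emit;
  private readonly intervalMs: number;

  constructor(emit: Emit, intervalMs = 200) {
    this.emit = emit;
    this.intervalMs = intervalMs;
  }

  // Append a delta; the first delta after a flush arms the timer.
  push(delta: string): void {
    this.pending += delta;
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.intervalMs);
    }
  }

  // Emit everything buffered so far as a single batch and disarm the timer.
  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.length === 0) return;
    const content = this.pending;
    this.pending = "";
    this.emit(content);
  }
}
```

With this shape, many `push` calls within one interval produce a single `text.delta`-style emission, and an explicit `flush()` at the end of a step avoids losing the tail of the output.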
Socket Protocol
- Transport: Unix domain socket at `~/.mentu/mentu-local.sock`
- Protocol: JSON-RPC 2.0 over newline-delimited JSON
- Direction: Server pushes events to connected clients
- Subscription: `events.subscribe { types: ["*"] }` to receive all events
- Connection: Non-blocking. If the socket does not exist, the engine logs and skips. Never crashes or blocks startup.
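A client can mirror the "log and skip" behaviour from the Connection bullet. The following is a minimal sketch under that assumption; `connectIfPresent` and `DEFAULT_SOCKET_PATH` are hypothetical names, and the engine's own implementation may differ.

```typescript
import { existsSync } from "fs";
import { connect, Socket } from "net";
import { homedir } from "os";
import { join } from "path";

// Path taken from the Transport bullet above.
const DEFAULT_SOCKET_PATH = join(homedir(), ".mentu", "mentu-local.sock");

// Hypothetical helper: returns a connecting socket, or null when there is nothing to connect to.
function connectIfPresent(path: string = DEFAULT_SOCKET_PATH): Socket | null {
  if (!existsSync(path)) {
    // Log and skip instead of throwing, so startup is never blocked.
    console.warn(`EventBus socket not found at ${path}; skipping`);
    return null;
  }
  const socket = connect({ path });
  // A stale socket file or a peer that goes away must not crash the caller.
  socket.on("error", (err) => {
    console.warn(`EventBus socket error: ${err.message}`);
  });
  return socket;
}
```

Callers treat a `null` result as "no EventBus available" and carry on without it.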
A subscription request looks like:
```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "events.subscribe",
  "params": { "types": ["step.started", "step.completed"] }
}
```
Namespace-scoped subscription (receive only events from a specific pipeline):
```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "events.subscribe",
  "params": { "types": ["*"], "namespace": "my-pipeline" }
}
```
Event Types
| Event | When | Payload |
|---|---|---|
| `session.init` | Agent session begins | `{ session_id, model, pipeline }` |
| `tool.start` | Tool invocation begins | `{ name, summary, phase, pipeline }` |
| `text.delta` | Batched text output | `{ content, phase, pipeline }` |
| `phase.complete` | Step finishes | `{ phase, turns, cost, duration_s, pipeline }` |
| `step.skipped` | Step skipped | `{ phase, pipeline }` |
| `step.backgrounded` | Step moved to background | `{ phase, pipeline }` |
| `error` | Step error | `{ message, phase, pipeline }` |
| `rate.limited` | API rate limit hit | `{ phase, pipeline }` |
| `execution.paused` | Execution paused | `{ phase, pipeline }` |
| `execution.resumed` | Execution resumed | `{ phase, pipeline }` |
| `gate.await` | Gate waiting for human review | `{ label, trust, findings, status }` |
| `sequence.start` | Sequence begins | `{ sequence_name, total_steps, from_step }` |
| `sequence.complete` | Sequence finishes | `{ sequence_name, ok_count, warn_count, cost, duration_seconds, success }` |
| `fleet.start` | Fleet begins | `{ fleet_name, sequence_count, total_steps }` |
| `fleet.complete` | Fleet finishes | `{ fleet_name, total_ok, total_fail, total_cost, duration_seconds }` |
| `fork.start` | Fork begins | `{ label, branches, strategies }` |
| `fork.branch` | Fork branch completes | `{ label, branch, strategy, status, trust }` |
| `fork.collapse` | Fork winner selected | `{ label, winner, strategy, trust }` |
| `recipe.committed` | Recipe passes constitutional gate | `{ name, approved, confidence, commitmentId }` |
| `lifecycle.update` | Per-step lifecycle telemetry | `{ recipe, phase, trust_mean, trust_stddev, contradictions, cost_usd, steps_completed, steps_total }` |
| `kill.all` | Kill all running steps | `{ pipeline }` |
Every event payload includes a type field with the event name and an optional pipeline field identifying the source pipeline namespace.
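Because every payload carries a `type` field, consumers can model events as a discriminated union. The sketch below types three rows of the table; the field types (string, number) and the names `MentuEvent` and `summarize` are assumptions, since the table lists field names only.

```typescript
// Hypothetical typings for three of the events in the table; field types are assumptions.
interface ToolStart {
  type: "tool.start";
  name: string;
  summary: string;
  phase: string;
  pipeline?: string;
}

interface PhaseComplete {
  type: "phase.complete";
  phase: string;
  turns: number;
  cost: number;
  duration_s: number;
  pipeline?: string;
}

interface StepError {
  type: "error";
  message: string;
  phase: string;
  pipeline?: string;
}

type MentuEvent = ToolStart | PhaseComplete | StepError;

// Switching on `type` lets the compiler narrow each branch to the matching payload.
function summarize(event: MentuEvent): string {
  switch (event.type) {
    case "tool.start":
      return `[${event.phase}] tool ${event.name}: ${event.summary}`;
    case "phase.complete":
      return `[${event.phase}] done in ${event.duration_s}s, ${event.turns} turns, cost ${event.cost}`;
    case "error":
      return `[${event.phase}] error: ${event.message}`;
  }
}
```

Extending the union with further rows from the table follows the same pattern: one interface per event, keyed by its literal `type`.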
Event Envelope
Every event is wrapped in an EventEnvelope before dispatch:
| Field | Type | Description |
|---|---|---|
| `topic` | String | Event type (e.g. `tool.start`) |
| `namespace` | String | Pipeline name or `"global"` |
| `timestamp` | Date | When the event was created |
| `source` | String | Origin component (e.g. `"engine"`, `"looprunner"`) |
| `payload` | Object | Original event parameters |
| `method` | String | JSON-RPC method (default `"agent.event"`) |
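To make the envelope concrete, here is a minimal sketch of the fields as a TypeScript interface, plus one plausible way to turn an envelope into a line on the socket. The helpers `makeEnvelope` and `toWireLine` are hypothetical, and the wire layout (a JSON-RPC notification whose `params` carry `type` and `pipeline`) is an assumption based on the payload description, not a confirmed format.

```typescript
// Field names follow the table above.
interface EventEnvelope {
  topic: string;
  namespace: string;
  timestamp: Date;
  source: string;
  payload: Record<string, unknown>;
  method: string;
}

// Hypothetical constructor applying the defaults named in the table.
function makeEnvelope(
  topic: string,
  payload: Record<string, unknown>,
  source: string,
  namespace = "global",
): EventEnvelope {
  return { topic, namespace, timestamp: new Date(), source, payload, method: "agent.event" };
}

// Assumed wire form: a JSON-RPC 2.0 notification (no id) on a single line.
// `pipeline` is only added for non-global namespaces, since the field is optional.
function toWireLine(env: EventEnvelope): string {
  const params: Record<string, unknown> = { ...env.payload, type: env.topic };
  if (env.namespace !== "global") params.pipeline = env.namespace;
  return JSON.stringify({ jsonrpc: "2.0", method: env.method, params }) + "\n";
}
```

For example, `toWireLine(makeEnvelope("tool.start", { name: "grep" }, "engine", "my-pipeline"))` yields one newline-terminated JSON object whose `params.type` is `"tool.start"`.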
Sink Pipeline
The EventForwarder uses a composable sink chain. Each sink receives an envelope, optionally transforms or filters it, and passes it to the next sink.
| Sink | Purpose |
|---|---|
| `FilterSink` | Drops events whose topic is not in the allowed set |
| `NamespaceSink` | Drops events from other pipeline namespaces |
| `DedupSink` | Sliding-window duplicate detection (5s default) |
| `EventForwarder` | Terminal sink. Serializes to JSON-RPC and writes to socket |
Sinks compose at init time:
```
FilterSink -> NamespaceSink -> DedupSink -> EventForwarder (socket write)
```
This is the containerd pattern: per-subscriber filtering applied before buffering, reducing processing overhead for selective subscribers.
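The chain can be sketched as functions that each wrap the next sink, so composition happens once and every event simply flows through. This is an illustration only: the lowercase factory names, the `Envelope` shape, the `"*"` wildcard handling, and the dedup key (namespace, topic, and serialized payload) are assumptions, not mentu's implementation.

```typescript
// Minimal envelope for this sketch: only the fields the sinks inspect.
interface Envelope {
  topic: string;
  namespace: string;
  payload: unknown;
}
type Sink = (env: Envelope) => void;

// Forwards only topics in the allowed set ("*" allows everything).
const filterSink = (allowed: Set<string>, next: Sink): Sink => (env) => {
  if (allowed.has("*") || allowed.has(env.topic)) next(env);
};

// Forwards only envelopes from the given namespace (null disables the check).
const namespaceSink = (namespace: string | null, next: Sink): Sink => (env) => {
  if (namespace === null || env.namespace === namespace) next(env);
};

// Drops an envelope if an identical one was seen within windowMs.
// Each sighting refreshes the timestamp, so the window slides with repeats.
const dedupSink = (next: Sink, windowMs = 5000, now: () => number = Date.now): Sink => {
  const lastSeen = new Map<string, number>();
  return (env) => {
    const t = now();
    // Prune expired keys so the map does not grow without bound.
    lastSeen.forEach((seen, key) => {
      if (t - seen >= windowMs) lastSeen.delete(key);
    });
    const key = `${env.namespace}|${env.topic}|${JSON.stringify(env.payload)}`;
    const duplicate = lastSeen.has(key);
    lastSeen.set(key, t);
    if (!duplicate) next(env);
  };
};

// Composition happens once; `write` stands in for the terminal socket write.
function buildChain(write: Sink, types: string[], namespace: string | null): Sink {
  return filterSink(new Set(types), namespaceSink(namespace, dedupSink(write)));
}
```

Placing the cheap set lookup first means events a subscriber never asked for are dropped before the dedup sink does any serialization work.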
Control Signals
The ControlSignalBus allows external tools to control execution. Signals converge from two paths: terminal keyboard input and the EventBus socket.
| Signal | Effect |
|---|---|
| `pause` | Pause execution after current step |
| `resume` | Resume paused execution |
| `togglePause` | Toggle pause state |
| `skip` | Skip the current step (also resumes if paused) |
| `background` | Background the current step |
| `killAll` | Kill all running steps and sequences |
Subscribers (LoopRunner, SequenceRunner) consume signals via two methods:
- `waitIfPaused()` blocks the caller while the bus is paused
- `consumeSkip()` returns true exactly once per skip signal, then resets
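The semantics of these two methods can be illustrated with a small promise-based sketch. `ControlSignalBusSketch` is a hypothetical stand-in that covers only pause, resume, toggle, and skip; it assumes an async `waitIfPaused()` and is not the actual ControlSignalBus.

```typescript
// Hypothetical sketch of the subscriber-facing semantics; not mentu's implementation.
class ControlSignalBusSketch {
  private paused = false;
  private skipPending = false;
  private waiters: Array<() => void> = [];

  pause(): void {
    this.paused = true;
  }

  // Clears the pause flag and wakes every caller parked in waitIfPaused().
  resume(): void {
    this.paused = false;
    const toWake = this.waiters;
    this.waiters = [];
    for (const wake of toWake) wake();
  }

  togglePause(): void {
    if (this.paused) this.resume();
    else this.pause();
  }

  // Skip also resumes if paused, as stated in the signal table.
  skip(): void {
    this.skipPending = true;
    this.resume();
  }

  // Resolves immediately when running; otherwise resolves on the next resume.
  waitIfPaused(): Promise<void> {
    if (!this.paused) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.push(() => resolve());
    });
  }

  // Returns true exactly once per skip signal, then resets.
  consumeSkip(): boolean {
    const skip = this.skipPending;
    this.skipPending = false;
    return skip;
  }
}
```

A runner loop would typically `await bus.waitIfPaused()` before each step and then check `bus.consumeSkip()` to decide whether to move on without running it.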
EventBus vs Webhooks
| | EventBus | Webhooks |
|---|---|---|
| Transport | Unix socket (local) | HTTPS (cloud) |
| Latency | Microseconds | Seconds |
| Scope | Same machine only | Any endpoint |
| Protocol | JSON-RPC 2.0 stream | HMAC-signed HTTP POST |
| Direction | Push to connected clients | POST to registered URLs |
| Use case | Real-time monitoring, toolbars, dashboards | Cross-system automation, notifications |
Use the EventBus when your integration runs on the same machine. Use webhooks when you need to reach external services.
Building an Integration
Connect to the Unix socket, subscribe to event types, and process incoming JSON-RPC notifications.
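```typescript
import { connect } from "net";
import { homedir } from "os";

const socket = connect({ path: `${homedir()}/.mentu/mentu-local.sock` });

// The socket may be missing if mentu is not running; log instead of crashing
socket.on("error", (err) => console.error("EventBus socket error:", err.message));

// Subscribe to all events
socket.write(JSON.stringify({
  jsonrpc: "2.0",
  id: 1,
  method: "events.subscribe",
  params: { types: ["*"] },
}) + "\n");

// Process newline-delimited JSON-RPC notifications
let buffer = "";
socket.on("data", (data) => {
  buffer += data.toString();
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? ""; // keep the trailing partial line for the next chunk
  for (const line of lines) {
    if (!line) continue;
    let event: { params?: { type?: string } };
    try {
      event = JSON.parse(line);
    } catch {
      continue; // skip malformed lines instead of crashing the bridge
    }
    const { type } = event.params ?? {};
    if (!type) continue; // not an event notification, e.g. a reply to the subscribe request
    // Forward to n8n, Slack, or any HTTP endpoint
    fetch("https://your-n8n.example.com/webhook/mentu", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(event),
    }).catch((err) => console.error("Forwarding failed:", err));
  }
});
```
A production-ready example lives at ~/.mentu/scripts/eventbus-n8n-bridge.ts. It includes reconnection backoff, event filtering, and structured forwarding to n8n webhooks.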
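The bridge script is not reproduced here. As a rough illustration of what reconnection backoff can look like, the following sketch uses capped exponential delays. `backoffDelayMs`, `connectWithRetry`, the 250 ms base, and the 10 s cap are all assumptions and need not match that script.

```typescript
import { connect } from "net";

// Capped exponential backoff: baseMs * 2^attempt, never above maxMs.
function backoffDelayMs(attempt: number, baseMs = 250, maxMs = 10_000): number {
  return Math.min(maxMs, baseMs * 2 ** Math.max(0, attempt));
}

// Hypothetical reconnect loop: retries forever, resetting the delay after a successful connect.
function connectWithRetry(path: string, onData: (chunk: string) => void, attempt = 0): void {
  const socket = connect({ path });
  socket.on("connect", () => {
    attempt = 0; // a healthy connection resets the backoff
  });
  socket.on("data", (data) => onData(data.toString()));
  socket.on("error", () => {
    // Node emits "close" right after "error"; the retry is scheduled there.
  });
  socket.on("close", () => {
    setTimeout(() => connectWithRetry(path, onData, attempt + 1), backoffDelayMs(attempt));
  });
}
```

Capping the delay keeps the bridge responsive after long outages, while the exponential ramp avoids hammering a socket that is not there yet.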
Subscribers
Built-in subscribers:
- EventForwarder -- serializes execution events through the sink pipeline and writes them to the Unix socket
- mentud-toolbar -- macOS menu bar showing current step, progress, and cost
- ControlSignalBus -- receives pause, resume, skip, background, and kill signals from external tools
- EventBusSubscriber -- client-side actor that connects to the socket with topic-based filtering, namespace isolation, and sliding-window dedup