Overview
AgentRunner is an abstract class that defines the interface for running agents. It provides methods for executing agent runs, connecting to existing threads, checking run status, and stopping runs. Custom runners can be implemented to support different execution strategies (in-memory, distributed, persistent, etc.).
Class Definition
abstract class AgentRunner {
abstract run(request: AgentRunnerRunRequest): Observable<BaseEvent>;
abstract connect(request: AgentRunnerConnectRequest): Observable<BaseEvent>;
abstract isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean>;
abstract stop(request: AgentRunnerStopRequest): Promise<boolean | undefined>;
}
Methods
run()
Execute an agent with given input and stream events back.
abstract run(request: AgentRunnerRunRequest): Observable<BaseEvent>
request
AgentRunnerRunRequest
required
The run request containing the agent, thread ID, and input data.
Returns: An RxJS Observable that emits AG-UI protocol events during execution.
AgentRunnerRunRequest
interface AgentRunnerRunRequest {
threadId: string;
agent: AbstractAgent;
input: RunAgentInput;
}
Unique identifier for the conversation thread. Used to maintain conversation history.
The agent instance to execute.
Input data for the agent run, including messages, tools, context, and state.{
runId: 'run_123',
threadId: 'thread_456',
messages: [...],
tools: [...],
context: [...],
state: {...},
forwardedProps: {...}
}
Example:
const events$ = runner.run({
threadId: 'conversation-123',
agent: myAgent,
input: {
runId: 'run-456',
threadId: 'conversation-123',
messages: [
{ role: 'user', content: 'Hello!' }
],
tools: [],
context: [],
state: {}
}
});
events$.subscribe({
next: (event) => {
console.log('Event:', event.type);
},
complete: () => {
console.log('Run completed');
},
error: (err) => {
console.error('Run failed:', err);
}
});
connect()
Connect to an existing thread to receive events from ongoing or completed runs.
abstract connect(request: AgentRunnerConnectRequest): Observable<BaseEvent>
request
AgentRunnerConnectRequest
required
The connect request containing the thread ID and optional headers.
Returns: An RxJS Observable that emits events from the thread.
AgentRunnerConnectRequest
interface AgentRunnerConnectRequest {
threadId: string;
headers?: Record<string, string>;
}
The thread ID to connect to.
Optional HTTP headers for authentication or context.
Example:
const events$ = runner.connect({
threadId: 'conversation-123',
headers: {
'Authorization': 'Bearer token'
}
});
events$.subscribe((event) => {
console.log('Thread event:', event);
});
isRunning()
Check if a run is currently active on a thread.
abstract isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean>
request
AgentRunnerIsRunningRequest
required
The request containing the thread ID.
Returns: A Promise that resolves to true if the thread has an active run, false otherwise.
AgentRunnerIsRunningRequest
interface AgentRunnerIsRunningRequest {
threadId: string;
}
Example:
const running = await runner.isRunning({ threadId: 'conversation-123' });
if (running) {
console.log('Thread is currently running');
} else {
console.log('Thread is idle');
}
stop()
Stop an active run on a thread.
abstract stop(request: AgentRunnerStopRequest): Promise<boolean | undefined>
request
AgentRunnerStopRequest
required
The request containing the thread ID.
Returns: A Promise that resolves to:
true if the run was successfully stopped
false if there was no run to stop
undefined if the operation completed without confirmation
AgentRunnerStopRequest
interface AgentRunnerStopRequest {
threadId: string;
}
Example:
const stopped = await runner.stop({ threadId: 'conversation-123' });
if (stopped) {
console.log('Run stopped successfully');
} else {
console.log('No active run to stop');
}
Built-in Implementation
InMemoryAgentRunner
The default implementation that runs agents in-memory within the current process.
import { InMemoryAgentRunner } from '@copilotkit/runtime';
const runner = new InMemoryAgentRunner();
const runtime = new CopilotRuntime({
agents: { ... },
runner
});
Features:
- Runs agents in the same process as the server
- Maintains thread state in memory
- Automatic cleanup of completed runs
- Supports concurrent runs across different threads
- Suitable for development and single-instance deployments
Limitations:
- State is lost on server restart
- Not suitable for multi-instance deployments without shared state
- Memory usage scales with number of active threads
Custom Runner Implementation
Implement your own runner for custom execution strategies:
import { AgentRunner, AgentRunnerRunRequest, AgentRunnerConnectRequest } from '@copilotkit/runtime';
import { Observable } from 'rxjs';
import type { BaseEvent, AbstractAgent } from '@ag-ui/client';
class RedisAgentRunner extends AgentRunner {
constructor(private redisClient: RedisClient) {
super();
}
run(request: AgentRunnerRunRequest): Observable<BaseEvent> {
return new Observable((subscriber) => {
const { threadId, agent, input } = request;
// Store thread state in Redis
this.redisClient.set(`thread:${threadId}:running`, 'true');
// Execute agent
const subscription = agent.run(input).subscribe({
next: (event) => {
// Store events in Redis for replay
this.redisClient.rpush(
`thread:${threadId}:events`,
JSON.stringify(event)
);
subscriber.next(event);
},
complete: () => {
this.redisClient.set(`thread:${threadId}:running`, 'false');
subscriber.complete();
},
error: (err) => {
this.redisClient.set(`thread:${threadId}:running`, 'false');
subscriber.error(err);
}
});
return () => {
subscription.unsubscribe();
this.redisClient.set(`thread:${threadId}:running`, 'false');
};
});
}
connect(request: AgentRunnerConnectRequest): Observable<BaseEvent> {
return new Observable((subscriber) => {
const { threadId } = request;
// Replay stored events
this.redisClient.lrange(
`thread:${threadId}:events`,
0,
-1,
(err, events) => {
if (err) {
subscriber.error(err);
return;
}
events.forEach((eventStr) => {
subscriber.next(JSON.parse(eventStr));
});
// Subscribe to new events via Redis pub/sub
const pubsub = this.redisClient.duplicate();
pubsub.subscribe(`thread:${threadId}:new-events`);
pubsub.on('message', (channel, message) => {
subscriber.next(JSON.parse(message));
});
}
);
});
}
async isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean> {
const status = await this.redisClient.get(`thread:${request.threadId}:running`);
return status === 'true';
}
async stop(request: AgentRunnerStopRequest): Promise<boolean> {
const running = await this.isRunning(request);
if (running) {
// Publish stop signal
await this.redisClient.publish(
`thread:${request.threadId}:commands`,
'stop'
);
return true;
}
return false;
}
}
// Usage
const runner = new RedisAgentRunner(redisClient);
const runtime = new CopilotRuntime({
agents: { ... },
runner
});
Event Types
Runners emit AG-UI protocol events. Common event types:
enum EventType {
RUN_STARTED = 'RUN_STARTED',
RUN_FINISHED = 'RUN_FINISHED',
RUN_ERROR = 'RUN_ERROR',
TEXT_MESSAGE_CHUNK = 'TEXT_MESSAGE_CHUNK',
TOOL_CALL_START = 'TOOL_CALL_START',
TOOL_CALL_ARGS = 'TOOL_CALL_ARGS',
TOOL_CALL_END = 'TOOL_CALL_END',
TOOL_CALL_RESULT = 'TOOL_CALL_RESULT',
STATE_SNAPSHOT = 'STATE_SNAPSHOT',
STATE_DELTA = 'STATE_DELTA',
// ... more event types
}
Use Cases
Development
Use the default InMemoryAgentRunner for simple, in-process execution:
const runtime = new CopilotRuntime({
agents: { ... }
// runner defaults to InMemoryAgentRunner
});
Distributed Systems
Implement a custom runner that uses message queues or distributed state:
const runner = new KafkaAgentRunner(kafkaConfig);
const runtime = new CopilotRuntime({
agents: { ... },
runner
});
Persistent Threads
Implement a runner that persists thread history to a database:
const runner = new PostgresAgentRunner(dbConnection);
const runtime = new CopilotRuntime({
agents: { ... },
runner
});
Long-running Agents
Implement a runner that offloads execution to worker processes:
const runner = new WorkerPoolAgentRunner(workerPool);
const runtime = new CopilotRuntime({
agents: { ... },
runner
});
See Also