Skip to main content

Overview

AgentRunner is an abstract class that defines the interface for running agents. It provides methods for executing agent runs, connecting to existing threads, checking run status, and stopping runs. Custom runners can be implemented to support different execution strategies (in-memory, distributed, persistent, etc.).

Class Definition

abstract class AgentRunner {
  abstract run(request: AgentRunnerRunRequest): Observable<BaseEvent>;
  abstract connect(request: AgentRunnerConnectRequest): Observable<BaseEvent>;
  abstract isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean>;
  abstract stop(request: AgentRunnerStopRequest): Promise<boolean | undefined>;
}

Methods

run()

Execute an agent with given input and stream events back.
abstract run(request: AgentRunnerRunRequest): Observable<BaseEvent>
request
AgentRunnerRunRequest
required
The run request containing the agent, thread ID, and input data.
Returns: An RxJS Observable that emits AG-UI protocol events during execution.

AgentRunnerRunRequest

interface AgentRunnerRunRequest {
  threadId: string;
  agent: AbstractAgent;
  input: RunAgentInput;
}
threadId
string
required
Unique identifier for the conversation thread. Used to maintain conversation history.
agent
AbstractAgent
required
The agent instance to execute.
input
RunAgentInput
required
Input data for the agent run, including messages, tools, context, and state.
{
  runId: 'run_123',
  threadId: 'thread_456',
  messages: [...],
  tools: [...],
  context: [...],
  state: {...},
  forwardedProps: {...}
}
Example:
const events$ = runner.run({
  threadId: 'conversation-123',
  agent: myAgent,
  input: {
    runId: 'run-456',
    threadId: 'conversation-123',
    messages: [
      { role: 'user', content: 'Hello!' }
    ],
    tools: [],
    context: [],
    state: {}
  }
});

events$.subscribe({
  next: (event) => {
    console.log('Event:', event.type);
  },
  complete: () => {
    console.log('Run completed');
  },
  error: (err) => {
    console.error('Run failed:', err);
  }
});

connect()

Connect to an existing thread to receive events from ongoing or completed runs.
abstract connect(request: AgentRunnerConnectRequest): Observable<BaseEvent>
request
AgentRunnerConnectRequest
required
The connect request containing the thread ID and optional headers.
Returns: An RxJS Observable that emits events from the thread.

AgentRunnerConnectRequest

interface AgentRunnerConnectRequest {
  threadId: string;
  headers?: Record<string, string>;
}
threadId
string
required
The thread ID to connect to.
headers
Record<string, string>
Optional HTTP headers for authentication or context.
Example:
const events$ = runner.connect({
  threadId: 'conversation-123',
  headers: {
    'Authorization': 'Bearer token'
  }
});

events$.subscribe((event) => {
  console.log('Thread event:', event);
});

isRunning()

Check if a run is currently active on a thread.
abstract isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean>
request
AgentRunnerIsRunningRequest
required
The request containing the thread ID.
Returns: A Promise that resolves to true if the thread has an active run, false otherwise.

AgentRunnerIsRunningRequest

interface AgentRunnerIsRunningRequest {
  threadId: string;
}
Example:
const running = await runner.isRunning({ threadId: 'conversation-123' });
if (running) {
  console.log('Thread is currently running');
} else {
  console.log('Thread is idle');
}

stop()

Stop an active run on a thread.
abstract stop(request: AgentRunnerStopRequest): Promise<boolean | undefined>
request
AgentRunnerStopRequest
required
The request containing the thread ID.
Returns: A Promise that resolves to:
  • true if the run was successfully stopped
  • false if there was no run to stop
  • undefined if the operation completed without confirmation

AgentRunnerStopRequest

interface AgentRunnerStopRequest {
  threadId: string;
}
Example:
const stopped = await runner.stop({ threadId: 'conversation-123' });
if (stopped) {
  console.log('Run stopped successfully');
} else {
  console.log('No active run to stop');
}

Built-in Implementation

InMemoryAgentRunner

The default implementation that runs agents in-memory within the current process.
import { InMemoryAgentRunner } from '@copilotkit/runtime';

const runner = new InMemoryAgentRunner();

const runtime = new CopilotRuntime({
  agents: { ... },
  runner
});
Features:
  • Runs agents in the same process as the server
  • Maintains thread state in memory
  • Automatic cleanup of completed runs
  • Supports concurrent runs across different threads
  • Suitable for development and single-instance deployments
Limitations:
  • State is lost on server restart
  • Not suitable for multi-instance deployments without shared state
  • Memory usage scales with number of active threads

Custom Runner Implementation

Implement your own runner for custom execution strategies:
import { AgentRunner, AgentRunnerRunRequest, AgentRunnerConnectRequest } from '@copilotkit/runtime';
import { Observable } from 'rxjs';
import type { BaseEvent, AbstractAgent } from '@ag-ui/client';

class RedisAgentRunner extends AgentRunner {
  constructor(private redisClient: RedisClient) {
    super();
  }

  run(request: AgentRunnerRunRequest): Observable<BaseEvent> {
    return new Observable((subscriber) => {
      const { threadId, agent, input } = request;
      
      // Store thread state in Redis
      this.redisClient.set(`thread:${threadId}:running`, 'true');
      
      // Execute agent
      const subscription = agent.run(input).subscribe({
        next: (event) => {
          // Store events in Redis for replay
          this.redisClient.rpush(
            `thread:${threadId}:events`,
            JSON.stringify(event)
          );
          subscriber.next(event);
        },
        complete: () => {
          this.redisClient.set(`thread:${threadId}:running`, 'false');
          subscriber.complete();
        },
        error: (err) => {
          this.redisClient.set(`thread:${threadId}:running`, 'false');
          subscriber.error(err);
        }
      });

      return () => {
        subscription.unsubscribe();
        this.redisClient.set(`thread:${threadId}:running`, 'false');
      };
    });
  }

  connect(request: AgentRunnerConnectRequest): Observable<BaseEvent> {
    return new Observable((subscriber) => {
      const { threadId } = request;
      
      // Replay stored events
      this.redisClient.lrange(
        `thread:${threadId}:events`,
        0,
        -1,
        (err, events) => {
          if (err) {
            subscriber.error(err);
            return;
          }
          
          events.forEach((eventStr) => {
            subscriber.next(JSON.parse(eventStr));
          });
          
          // Subscribe to new events via Redis pub/sub
          const pubsub = this.redisClient.duplicate();
          pubsub.subscribe(`thread:${threadId}:new-events`);
          pubsub.on('message', (channel, message) => {
            subscriber.next(JSON.parse(message));
          });
        }
      );
    });
  }

  async isRunning(request: AgentRunnerIsRunningRequest): Promise<boolean> {
    const status = await this.redisClient.get(`thread:${request.threadId}:running`);
    return status === 'true';
  }

  async stop(request: AgentRunnerStopRequest): Promise<boolean> {
    const running = await this.isRunning(request);
    if (running) {
      // Publish stop signal
      await this.redisClient.publish(
        `thread:${request.threadId}:commands`,
        'stop'
      );
      return true;
    }
    return false;
  }
}

// Usage
const runner = new RedisAgentRunner(redisClient);
const runtime = new CopilotRuntime({
  agents: { ... },
  runner
});

Event Types

Runners emit AG-UI protocol events. Common event types:
enum EventType {
  RUN_STARTED = 'RUN_STARTED',
  RUN_FINISHED = 'RUN_FINISHED',
  RUN_ERROR = 'RUN_ERROR',
  TEXT_MESSAGE_CHUNK = 'TEXT_MESSAGE_CHUNK',
  TOOL_CALL_START = 'TOOL_CALL_START',
  TOOL_CALL_ARGS = 'TOOL_CALL_ARGS',
  TOOL_CALL_END = 'TOOL_CALL_END',
  TOOL_CALL_RESULT = 'TOOL_CALL_RESULT',
  STATE_SNAPSHOT = 'STATE_SNAPSHOT',
  STATE_DELTA = 'STATE_DELTA',
  // ... more event types
}

Use Cases

Development

Use the default InMemoryAgentRunner for simple, in-process execution:
const runtime = new CopilotRuntime({
  agents: { ... }
  // runner defaults to InMemoryAgentRunner
});

Distributed Systems

Implement a custom runner that uses message queues or distributed state:
const runner = new KafkaAgentRunner(kafkaConfig);
const runtime = new CopilotRuntime({
  agents: { ... },
  runner
});

Persistent Threads

Implement a runner that persists thread history to a database:
const runner = new PostgresAgentRunner(dbConnection);
const runtime = new CopilotRuntime({
  agents: { ... },
  runner
});

Long-running Agents

Implement a runner that offloads execution to worker processes:
const runner = new WorkerPoolAgentRunner(workerPool);
const runtime = new CopilotRuntime({
  agents: { ... },
  runner
});

See Also