
Overview

AbstractAgent is the abstract base class that all agents extend. It defines the contract for agent execution, state management, and lifecycle hooks. Extend it to create custom agent implementations.

Class Definition

abstract class AbstractAgent {
  abstract run(input: RunAgentInput): Observable<BaseEvent>;
  clone?(): AbstractAgent;
  abortRun?(): void;
  
  // Middleware support
  protected middlewares: Middleware[];
  use(middleware: Middleware): void;
}

Methods

run()

Execute the agent with the given input and stream events back.
abstract run(input: RunAgentInput): Observable<BaseEvent>
input (RunAgentInput, required): Input data for the agent run.
Returns: An RxJS Observable that emits AG-UI protocol events.

RunAgentInput

interface RunAgentInput {
  runId: string;
  threadId: string;
  messages: Message[];
  tools: Tool[];
  context: ContextItem[];
  state?: any;
  forwardedProps?: Record<string, unknown>;
}
runId (string, required): Unique identifier for this specific run.
threadId (string, required): Identifier for the conversation thread. Used to maintain history across runs.
messages (Message[], required): Array of previous messages in the conversation.
messages: [
  { id: '1', role: 'user', content: 'Hello!' },
  { id: '2', role: 'assistant', content: 'Hi! How can I help?' }
]
tools (Tool[], required): Array of tools available to the agent for this run.
tools: [
  {
    name: 'search',
    description: 'Search the database',
    parameters: { ... }
  }
]
context (ContextItem[], required): Application context provided by the client. Read-only information for the agent.
context: [
  {
    description: 'Current user',
    value: JSON.stringify({ id: '123', name: 'John' })
  }
]
state (any, optional): Application state that the agent can modify. Agents can update this via state events.
state: {
  user: { id: '123', name: 'John' },
  items: []
}
forwardedProps (Record<string, unknown>, optional): Additional properties forwarded from the client. Can be used to override agent settings.
forwardedProps: {
  temperature: 0.9,
  maxOutputTokens: 2000
}
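
One way to apply such overrides safely is to merge them onto defaults and ignore values of the wrong type. The sketch below is illustrative only: `ModelSettings`, `DEFAULT_SETTINGS`, and `resolveSettings` are hypothetical names, not part of the AG-UI API, and the default values are arbitrary.

```typescript
// Hypothetical settings shape for illustration; not an AG-UI type.
interface ModelSettings {
  temperature: number;
  maxOutputTokens: number;
}

// Arbitrary defaults chosen for this sketch.
const DEFAULT_SETTINGS: ModelSettings = { temperature: 0.2, maxOutputTokens: 1024 };

// Merge client-supplied overrides onto the defaults.
// forwardedProps is untyped (Record<string, unknown>), so each value is
// validated before use; anything that is not a finite number is ignored.
function resolveSettings(
  forwardedProps: Record<string, unknown> | undefined,
  defaults: ModelSettings = DEFAULT_SETTINGS
): ModelSettings {
  const props = forwardedProps ?? {};
  const pick = (key: keyof ModelSettings): number => {
    const value = props[key];
    return typeof value === "number" && Number.isFinite(value) ? value : defaults[key];
  };
  return {
    temperature: pick("temperature"),
    maxOutputTokens: pick("maxOutputTokens"),
  };
}
```

Inside run(), an agent could call resolveSettings(input.forwardedProps) before invoking its model, so a malformed override falls back to the default instead of reaching the LLM client.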

clone()

Create a copy of the agent. Optional but recommended for stateful agents.
clone?(): AbstractAgent
Returns: A new agent instance with the same configuration. Example:
class MyAgent extends AbstractAgent {
  constructor(private config: MyConfig) {
    super();
  }
  
  clone(): MyAgent {
    return new MyAgent(this.config);
  }
}

abortRun()

Abort the currently running execution. Optional but recommended.
abortRun?(): void
Example:
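import { AbstractAgent, BaseEvent, RunAgentInput } from '@ag-ui/client';
import { Observable } from 'rxjs';

class MyAgent extends AbstractAgent {
  private abortController?: AbortController;

  run(input: RunAgentInput): Observable<BaseEvent> {
    return new Observable((subscriber) => {
      const controller = new AbortController();
      this.abortController = controller;
      // Pass controller.signal to fetch() or your LLM client
      // so that in-flight work can be cancelled.

      // Teardown: also cancel the work if the consumer unsubscribes early.
      return () => controller.abort();
    });
  }

  // Cancels the most recently started run, if any.
  abortRun(): void {
    this.abortController?.abort();
  }
}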

use()

Add middleware to the agent. Middleware can intercept and modify agent execution.
use(middleware: Middleware): void
middleware (Middleware, required): Middleware function to add to the agent’s middleware chain.
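
To make the idea of a middleware chain concrete, here is a simplified sketch of how a chain can wrap an agent. It is not the real `Middleware` type from `@ag-ui/client`: events are modelled as plain arrays instead of Observables, and `SketchInput`, `SketchEvent`, `Handler`, `SketchMiddleware`, and `composeChain` are hypothetical names introduced for illustration.

```typescript
// Simplified stand-ins; the real API streams events through RxJS Observables.
interface SketchInput { runId: string }
interface SketchEvent { type: string }
type Handler = (input: SketchInput) => SketchEvent[];
type SketchMiddleware = (input: SketchInput, next: Handler) => SketchEvent[];

// Wrap the agent so that the first middleware in the list is the outermost.
// (The ordering is a choice made for this sketch.)
function composeChain(middlewares: SketchMiddleware[], agent: Handler): Handler {
  return middlewares.reduceRight<Handler>(
    (next, middleware) => (input) => middleware(input, next),
    agent
  );
}

// Example middleware 1: records when a run starts and how many events it produced.
const log: string[] = [];
const logging: SketchMiddleware = (input, next) => {
  log.push(`start ${input.runId}`);
  const events = next(input);
  log.push(`done ${events.length}`);
  return events;
};

// Example middleware 2: drops events whose type starts with "DEBUG_".
const filterDebug: SketchMiddleware = (input, next) =>
  next(input).filter((event) => !event.type.startsWith("DEBUG_"));

// A stand-in agent that returns a fixed event sequence.
const fakeAgent: Handler = () => [
  { type: "RUN_STARTED" },
  { type: "DEBUG_TRACE" },
  { type: "RUN_FINISHED" },
];

const wrapped = composeChain([logging, filterDebug], fakeAgent);
const visibleEvents = wrapped({ runId: "r1" });
```

Each middleware decides whether to call next, and can change the input before the call or the events after it, which is what "intercept and modify" amounts to in practice.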

Events

Agents must emit AG-UI protocol events through the Observable. Common event types:

RUN_STARTED

Emitted when the run begins.
{
  type: EventType.RUN_STARTED,
  threadId: string,
  runId: string
}

TEXT_MESSAGE_CHUNK

Emitted for each chunk of generated text.
{
  type: EventType.TEXT_MESSAGE_CHUNK,
  role: 'assistant',
  messageId: string,
  delta: string  // Text chunk
}
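
On the consuming side, chunks that share a messageId are concatenated in arrival order to rebuild the full message. A minimal sketch, assuming the event type serializes to the string "TEXT_MESSAGE_CHUNK"; `TextChunk` and `assembleMessages` are hypothetical local names, not library exports.

```typescript
// Local stand-in for the TEXT_MESSAGE_CHUNK payload shown above.
interface TextChunk {
  type: "TEXT_MESSAGE_CHUNK";
  messageId: string;
  delta: string;
}

// Concatenate deltas per messageId, preserving arrival order.
function assembleMessages(chunks: TextChunk[]): Map<string, string> {
  const byMessage = new Map<string, string>();
  for (const chunk of chunks) {
    const soFar = byMessage.get(chunk.messageId) ?? "";
    byMessage.set(chunk.messageId, soFar + chunk.delta);
  }
  return byMessage;
}
```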

TOOL_CALL_START

Emitted when a tool call begins.
{
  type: EventType.TOOL_CALL_START,
  parentMessageId: string,
  toolCallId: string,
  toolCallName: string
}

TOOL_CALL_ARGS

Emitted with tool arguments (may be streamed in chunks).
{
  type: EventType.TOOL_CALL_ARGS,
  toolCallId: string,
  delta: string  // JSON chunk
}

TOOL_CALL_END

Emitted when tool arguments are complete.
{
  type: EventType.TOOL_CALL_END,
  toolCallId: string
}
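
Because arguments may arrive in several TOOL_CALL_ARGS chunks, a consumer typically buffers the deltas per toolCallId and parses the JSON only once TOOL_CALL_END arrives. The following sketch shows that pattern with local stand-in types; `ToolCallEvent`, `CompletedToolCall`, and `collectToolCalls` are hypothetical names, and the string event types are assumed to match the EventType values.

```typescript
// Local stand-ins for the three tool call events described above.
type ToolCallEvent =
  | { type: "TOOL_CALL_START"; toolCallId: string; toolCallName: string }
  | { type: "TOOL_CALL_ARGS"; toolCallId: string; delta: string }
  | { type: "TOOL_CALL_END"; toolCallId: string };

interface CompletedToolCall {
  toolCallId: string;
  name: string;
  args: unknown;
}

// Buffer argument chunks per call and parse them when the call ends.
function collectToolCalls(events: ToolCallEvent[]): CompletedToolCall[] {
  const pending = new Map<string, { name: string; buffer: string }>();
  const completed: CompletedToolCall[] = [];

  for (const event of events) {
    if (event.type === "TOOL_CALL_START") {
      pending.set(event.toolCallId, { name: event.toolCallName, buffer: "" });
    } else if (event.type === "TOOL_CALL_ARGS") {
      const call = pending.get(event.toolCallId);
      if (call) call.buffer += event.delta; // chunks for unknown calls are ignored
    } else {
      const call = pending.get(event.toolCallId);
      if (!call) continue;
      pending.delete(event.toolCallId);
      completed.push({
        toolCallId: event.toolCallId,
        name: call.name,
        // An empty buffer is treated as "no arguments" in this sketch.
        args: call.buffer === "" ? {} : JSON.parse(call.buffer),
      });
    }
  }
  return completed;
}
```

Parsing only at TOOL_CALL_END matters because an individual delta is usually not valid JSON on its own.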

TOOL_CALL_RESULT

Emitted with the tool execution result.
{
  type: EventType.TOOL_CALL_RESULT,
  role: 'tool',
  messageId: string,
  toolCallId: string,
  content: string  // JSON result
}

STATE_SNAPSHOT

Emitted when the agent replaces the entire application state.
{
  type: EventType.STATE_SNAPSHOT,
  snapshot: any  // New state object
}

STATE_DELTA

Emitted when the agent applies incremental state updates.
{
  type: EventType.STATE_DELTA,
  delta: JSONPatchOperation[]  // JSON Patch ops
}
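
To show what applying a delta involves, here is a minimal JSON Patch (RFC 6902) applier covering only the add, replace, and remove operations. It is a sketch for illustration, not the library's implementation: `PatchOp`, `parsePointer`, and `applyDelta` are hypothetical names, and it does no validation of missing paths. In production, a maintained JSON Patch library is the safer choice.

```typescript
// Subset of RFC 6902 operations; "move", "copy", and "test" are omitted.
type PatchOp =
  | { op: "add" | "replace"; path: string; value: unknown }
  | { op: "remove"; path: string };

// Split a JSON Pointer such as "/items/0" into unescaped segments.
function parsePointer(path: string): string[] {
  if (path === "") return [];
  return path
    .slice(1)
    .split("/")
    .map((segment) => segment.replace(/~1/g, "/").replace(/~0/g, "~"));
}

// Apply the operations in order and return the new state.
function applyDelta(state: unknown, ops: PatchOp[]): unknown {
  // State is JSON data, so a JSON round trip gives a deep copy
  // and leaves the previous state untouched.
  let next: any = JSON.parse(JSON.stringify(state));

  for (const op of ops) {
    const segments = parsePointer(op.path);
    if (segments.length === 0) {
      // An empty pointer targets the whole document.
      if (op.op === "remove") throw new Error("cannot remove the root");
      next = op.value;
      continue;
    }
    const key = segments.pop()!;
    let parent: any = next;
    for (const segment of segments) parent = parent[segment];

    if (Array.isArray(parent)) {
      // "-" means "append to the end of the array".
      const index = key === "-" ? parent.length : Number(key);
      if (op.op === "add") parent.splice(index, 0, op.value);
      else if (op.op === "replace") parent[index] = op.value;
      else parent.splice(index, 1);
    } else if (op.op === "remove") {
      delete parent[key];
    } else {
      parent[key] = op.value;
    }
  }
  return next;
}
```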

RUN_FINISHED

Emitted when the run completes successfully.
{
  type: EventType.RUN_FINISHED,
  threadId: string,
  runId: string
}

RUN_ERROR

Emitted when an error occurs.
{
  type: EventType.RUN_ERROR,
  message: string
}
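
A value caught in a TypeScript catch block is typed `unknown`, so it helps to normalize it before building the event. The helper below is a small sketch; `RunErrorEvent` and `toRunErrorEvent` are hypothetical local names, and the string "RUN_ERROR" is assumed to match the EventType value.

```typescript
// Local stand-in for the RUN_ERROR payload shown above.
interface RunErrorEvent {
  type: "RUN_ERROR";
  message: string;
}

// Turn any thrown value into a RUN_ERROR event with a usable message.
function toRunErrorEvent(error: unknown): RunErrorEvent {
  let message = "Unknown error";
  if (error instanceof Error) {
    message = error.message;
  } else if (typeof error === "string") {
    message = error;
  }
  return { type: "RUN_ERROR", message };
}
```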

Custom Implementation Example

Here’s a complete example of a custom agent:
import { AbstractAgent, BaseEvent, EventType, Message, RunAgentInput } from '@ag-ui/client';
import { Observable } from 'rxjs';

// LLMClient is a placeholder for your own LLM SDK wrapper; it is not part of AG-UI.

export class CustomLLMAgent extends AbstractAgent {
  constructor(
    private llmClient: LLMClient,
    private systemPrompt: string
  ) {
    super();
  }

  run(input: RunAgentInput): Observable<BaseEvent> {
    return new Observable((subscriber) => {
      (async () => {
        try {
          // Emit RUN_STARTED
          subscriber.next({
            type: EventType.RUN_STARTED,
            threadId: input.threadId,
            runId: input.runId
          });

          // Build prompt with context and state
          const prompt = this.buildPrompt(input);
          
          // Generate message ID
          const messageId = this.generateId();

          // Stream response from LLM
          const stream = await this.llmClient.streamCompletion({
            messages: this.formatMessages(input.messages),
            prompt: prompt,
            tools: input.tools
          });

          for await (const chunk of stream) {
            if (chunk.type === 'text') {
              // Emit text chunk
              subscriber.next({
                type: EventType.TEXT_MESSAGE_CHUNK,
                role: 'assistant',
                messageId,
                delta: chunk.text
              });
            } else if (chunk.type === 'tool_call') {
              // Emit tool call events
              subscriber.next({
                type: EventType.TOOL_CALL_START,
                parentMessageId: messageId,
                toolCallId: chunk.id,
                toolCallName: chunk.name
              });
              
              subscriber.next({
                type: EventType.TOOL_CALL_ARGS,
                toolCallId: chunk.id,
                delta: JSON.stringify(chunk.args)
              });
              
              subscriber.next({
                type: EventType.TOOL_CALL_END,
                toolCallId: chunk.id
              });

              // Execute tool (if it's a frontend tool, it would be executed client-side)
              // For this example, we'll emit a result immediately
              subscriber.next({
                type: EventType.TOOL_CALL_RESULT,
                role: 'tool',
                messageId: this.generateId(),
                toolCallId: chunk.id,
                content: JSON.stringify({ success: true })
              });
            }
          }

          // Emit RUN_FINISHED
          subscriber.next({
            type: EventType.RUN_FINISHED,
            threadId: input.threadId,
            runId: input.runId
          });

          subscriber.complete();
        } catch (error) {
          // Emit error event; the caught value is `unknown`, so narrow it first
          subscriber.next({
            type: EventType.RUN_ERROR,
            message: error instanceof Error ? error.message : String(error)
          });
          subscriber.error(error);
        }
      })();
    });
  }

  clone(): CustomLLMAgent {
    return new CustomLLMAgent(this.llmClient, this.systemPrompt);
  }

  private buildPrompt(input: RunAgentInput): string {
    const parts = [this.systemPrompt];

    if (input.context.length > 0) {
      parts.push('\n## Context\n');
      for (const ctx of input.context) {
        parts.push(`${ctx.description}:\n${ctx.value}\n`);
      }
    }

    if (input.state) {
      parts.push('\n## Application State\n');
      parts.push(JSON.stringify(input.state, null, 2));
    }

    return parts.join('');
  }

  private formatMessages(messages: Message[]): any[] {
    return messages.map(msg => ({
      role: msg.role,
      content: msg.content
    }));
  }

  private generateId(): string {
    // slice(2, 11) replaces the deprecated substr(2, 9): nine characters after the "0." prefix
    return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}

Usage Example

// Create custom agent
const agent = new CustomLLMAgent(llmClient, 'You are a helpful assistant.');

// Use in runtime
const runtime = new CopilotRuntime({
  agents: {
    'custom': agent
  }
});

// Or execute directly
const events$ = agent.run({
  runId: 'run-123',
  threadId: 'thread-456',
  messages: [
    { id: '1', role: 'user', content: 'Hello!' }
  ],
  tools: [],
  context: [],
  state: {}
});

events$.subscribe({
  next: (event) => console.log('Event:', event),
  complete: () => console.log('Run complete'),
  error: (err) => console.error('Error:', err)
});

LangGraph Integration Example

import { AbstractAgent, BaseEvent, EventType, RunAgentInput } from '@ag-ui/client';
import { Observable } from 'rxjs';
import { StateGraph } from '@langchain/langgraph';

export class LangGraphAgent extends AbstractAgent {
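  // Note: stream() is available on a compiled graph (the result of StateGraph.compile()),
  // so pass the compiled graph here and adjust this type to match your LangGraph version.
  constructor(private graph: StateGraph) {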
    super();
  }

  run(input: RunAgentInput): Observable<BaseEvent> {
    return new Observable((subscriber) => {
      (async () => {
        subscriber.next({
          type: EventType.RUN_STARTED,
          threadId: input.threadId,
          runId: input.runId
        });

        try {
          // Execute LangGraph
          const stream = await this.graph.stream({
            messages: input.messages,
            tools: input.tools,
            context: input.context,
            state: input.state
          });

          for await (const event of stream) {
            // Convert LangGraph events to AG-UI events
            const agEvent = this.convertEvent(event);
            if (agEvent) {
              subscriber.next(agEvent);
            }
          }

          subscriber.next({
            type: EventType.RUN_FINISHED,
            threadId: input.threadId,
            runId: input.runId
          });

          subscriber.complete();
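        } catch (error) {
          // The caught value is `unknown`, so narrow it before reading .message
          subscriber.next({
            type: EventType.RUN_ERROR,
            message: error instanceof Error ? error.message : String(error)
          });
          subscriber.error(error);
        }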
      })();
    });
  }

  clone(): LangGraphAgent {
    return new LangGraphAgent(this.graph);
  }

  private convertEvent(langGraphEvent: any): BaseEvent | null {
    // Convert LangGraph events to AG-UI events
    // Implementation depends on your LangGraph setup
    return null;
  }
}

Best Practices

  1. Always emit RUN_STARTED at the beginning of execution
  2. Always emit RUN_FINISHED or RUN_ERROR at the end
  3. Stream events as they occur for real-time feedback
  4. Generate unique IDs for messages and tool calls
  5. Handle errors gracefully and emit RUN_ERROR events
  6. Implement clone() if your agent has mutable state
  7. Implement abortRun() for long-running operations
  8. Support middleware via the use() method
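
The first two practices can be checked mechanically in tests. The helper below is a sketch under the assumption that event types serialize to the strings shown in the Events section; `MinimalEvent` and `checkRunLifecycle` are hypothetical names, not library exports.

```typescript
// Only the event type matters for lifecycle checks.
interface MinimalEvent {
  type: string;
}

const TERMINAL_TYPES = ["RUN_FINISHED", "RUN_ERROR"];

// Return a list of lifecycle problems; an empty list means the sequence is well formed.
function checkRunLifecycle(events: MinimalEvent[]): string[] {
  if (events.length === 0) return ["no events emitted"];

  const problems: string[] = [];
  const first = events[0];
  const last = events[events.length - 1];

  if (first?.type !== "RUN_STARTED") {
    problems.push("first event is not RUN_STARTED");
  }
  if (!last || !TERMINAL_TYPES.includes(last.type)) {
    problems.push("last event is not RUN_FINISHED or RUN_ERROR");
  }
  // A terminal event anywhere before the end means events were emitted after the run closed.
  const terminalTooEarly = events
    .slice(0, -1)
    .some((event) => TERMINAL_TYPES.includes(event.type));
  if (terminalTooEarly) {
    problems.push("terminal event emitted before the end of the stream");
  }
  return problems;
}
```

In a unit test, an agent's emitted events can be collected into an array and passed to checkRunLifecycle, with the test asserting that the returned list is empty.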
