Overview
AbstractAgent is the abstract base class that all agents must extend. It defines the contract for agent execution, state management, and lifecycle hooks. Extend this class to create custom agent implementations.
Class Definition
abstract class AbstractAgent {
abstract run(input: RunAgentInput): Observable<BaseEvent>;
clone?(): AbstractAgent;
abortRun?(): void;
// Middleware support
protected middlewares: Middleware[];
use(middleware: Middleware): void;
}
Methods
run()
Execute the agent with the given input and stream events back.
abstract run(input: RunAgentInput): Observable<BaseEvent>
input: Input data for the agent run.
Returns: An RxJS Observable that emits AG-UI protocol events.
interface RunAgentInput {
runId: string;
threadId: string;
messages: Message[];
tools: Tool[];
context: ContextItem[];
state?: any;
forwardedProps?: Record<string, unknown>;
}
runId: Unique identifier for this specific run.
threadId: Identifier for the conversation thread. Used to maintain history across runs.
messages: Array of previous messages in the conversation. Example:
messages: [
{ id: '1', role: 'user', content: 'Hello!' },
{ id: '2', role: 'assistant', content: 'Hi! How can I help?' }
]
tools: Array of tools available to the agent for this run. Example:
tools: [
{
name: 'search',
description: 'Search the database',
parameters: { ... }
}
]
context: Application context provided by the client. Read-only information for the agent. Example:
context: [
{
description: 'Current user',
value: JSON.stringify({ id: '123', name: 'John' })
}
]
state: Application state that the agent can modify. Agents can update this via state events. Example:
state: {
user: { id: '123', name: 'John' },
items: []
}
forwardedProps: Additional properties forwarded from the client. Can be used to override agent settings. Example:
forwardedProps: {
temperature: 0.9,
maxOutputTokens: 2000
}
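The override behaviour described for forwardedProps could be sketched as follows. The `AgentSettings` shape, its default values, and the `resolveSettings` helper are hypothetical and not part of the AG-UI API; only the `temperature` and `maxOutputTokens` keys come from the example above.

```typescript
// Hypothetical settings shape for illustration; not part of the AG-UI API.
interface AgentSettings {
  temperature: number;
  maxOutputTokens: number;
}

// Assumed defaults, chosen arbitrarily for this sketch.
const DEFAULT_SETTINGS: AgentSettings = { temperature: 0.2, maxOutputTokens: 1024 };

// Overlay recognised, correctly typed forwarded props on top of the defaults.
// Unknown keys and wrongly typed values are ignored rather than trusted.
function resolveSettings(
  forwardedProps: Record<string, unknown> | undefined,
  defaults: AgentSettings = DEFAULT_SETTINGS
): AgentSettings {
  const props = forwardedProps ?? {};
  const temperature = props['temperature'];
  const maxOutputTokens = props['maxOutputTokens'];
  return {
    temperature: typeof temperature === 'number' ? temperature : defaults.temperature,
    maxOutputTokens:
      typeof maxOutputTokens === 'number' ? maxOutputTokens : defaults.maxOutputTokens,
  };
}
```

Validating each value before use is a design choice of this sketch: forwardedProps arrives from the client as `Record<string, unknown>`, so nothing about its contents can be assumed.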
clone()
Create a copy of the agent. Optional but recommended for stateful agents.
Returns: A new agent instance with the same configuration.
Example:
class MyAgent extends AbstractAgent {
constructor(private config: MyConfig) {
super();
}
clone(): MyAgent {
return new MyAgent(this.config);
}
}
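Why clone() matters for stateful agents can be illustrated with a small sketch. `HistoryAgent` is a hypothetical local stand-in, not a subclass of the real AbstractAgent, so the snippet runs on its own. It assumes the agent keeps a mutable per-instance array that copies must not share.

```typescript
// Hypothetical stand-in; a real agent would extend AbstractAgent instead.
class HistoryAgent {
  // Mutable per-instance state that must not be shared between copies.
  private entries: string[] = [];

  constructor(private readonly config: { label: string }) {}

  record(entry: string): void {
    this.entries.push(entry);
  }

  entryCount(): number {
    return this.entries.length;
  }

  // Reuse the read-only config, but give the copy its own entries array.
  clone(): HistoryAgent {
    const copy = new HistoryAgent(this.config);
    copy.entries = this.entries.slice();
    return copy;
  }
}

const originalAgent = new HistoryAgent({ label: 'demo' });
originalAgent.record('first run');

const clonedAgent = originalAgent.clone();
clonedAgent.record('second run'); // does not affect originalAgent
```

Copying the array, rather than passing the same reference, is what keeps concurrent runs on the original and the clone from interfering with each other.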
abortRun()
Abort the currently running execution. Optional but recommended.
Example:
class MyAgent extends AbstractAgent {
private abortController?: AbortController;
run(input: RunAgentInput): Observable<BaseEvent> {
return new Observable((subscriber) => {
this.abortController = new AbortController();
// Use abortController.signal in your implementation
});
}
abortRun(): void {
this.abortController?.abort();
}
}
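One way the signal might be consumed inside run() is to check `signal.aborted` before emitting each chunk. The sketch below is synchronous for brevity; `emitChunks` and the chunk array are hypothetical stand-ins for a streaming LLM call, and the callback simulates a caller invoking abortRun() mid-stream.

```typescript
// Hypothetical helper: emits chunks in order until the signal is aborted.
// Returns the number of chunks that were actually emitted.
function emitChunks(
  chunks: string[],
  signal: AbortSignal,
  emit: (chunk: string) => void
): number {
  let emitted = 0;
  for (const chunk of chunks) {
    // Stop before the next chunk once abort() has been called.
    if (signal.aborted) break;
    emit(chunk);
    emitted++;
  }
  return emitted;
}

const abortCtl = new AbortController();
const seenChunks: string[] = [];
const emittedCount = emitChunks(['a', 'b', 'c'], abortCtl.signal, (chunk) => {
  seenChunks.push(chunk);
  if (chunk === 'b') abortCtl.abort(); // simulate abortRun() after the second chunk
});
```

In an asynchronous implementation the same signal would typically also be passed to the underlying network request, so that the request itself is cancelled rather than only the emit loop.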
use()
Add middleware to the agent. Middleware can intercept and modify agent execution.
use(middleware: Middleware): void
middleware: Middleware function to add to the agent’s middleware chain.
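The Middleware signature is not shown in this document, so the following is only a conceptual sketch of how a chain can intercept and modify execution. Every name in it (`SketchInput`, `SketchEvent`, `SketchMiddleware`, `compose`) is hypothetical, and plain arrays stand in for the Observable stream. The real Middleware type may look quite different.

```typescript
// Hypothetical shapes for illustration only; not the AG-UI types.
interface SketchInput { runId: string }
interface SketchEvent { type: string }
type SketchNext = (input: SketchInput) => SketchEvent[];
type SketchMiddleware = (input: SketchInput, next: SketchNext) => SketchEvent[];

// Wrap the core run function so that the first middleware in the
// list runs outermost and the last one runs closest to the core.
function compose(middlewares: SketchMiddleware[], core: SketchNext): SketchNext {
  return middlewares.reduceRight<SketchNext>(
    (next, mw) => (input) => mw(input, next),
    core
  );
}

const callOrder: string[] = [];

// Intercepts execution: records when the run is entered and left.
const logging: SketchMiddleware = (input, next) => {
  callOrder.push('before');
  const out = next(input);
  callOrder.push('after');
  return out;
};

// Modifies execution: drops text chunks from the output.
const dropText: SketchMiddleware = (input, next) =>
  next(input).filter((e) => e.type !== 'TEXT_MESSAGE_CHUNK');

const coreRun: SketchNext = () => [
  { type: 'RUN_STARTED' },
  { type: 'TEXT_MESSAGE_CHUNK' },
  { type: 'RUN_FINISHED' },
];

const pipeline = compose([logging, dropText], coreRun);
const pipelineOutput = pipeline({ runId: 'run-123' });
```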
Events
Agents must emit AG-UI protocol events through the Observable. Common event types:
RUN_STARTED
Emitted when the run begins.
{
type: EventType.RUN_STARTED,
threadId: string,
runId: string
}
TEXT_MESSAGE_CHUNK
Emitted for each chunk of generated text.
{
type: EventType.TEXT_MESSAGE_CHUNK,
role: 'assistant',
messageId: string,
delta: string // Text chunk
}
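On the consuming side, chunks that share a messageId are usually concatenated into a full message. A minimal sketch, assuming a local `TextChunk` shape that mirrors the fields above (the string literal stands in for `EventType.TEXT_MESSAGE_CHUNK`, and `assembleMessages` is a hypothetical helper):

```typescript
// Local shape mirroring the fields listed above; not imported from AG-UI.
interface TextChunk {
  type: 'TEXT_MESSAGE_CHUNK';
  messageId: string;
  delta: string;
}

// Concatenate deltas per messageId, preserving arrival order.
function assembleMessages(chunks: TextChunk[]): Record<string, string> {
  const messages: Record<string, string> = {};
  for (const chunk of chunks) {
    messages[chunk.messageId] = (messages[chunk.messageId] ?? '') + chunk.delta;
  }
  return messages;
}

const assembled = assembleMessages([
  { type: 'TEXT_MESSAGE_CHUNK', messageId: 'm1', delta: 'Hel' },
  { type: 'TEXT_MESSAGE_CHUNK', messageId: 'm1', delta: 'lo!' },
  { type: 'TEXT_MESSAGE_CHUNK', messageId: 'm2', delta: 'Bye' },
]);
```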
TOOL_CALL_START
Emitted when a tool call begins.
{
type: EventType.TOOL_CALL_START,
parentMessageId: string,
toolCallId: string,
toolCallName: string
}
TOOL_CALL_ARGS
Emitted with tool arguments (may be streamed in chunks).
{
type: EventType.TOOL_CALL_ARGS,
toolCallId: string,
delta: string // JSON chunk
}
TOOL_CALL_END
Emitted when tool arguments are complete.
{
type: EventType.TOOL_CALL_END,
toolCallId: string
}
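Because arguments may arrive in several TOOL_CALL_ARGS chunks, a consumer generally buffers the deltas per toolCallId and parses them only after the matching end event. A sketch under that assumption, using local event shapes (string literals stand in for the `EventType` members, `parentMessageId` is omitted for brevity, and `collectToolCalls` is a hypothetical helper):

```typescript
// Local shapes mirroring the three events above; not imported from AG-UI.
type ToolCallEvent =
  | { type: 'TOOL_CALL_START'; toolCallId: string; toolCallName: string }
  | { type: 'TOOL_CALL_ARGS'; toolCallId: string; delta: string }
  | { type: 'TOOL_CALL_END'; toolCallId: string };

interface CompletedToolCall {
  toolCallId: string;
  toolCallName: string;
  args: unknown;
}

function collectToolCalls(events: ToolCallEvent[]): CompletedToolCall[] {
  const pending: Record<string, { name: string; buffer: string }> = {};
  const completed: CompletedToolCall[] = [];
  for (const ev of events) {
    if (ev.type === 'TOOL_CALL_START') {
      pending[ev.toolCallId] = { name: ev.toolCallName, buffer: '' };
    } else if (ev.type === 'TOOL_CALL_ARGS') {
      const call = pending[ev.toolCallId];
      if (call) call.buffer += ev.delta; // a single delta may be partial JSON
    } else {
      const call = pending[ev.toolCallId];
      if (call) {
        // Parse only once the full argument string has been received.
        completed.push({
          toolCallId: ev.toolCallId,
          toolCallName: call.name,
          args: JSON.parse(call.buffer || '{}'),
        });
        delete pending[ev.toolCallId];
      }
    }
  }
  return completed;
}

const toolCalls = collectToolCalls([
  { type: 'TOOL_CALL_START', toolCallId: 'tc1', toolCallName: 'search' },
  { type: 'TOOL_CALL_ARGS', toolCallId: 'tc1', delta: '{"query":' },
  { type: 'TOOL_CALL_ARGS', toolCallId: 'tc1', delta: '"cats"}' },
  { type: 'TOOL_CALL_END', toolCallId: 'tc1' },
]);
```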
TOOL_CALL_RESULT
Emitted with the tool execution result.
{
type: EventType.TOOL_CALL_RESULT,
role: 'tool',
messageId: string,
toolCallId: string,
content: string // JSON result
}
STATE_SNAPSHOT
Emitted when the agent replaces the entire application state.
{
type: EventType.STATE_SNAPSHOT,
snapshot: any // New state object
}
STATE_DELTA
Emitted when the agent applies incremental state updates.
{
type: EventType.STATE_DELTA,
delta: JSONPatchOperation[] // JSON Patch ops
}
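To show what applying a STATE_DELTA involves, here is a minimal sketch that handles only the `add`, `replace` and `remove` operations of JSON Patch (RFC 6902); `move`, `copy` and `test` are left out. `PatchOp`, `parsePointer` and `applyPatch` are hypothetical local names, the state is assumed to be JSON-serializable, and every path is assumed to point at an existing parent. A production client would more likely rely on an established JSON Patch library.

```typescript
// Subset of RFC 6902 operations handled by this sketch.
type PatchOp =
  | { op: 'add' | 'replace'; path: string; value: unknown }
  | { op: 'remove'; path: string };

// Split an RFC 6901 JSON Pointer into unescaped segments.
function parsePointer(path: string): string[] {
  if (path === '') return [];
  return path
    .slice(1)
    .split('/')
    .map((seg) => seg.replace(/~1/g, '/').replace(/~0/g, '~'));
}

function applyPatch(state: unknown, ops: PatchOp[]): unknown {
  // Work on a deep copy so the caller's state object is left untouched.
  let root: any = JSON.parse(JSON.stringify(state));
  for (const op of ops) {
    const segments = parsePointer(op.path);
    const last = segments.pop();
    if (last === undefined) {
      // An empty pointer targets the whole document.
      if (op.op === 'remove') throw new Error('cannot remove the document root');
      root = op.value;
      continue;
    }
    // Walk down to the container that holds the target key or index.
    let container: any = root;
    for (const seg of segments) container = container[seg];
    if (Array.isArray(container)) {
      // '-' means "append at the end of the array".
      const index = last === '-' ? container.length : Number(last);
      if (op.op === 'add') container.splice(index, 0, op.value);
      else if (op.op === 'replace') container[index] = op.value;
      else container.splice(index, 1);
    } else if (op.op === 'remove') {
      delete container[last];
    } else {
      container[last] = op.value;
    }
  }
  return root;
}

const previousState = { user: { id: '123', name: 'John' }, items: [] as string[] };
const nextState = applyPatch(previousState, [
  { op: 'replace', path: '/user/name', value: 'Jane' },
  { op: 'add', path: '/items/-', value: 'first item' },
]);
```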
RUN_FINISHED
Emitted when the run completes successfully.
{
type: EventType.RUN_FINISHED,
threadId: string,
runId: string
}
RUN_ERROR
Emitted when an error occurs.
{
type: EventType.RUN_ERROR,
message: string
}
Custom Implementation Example
Here’s a complete example of a custom agent:
import { AbstractAgent, BaseEvent, EventType, Message, RunAgentInput } from '@ag-ui/client';
import { Observable } from 'rxjs';
// LLMClient is a placeholder for your own streaming LLM client type
export class CustomLLMAgent extends AbstractAgent {
constructor(
private llmClient: LLMClient,
private systemPrompt: string
) {
super();
}
run(input: RunAgentInput): Observable<BaseEvent> {
return new Observable((subscriber) => {
(async () => {
try {
// Emit RUN_STARTED
subscriber.next({
type: EventType.RUN_STARTED,
threadId: input.threadId,
runId: input.runId
});
// Build prompt with context and state
const prompt = this.buildPrompt(input);
// Generate message ID
const messageId = this.generateId();
// Stream response from LLM
const stream = await this.llmClient.streamCompletion({
messages: this.formatMessages(input.messages),
prompt: prompt,
tools: input.tools
});
for await (const chunk of stream) {
if (chunk.type === 'text') {
// Emit text chunk
subscriber.next({
type: EventType.TEXT_MESSAGE_CHUNK,
role: 'assistant',
messageId,
delta: chunk.text
});
} else if (chunk.type === 'tool_call') {
// Emit tool call events
subscriber.next({
type: EventType.TOOL_CALL_START,
parentMessageId: messageId,
toolCallId: chunk.id,
toolCallName: chunk.name
});
subscriber.next({
type: EventType.TOOL_CALL_ARGS,
toolCallId: chunk.id,
delta: JSON.stringify(chunk.args)
});
subscriber.next({
type: EventType.TOOL_CALL_END,
toolCallId: chunk.id
});
// Execute tool (if it's a frontend tool, it would be executed client-side)
// For this example, we'll emit a result immediately
subscriber.next({
type: EventType.TOOL_CALL_RESULT,
role: 'tool',
messageId: this.generateId(),
toolCallId: chunk.id,
content: JSON.stringify({ success: true })
});
}
}
// Emit RUN_FINISHED
subscriber.next({
type: EventType.RUN_FINISHED,
threadId: input.threadId,
runId: input.runId
});
subscriber.complete();
} catch (error) {
// Emit error event
subscriber.next({
type: EventType.RUN_ERROR,
message: error instanceof Error ? error.message : String(error)
});
subscriber.error(error);
}
})();
});
}
clone(): CustomLLMAgent {
return new CustomLLMAgent(this.llmClient, this.systemPrompt);
}
private buildPrompt(input: RunAgentInput): string {
const parts = [this.systemPrompt];
if (input.context.length > 0) {
parts.push('\n## Context\n');
for (const ctx of input.context) {
parts.push(`${ctx.description}:\n${ctx.value}\n`);
}
}
if (input.state) {
parts.push('\n## Application State\n');
parts.push(JSON.stringify(input.state, null, 2));
}
return parts.join('');
}
private formatMessages(messages: Message[]): any[] {
return messages.map(msg => ({
role: msg.role,
content: msg.content
}));
}
private generateId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
}
Usage Example
// Create custom agent
const agent = new CustomLLMAgent(llmClient, 'You are a helpful assistant.');
// Use in runtime
const runtime = new CopilotRuntime({
agents: {
'custom': agent
}
});
// Or execute directly
const events$ = agent.run({
runId: 'run-123',
threadId: 'thread-456',
messages: [
{ id: '1', role: 'user', content: 'Hello!' }
],
tools: [],
context: [],
state: {}
});
events$.subscribe({
next: (event) => console.log('Event:', event),
complete: () => console.log('Run complete'),
error: (err) => console.error('Error:', err)
});
LangGraph Integration Example
import { AbstractAgent, BaseEvent, EventType, RunAgentInput } from '@ag-ui/client';
import { Observable } from 'rxjs';
import { StateGraph } from '@langchain/langgraph';
export class LangGraphAgent extends AbstractAgent {
constructor(private graph: StateGraph) {
super();
}
run(input: RunAgentInput): Observable<BaseEvent> {
return new Observable((subscriber) => {
(async () => {
subscriber.next({
type: EventType.RUN_STARTED,
threadId: input.threadId,
runId: input.runId
});
try {
// Execute LangGraph
const stream = await this.graph.stream({
messages: input.messages,
tools: input.tools,
context: input.context,
state: input.state
});
for await (const event of stream) {
// Convert LangGraph events to AG-UI events
const agEvent = this.convertEvent(event);
if (agEvent) {
subscriber.next(agEvent);
}
}
subscriber.next({
type: EventType.RUN_FINISHED,
threadId: input.threadId,
runId: input.runId
});
subscriber.complete();
} catch (error) {
subscriber.next({
type: EventType.RUN_ERROR,
message: error instanceof Error ? error.message : String(error)
});
subscriber.error(error);
}
})();
});
}
clone(): LangGraphAgent {
return new LangGraphAgent(this.graph);
}
private convertEvent(langGraphEvent: any): BaseEvent | null {
// Convert LangGraph events to AG-UI events
// Implementation depends on your LangGraph setup
return null;
}
}
Best Practices
- Always emit RUN_STARTED at the beginning of execution
- Always emit RUN_FINISHED or RUN_ERROR at the end
- Stream events as they occur for real-time feedback
- Generate unique IDs for messages and tool calls
- Handle errors gracefully and emit RUN_ERROR events
- Implement clone() if your agent has mutable state
- Implement abortRun() for long-running operations
- Support middleware via the use() method
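The first two practices can be checked mechanically in tests. The sketch below is a hypothetical helper, `checkRunLifecycle`, that inspects a recorded list of event type names (plain strings standing in for `EventType` members) and reports which lifecycle rule, if any, is violated.

```typescript
// Hypothetical test helper: returns a list of lifecycle problems,
// or an empty array if the recorded sequence looks well-formed.
function checkRunLifecycle(types: string[]): string[] {
  const problems: string[] = [];
  if (types[0] !== 'RUN_STARTED') {
    problems.push('first event must be RUN_STARTED');
  }
  const lastType = types[types.length - 1];
  if (lastType !== 'RUN_FINISHED' && lastType !== 'RUN_ERROR') {
    problems.push('last event must be RUN_FINISHED or RUN_ERROR');
  }
  return problems;
}

const okProblems = checkRunLifecycle(['RUN_STARTED', 'TEXT_MESSAGE_CHUNK', 'RUN_FINISHED']);
const badProblems = checkRunLifecycle(['TEXT_MESSAGE_CHUNK']);
```

Collecting the emitted event types from an agent under test and passing them to a check like this catches a missing RUN_STARTED or a run that ends without a terminal event.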
See Also