Installation
pip install copilotkit[crewai]
The CrewAI extra includes the CrewAI framework as a dependency:
crewai==0.118.0
Plus all base CopilotKit dependencies (LangChain, FastAPI, etc.)
Core Classes
CrewAIAgent
The main class for integrating CrewAI Crews and Flows with CopilotKit.
from copilotkit import CrewAIAgent

# Expose an existing Crew (`my_crew`, defined elsewhere) as a CopilotKit agent.
agent = CrewAIAgent(
    name="email_agent",
    description="This agent sends emails",
    crew=my_crew,
)
name (str): The name of the agent. Must consist of alphanumeric characters, underscores, and hyphens only.
crew (Crew): When using a Crew-based agent, pass in a Crew object. Either crew or flow must be provided.
flow (Flow): When using a Flow-based agent, pass in a Flow object. Either crew or flow must be provided.
description (str, optional): Description of the agent. Used when CopilotKit dynamically routes requests.
copilotkit_config (CopilotKitConfig, optional): CopilotKit-specific configuration for advanced customization.
Crew-based Agent Example
from crewai import Agent, Crew, Task
from copilotkit import CrewAIAgent


class EmailCrew:
    """Factory for the Crew that backs the email agent."""

    def crew(self):
        """Build and return the Crew: one agent, one task, with a chat LLM."""
        email_agent = Agent(
            role="Email Specialist",
            goal="Send emails effectively",
            backstory="Expert at crafting emails",
            # An Agent takes its model through `llm`; `chat_llm` is a Crew-level setting.
            llm="gpt-4",
        )
        send_email = Task(
            description="Send an email to {recipient} ",
            expected_output="Email sent confirmation",
            agent=email_agent,
        )
        return Crew(
            agents=[email_agent],
            tasks=[send_email],
            chat_llm="gpt-4",
        )


agent = CrewAIAgent(
    name="email_crew",
    description="Sends emails using CrewAI",
    # CrewAIAgent expects a Crew object, so call the factory method
    # instead of passing the EmailCrew wrapper itself.
    crew=EmailCrew().crew(),
)
Important: You must set a chat_llm on the Crew object when using Crew-based agents. See the CrewAI docs for more information.
Flow-based Agent Example
from crewai.flow import Flow, start
from copilotkit import CrewAIAgent


class EmailFlow(Flow):
    """Skeleton Flow with a single start step."""

    @start()
    async def send_email(self):
        # Flow logic here
        pass


# Hand a Flow instance to CopilotKit through the `flow` argument.
agent = CrewAIAgent(
    name="email_flow",
    description="Sends emails using CrewAI Flow",
    flow=EmailFlow(),
)
CopilotKitState
State class for CrewAI Flow integration that includes CopilotKit-specific properties.
from copilotkit.crewai import CopilotKitState
from crewai.flow import Flow, start  # `start` is required by the @start() decorator below


class MyFlow(Flow[CopilotKitState]):
    """Flow whose state is CopilotKitState, giving access to messages and actions."""

    @start()
    async def my_method(self):
        # Conversation messages
        messages = self.state.messages
        # Actions made available by the frontend
        actions = self.state.copilotkit.actions
State Properties:
messages (List[Any]): The conversation messages
copilotkit (CopilotKitProperties): CopilotKit-specific state
actions (List[Any]): Available actions from the frontend
CopilotKitFlow
Generic Flow class that provides helper methods for CopilotKit integration.
from copilotkit.crewai import CopilotKitFlow
from pydantic import BaseModel


class MyState(BaseModel):
    """Flow state: chat messages, tool definitions, and one custom field."""

    messages: list[dict[str, str]] = []
    tools: list[dict] = []
    custom_field: str = ""


class MyFlow(CopilotKitFlow[MyState]):
    """Flow that calls the CopilotKitFlow helpers before delegating to the base kickoff."""

    def kickoff(self, state=None, inputs=None):
        # Access helper methods
        messages = self.get_message_history()
        tools = self.get_available_tools()
        return super().kickoff(state, inputs)
Helper Methods
get_message_history(system_prompt: Optional[str] = None, max_messages: int = 20) -> List[Dict[str, str]]
Retrieves the conversation history with optional system prompt.
# Both arguments are optional; see the signature above for their defaults.
messages = flow.get_message_history(
    system_prompt="You are a helpful assistant",
    max_messages=50,
)
get_available_tools() -> List[Dict[str, Any]]
Retrieves tools passed from the frontend.
tools = flow.get_available_tools()
format_tools_for_llm(tools_definitions: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict[str, Callable]]
Formats CopilotKit tools for LLM consumption and creates proxy functions.
formatted_tools, available_functions = flow.format_tools_for_llm(tools)
CopilotKitConfig
Advanced configuration for customizing CrewAI integration behavior.
from typing import Any, List

from crewai.flow import Flow  # needed for the `flow` annotation below
from copilotkit import CrewAIAgent


def custom_merge_state(
    *,
    state: dict,
    messages: List[Any],
    actions: List[Any],
    agent_name: str,
    flow: Flow,
):
    """Return the merged agent state.

    All arguments are keyword-only. This example keeps the current state
    and overwrites its "messages" entry with the incoming messages;
    `actions`, `agent_name`, and `flow` are accepted but unused here.
    """
    # Custom state merging logic
    return {**state, "messages": messages}


config = {
    "merge_state": custom_merge_state,
}

# `my_flow` is a Flow instance defined elsewhere.
agent = CrewAIAgent(
    name="custom_agent",
    flow=my_flow,
    copilotkit_config=config,
)
merge_state: Function to customize how CopilotKit merges the agent state. Parameters:
state (dict): Current state
messages (List[Any]): Incoming messages
actions (List[Any]): Available actions
agent_name (str): Name of the agent
flow (Flow): The CrewAI Flow instance
Runtime Functions
copilotkit_emit_state
Emits intermediate state to CopilotKit during Flow execution.
from copilotkit.crewai import copilotkit_emit_state


async def my_method(self):
    # Publish the loop counter as intermediate state after each unit of work.
    for step in range(10):
        await some_operation(step)
        await copilotkit_emit_state({"progress": step})
state: The state to emit. Must be JSON serializable.
Returns: Awaitable[bool] - Always returns True.
copilotkit_emit_message
Manually emits a message to CopilotKit.
from copilotkit.crewai import copilotkit_emit_message


async def my_method(self):
    # The awaited call returns the ID of the emitted message.
    message_id = await copilotkit_emit_message("Processing step 1...")
Returns: Awaitable[str] - The message ID.
copilotkit_emit_tool_call
Manually emits a tool call to CopilotKit.
from copilotkit.crewai import copilotkit_emit_tool_call


async def my_method(self):
    # The awaited call returns the ID of the emitted tool call.
    search_args = {"query": "AI agents"}
    tool_call_id = await copilotkit_emit_tool_call(
        name="SearchTool",
        args=search_args,
    )
name (str): The name of the tool to emit.
Returns: Awaitable[str] - The tool call ID.
copilotkit_exit
Signals CopilotKit to exit the current agent after the run completes.
from copilotkit.crewai import copilotkit_exit


async def my_method(self):
    # Perform some work
    # ...then ask CopilotKit to leave this agent once the current run completes.
    await copilotkit_exit()
Returns: Awaitable[bool] - Always returns True.
Note: Calling copilotkit_exit() does not immediately stop the agent. It signals CopilotKit to stop after the current run completes.
copilotkit_stream
Stream LiteLLM responses token by token to CopilotKit.
from copilotkit.crewai import copilotkit_stream
from litellm import completion


async def my_method(self):
    messages = self.get_message_history()
    # `tools` must exist before it is passed to completion(); use the
    # tools forwarded from the frontend.
    tools = self.get_available_tools()
    response = await copilotkit_stream(
        completion(
            model="openai/gpt-4o",
            messages=messages,
            tools=tools,
            stream=True,  # Must be True for streaming
        )
    )
    # response is a ModelResponse object
    message = response.choices[0]["message"]
response (ModelResponse | CustomStreamWrapper, required): The LiteLLM completion response to stream. Must be created with stream=True.
Returns: ModelResponse - The complete response after streaming.
Important: The stream=True parameter must be set on the completion() call for streaming to work.
copilotkit_predict_state
Configure streaming tool calls as state updates.
from copilotkit.crewai import copilotkit_predict_state


async def my_method(self):
    # Each state key maps to the tool whose streamed arguments feed it.
    predict_config = {
        "steps": {
            "tool_name": "SearchTool",
            "tool_argument": "steps",
        },
        "results": {
            "tool_name": "AnalyzeTool",
            # Omit tool_argument to emit all arguments under this state key
        },
    }
    await copilotkit_predict_state(predict_config)
config (Dict[str, PredictStateConfig], required): Configuration mapping state keys to tool configurations. Each configuration contains:
tool_name (str): The name of the tool to monitor
tool_argument (str, optional): Specific argument to emit. If omitted, all arguments are emitted.
Returns: Awaitable[bool] - Always returns True.
Message Conversion
copilotkit_messages_to_crewai_flow
Converts CopilotKit messages to CrewAI Flow format.
from copilotkit.crewai import copilotkit_messages_to_crewai_flow
crewai_messages = copilotkit_messages_to_crewai_flow(copilotkit_messages)
The CopilotKit messages to convert.
Returns: List[Any] - The converted CrewAI Flow messages.
crewai_flow_messages_to_copilotkit
Converts CrewAI Flow messages to CopilotKit format.
from copilotkit.crewai import crewai_flow_messages_to_copilotkit
copilotkit_messages = crewai_flow_messages_to_copilotkit(crewai_messages)
The CrewAI Flow messages to convert.
Returns: List[Message] - The converted CopilotKit messages.
Event System
CopilotKitToolCallEvent
Event emitted when a tool call is made through CopilotKit.
from copilotkit.crewai import CopilotKitToolCallEvent

# Build a tool-call event by hand.
event = CopilotKitToolCallEvent(
    tool_name="SearchTool",
    args={"query": "AI"},
)
Properties:
type (str): Always "copilotkit_frontend_tool_call"
tool_name (str): The name of the tool
args (Dict[str, Any]): The tool arguments
timestamp (str): ISO format timestamp
CopilotKitStateUpdateEvent
Event for state updates in CopilotKit.
from copilotkit.crewai import CopilotKitStateUpdateEvent

# Build a state-update event by hand.
event = CopilotKitStateUpdateEvent(
    tool_name="ProcessTool",
    args={"status": "complete"},
)
Properties:
type (str): Always "copilotkit_state_update"
tool_name (str): The tool name
args (Dict[str, Any]): The state arguments
timestamp (str): ISO format timestamp
register_tool_call_listener
Register a listener for tool call events.
# CopilotKitToolCallEvent must be imported: the decorator below references it.
from copilotkit.crewai import CopilotKitToolCallEvent, register_tool_call_listener
from crewai.utilities.events import crewai_event_bus

register_tool_call_listener()


@crewai_event_bus.on(CopilotKitToolCallEvent)
def on_tool_call(source, event):
    """Print the name and arguments of each CopilotKit tool call event."""
    print(f"Tool called: {event.tool_name}")
    print(f"Args: {event.args}")
emit_copilotkit_state_update_event
Emit a state update event to the client UI.
from copilotkit.crewai import emit_copilotkit_state_update_event

# First argument: tool name; second: the state payload to send.
emit_copilotkit_state_update_event(
    "write_document",
    {"document": state.data["document"]},
)
The name of the tool associated with the state update.
Helper Utilities
FlowInputState
Predefined state model for common Flow input scenarios.
from copilotkit.crewai import FlowInputState


class MyState(FlowInputState):
    """FlowInputState extended with one extra string field."""

    custom_field: str = ""
Properties:
messages (List[Dict[str, str]]): Current messages from the user
tools (List[Dict[str, Any]]): CopilotKit tool format
conversation_history (List[Dict[str, str]]): Full conversation history
create_tool_proxy
Creates a proxy function for a frontend tool.
from copilotkit.crewai import create_tool_proxy

# Create the proxy once, then call it with the tool's keyword arguments.
search_tool = create_tool_proxy("SearchTool")
result = search_tool(query="AI agents", max_results=5)
The name of the tool to create a proxy for.
Returns: Callable - A proxy function that emits tool call events.
tool_calls_log
Global list tracking all tool calls made during execution.
from copilotkit.crewai import tool_calls_log

# View all tool calls
for call in tool_calls_log:
    print(f"Tool: {call['tool_name']}")
    print(f"Args: {call['args']}")
    print(f"Time: {call['timestamp']}")
FastAPI Integration
CrewAI agents use the same FastAPI integration as LangGraph agents:
from fastapi import FastAPI
from copilotkit import CopilotKitRemoteEndpoint, CrewAIAgent
from copilotkit.integrations.fastapi import add_fastapi_endpoint

app = FastAPI()

# `my_crew` is a Crew defined elsewhere.
agent = CrewAIAgent(name="my_crew", crew=my_crew)
sdk = CopilotKitRemoteEndpoint(agents=[agent])

# Mount the CopilotKit endpoint on the FastAPI app.
add_fastapi_endpoint(app, sdk, "/copilotkit")
Complete Example
Here’s a complete example showing a CrewAI Flow agent with CopilotKit:
from fastapi import FastAPI
from crewai.flow import Flow, start
from litellm import completion
from pydantic import BaseModel
from copilotkit import CopilotKitRemoteEndpoint, CrewAIAgent
from copilotkit.integrations.fastapi import add_fastapi_endpoint
from copilotkit.crewai import (
copilotkit_stream,
copilotkit_emit_state,
copilotkit_exit,
CopilotKitFlow
)
# Define state
class ResearchState(BaseModel):
    """State carried by the research flow."""

    messages: list[dict[str, str]] = []
    tools: list[dict] = []
    research_results: str = ""
    progress: int = 0
# Create Flow
class ResearchFlow(CopilotKitFlow[ResearchState]):
    """Flow that streams one LLM reply and exits once research is reported complete."""

    @start()
    async def research(self):
        """Emit progress, stream a completion, store the reply, and exit if done."""
        messages = self.get_message_history(
            system_prompt="You are a research assistant",
            max_messages=20,
        )
        tools = self.get_available_tools()

        # Emit progress
        await copilotkit_emit_state({"progress": 50})

        # Stream LLM response
        response = await copilotkit_stream(
            completion(
                model="gpt-4",
                messages=messages,
                tools=tools,
                stream=True,
            )
        )
        message = response.choices[0]["message"]
        self.state.messages.append(message)

        # Check if research is complete.
        # "content" can be present but None (e.g. a tool-call reply), in which
        # case .get("content", "") would return None; fall back to "" explicitly.
        content = message.get("content") or ""
        if "research complete" in content.lower():
            await copilotkit_exit()
# Create agent
agent = CrewAIAgent(
    name="research_agent",
    description="A research assistant using CrewAI",
    flow=ResearchFlow(),
)

# Setup FastAPI
app = FastAPI()
sdk = CopilotKitRemoteEndpoint(agents=[agent])
add_fastapi_endpoint(app, sdk, "/copilotkit")

# Start the server only when this file is run as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Crew-based Agent with Chat LLM
from crewai import Agent, Crew, Task, Process
from copilotkit import CrewAIAgent


class EmailCrew:
    """Factory for a sequential Crew that writes emails."""

    def crew(self):
        """Build and return the Crew, including the chat_llm CopilotKit needs."""
        # Define agents
        email_writer = Agent(
            role="Email Writer",
            goal="Write professional emails",
            backstory="Expert in business communication",
        )

        # Define tasks
        write_email = Task(
            description="Write an email to {recipient} about {topic} ",
            expected_output="A well-formatted email",
            agent=email_writer,
        )

        # Create crew with chat_llm
        return Crew(
            agents=[email_writer],
            tasks=[write_email],
            process=Process.sequential,
            chat_llm="gpt-4",  # Required for CopilotKit
        )


# Create CopilotKit agent
agent = CrewAIAgent(
    name="email_crew",
    description="Sends professional emails",
    # Pass the Crew object returned by the factory, not the EmailCrew wrapper.
    crew=EmailCrew().crew(),
)
Type Hints
The CrewAI SDK uses proper Python type hints:
from typing import List, Dict, Any, Optional, Literal
from crewai import Crew, Flow
from litellm import CustomStreamWrapper, ModelResponse  # types used by copilotkit_stream
from pydantic import BaseModel


# All functions are fully typed
async def copilotkit_emit_state(state: Any) -> Literal[True]:
    ...


async def copilotkit_emit_message(message: str) -> str:
    ...


async def copilotkit_stream(
    response: ModelResponse | CustomStreamWrapper,
) -> ModelResponse:
    ...
Error Handling
Use the same error handling as LangGraph agents:
from copilotkit.exc import (
    AgentNotFoundException,
    AgentExecutionException,
)


async def run_agent():
    """Execute the agent and report the two agent-related failure modes.

    `await` is only valid inside a coroutine, so the call is wrapped in an
    async function. `sdk` is the CopilotKitRemoteEndpoint created elsewhere.
    """
    try:
        result = await sdk.execute_agent(...)
    except AgentNotFoundException as e:
        print(f"Agent not found: {e}")
    except AgentExecutionException as e:
        print(f"Agent execution failed: {e}")
    else:
        return result