Skip to main content
CrewAI enables you to create collaborative AI agent teams where multiple specialized agents work together. CopilotKit’s CrewAIAgent integrates both Crew-based and Flow-based CrewAI patterns into your application.

Installation

pip install "copilotkit[crewai]"
This installs CopilotKit with CrewAI support.

Quick Start

Using a CrewAI Crew

from copilotkit import CrewAIAgent
from crewai import Crew, Agent, Task

# Define your crew
researcher = Agent(
    role="Research Analyst",
    goal="Research and analyze information",
    backstory="Expert at finding and synthesizing information"
)

writer = Agent(
    role="Content Writer",
    goal="Write engaging content",
    backstory="Skilled writer who crafts compelling narratives"
)

research_task = Task(
    description="Research the topic: {topic}",
    agent=researcher,
    expected_output="A comprehensive research report"
)

writing_task = Task(
    description="Write an article based on the research",
    agent=writer,
    expected_output="A well-written article"
)

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True
)

# Create CopilotKit agent
agent = CrewAIAgent(
    name="content_crew",
    description="A research and writing crew",
    crew=crew
)

Using a CrewAI Flow

from copilotkit import CrewAIAgent
from crewai import Flow
from crewai.flow import start, listen

class ContentFlow(Flow):
    @start()
    def research_phase(self):
        # Research logic
        results = self.research(self.state.get("topic"))
        return {"research": results}
    
    @listen("research_phase")
    def writing_phase(self):
        # Writing logic based on research
        research = self.state.get("research")
        article = self.write_article(research)
        return {"article": article}

flow = ContentFlow()

agent = CrewAIAgent(
    name="content_flow",
    description="A content creation flow",
    flow=flow
)

Creating a CrewAI Agent

Configuration

name
str
required
The unique identifier for your agent. Must contain only alphanumeric characters, underscores, and hyphens.
crew
Crew
A CrewAI Crew instance. Either crew or flow must be provided.
flow
Flow
A CrewAI Flow instance. Either crew or flow must be provided.
description
str
Description of the agent’s purpose. Used for dynamic routing when multiple agents are available.
copilotkit_config
CopilotKitConfig
Advanced CopilotKit configuration for customizing state merging.

Crew-Based Agents

Complete Crew Example

from copilotkit import CrewAIAgent
from crewai import Crew, Agent, Task, Process
from langchain_openai import ChatOpenAI

# Initialize LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Define specialized agents
data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze data and extract insights",
    backstory="Expert data scientist with years of experience",
    llm=llm,
    verbose=True
)

report_writer = Agent(
    role="Report Writer",
    goal="Create clear, actionable reports",
    backstory="Professional report writer with business acumen",
    llm=llm,
    verbose=True
)

# Define tasks
analysis_task = Task(
    description="Analyze the provided data for trends and patterns: {data}",
    agent=data_analyst,
    expected_output="A detailed analysis with key findings and metrics"
)

report_task = Task(
    description="Write a comprehensive report based on the analysis",
    agent=report_writer,
    expected_output="A professional report with recommendations",
    context=[analysis_task]  # Depends on analysis
)

# Create crew with sequential process
analytics_crew = Crew(
    agents=[data_analyst, report_writer],
    tasks=[analysis_task, report_task],
    process=Process.sequential,
    verbose=True,
    chat_llm=llm  # Required for crew-based agents
)

# Create CopilotKit agent
agent = CrewAIAgent(
    name="analytics_crew",
    description="Data analytics and reporting crew",
    crew=analytics_crew
)

Hierarchical Process

from crewai import Process

# Define manager agent
manager = Agent(
    role="Project Manager",
    goal="Coordinate team and ensure quality",
    backstory="Experienced project manager",
    llm=llm,
    allow_delegation=True
)

crew = Crew(
    agents=[manager, data_analyst, report_writer],
    tasks=[analysis_task, report_task],
    process=Process.hierarchical,
    manager_agent=manager,
    verbose=True,
    chat_llm=llm
)

Flow-Based Agents

Basic Flow Structure

from crewai import Flow
from crewai.flow import start, listen, router
from pydantic import BaseModel

class EmailFlowState(BaseModel):
    recipient: str = ""
    subject: str = ""
    draft: str = ""
    approved: bool = False

class EmailFlow(Flow[EmailFlowState]):
    @start()
    def draft_email(self):
        """Draft an email based on user input"""
        messages = self.state.get("messages", [])
        # Your drafting logic
        return {
            "draft": draft_text,
            "subject": subject
        }
    
    @listen("draft_email")
    def review_draft(self):
        """Review the draft"""
        draft = self.state.get("draft")
        # Review logic
        return {"approved": is_approved}
    
    @router("review_draft")
    def check_approval(self):
        """Route based on approval"""
        if self.state.get("approved"):
            return "send_email"
        return "draft_email"
    
    @listen("check_approval")
    def send_email(self):
        """Send the approved email"""
        draft = self.state.get("draft")
        recipient = self.state.get("recipient")
        # Send logic
        return {"sent": True}

flow = EmailFlow()

agent = CrewAIAgent(
    name="email_flow",
    description="Email drafting and sending flow",
    flow=flow
)

Multi-Path Flow

class CustomerServiceFlow(Flow):
    @start()
    def categorize_request(self):
        """Categorize the customer request"""
        messages = self.state.get("messages", [])
        category = self.categorize(messages)
        return {"category": category}
    
    @router("categorize_request")
    def route_request(self):
        """Route to appropriate handler"""
        category = self.state.get("category")
        if category == "technical":
            return "handle_technical"
        elif category == "billing":
            return "handle_billing"
        return "handle_general"
    
    @listen("route_request")
    def handle_technical(self):
        """Handle technical issues"""
        return {"resolution": "..."}
    
    @listen("route_request")
    def handle_billing(self):
        """Handle billing issues"""
        return {"resolution": "..."}
    
    @listen("route_request")
    def handle_general(self):
        """Handle general inquiries"""
        return {"resolution": "..."}

Working with Tools

Adding Tools to Agents

from crewai import Agent, Task, Crew
from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search the internal database"""
    # Your search implementation
    return results

@tool
def send_notification(user_id: str, message: str) -> dict:
    """Send a notification to a user"""
    # Your notification implementation
    return {"success": True}

# Add tools to agents
support_agent = Agent(
    role="Support Agent",
    goal="Help customers with their issues",
    backstory="Experienced customer support specialist",
    tools=[search_database, send_notification],
    llm=llm,
    verbose=True
)

Dynamic Tool Access in Flows

class ToolFlow(Flow):
    @start()
    def use_tools(self):
        """Use tools from CopilotKit actions"""
        # Access actions passed from CopilotKit
        actions = self.state.get("copilotkit", {}).get("actions", [])
        
        # Your tool usage logic
        for action in actions:
            tool_name = action["function"]["name"]
            # Use the tool
        
        return {"result": "..."}

State Management

Flow State with Persistence

from crewai import Flow
from crewai.flow.persistence import SQLitePersistence

class PersistentFlow(Flow):
    def __init__(self):
        super().__init__()
        # Configure persistence
        self._persistence = SQLitePersistence("flow_state.db")

flow = PersistentFlow()

agent = CrewAIAgent(
    name="persistent_flow",
    flow=flow
)

# State is automatically persisted across sessions

Custom State Merging

def custom_merge_state(*, state, messages, actions, agent_name, flow):
    """Custom state merge logic for CrewAI flows"""
    return {
        **state,
        "messages": messages,
        "copilotkit": {
            "actions": actions
        },
        "custom_field": "value"
    }

agent = CrewAIAgent(
    name="custom_flow",
    flow=flow,
    copilotkit_config={
        "merge_state": custom_merge_state
    }
)

Streaming and Progress Updates

Emit Progress from Flow

from copilotkit.crewai.crewai_sdk import copilotkit_stream

class ProgressFlow(Flow):
    @start()
    async def long_running_task(self):
        """A task that reports progress"""
        for i in range(100):
            # Do work
            await self.emit_progress(i)
            
        return {"completed": True}
    
    async def emit_progress(self, progress: int):
        """Helper to emit progress updates"""
        # Update state to trigger frontend update
        self.state["progress"] = progress

Exit Flow Early

from copilotkit.crewai.crewai_sdk import copilotkit_exit

class CancellableFlow(Flow):
    @start()
    async def main_flow(self):
        # Check if user wants to cancel
        if self.should_cancel():
            await copilotkit_exit()
            return {"cancelled": True}
        
        # Continue with flow
        return {"completed": True}

Multi-Agent Collaboration

Delegating Between Agents

researcher = Agent(
    role="Researcher",
    goal="Research information",
    backstory="...",
    llm=llm
)

analyst = Agent(
    role="Analyst",
    goal="Analyze research findings",
    backstory="...",
    llm=llm,
    allow_delegation=True  # Can delegate to other agents
)

writer = Agent(
    role="Writer",
    goal="Write final report",
    backstory="...",
    llm=llm
)

# Tasks can be delegated within the crew
research_task = Task(
    description="Research topic: {topic}",
    agent=researcher,
    expected_output="Research findings"
)

analysis_task = Task(
    description="Analyze the research and provide insights",
    agent=analyst,
    expected_output="Analysis report",
    context=[research_task]
)

writing_task = Task(
    description="Write final report",
    agent=writer,
    expected_output="Final report",
    context=[analysis_task]
)

crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, writing_task],
    process=Process.sequential,
    verbose=True,
    chat_llm=llm
)

Using with FastAPI

from fastapi import FastAPI
from copilotkit import CrewAIAgent
from copilotkit.integrations.fastapi import add_fastapi_endpoint

app = FastAPI()

# Create your crew or flow
agent = CrewAIAgent(
    name="my_crew",
    crew=crew  # or flow=flow
)

# Add endpoint
add_fastapi_endpoint(
    app,
    agents=[agent],
    endpoint="/copilotkit"
)

Error Handling

class RobustFlow(Flow):
    @start()
    def safe_operation(self):
        try:
            # Your flow logic
            result = self.perform_operation()
            return {"result": result}
        except Exception as e:
            # Handle errors gracefully
            return {
                "error": str(e),
                "messages": [{
                    "role": "assistant",
                    "content": f"I encountered an error: {str(e)}"
                }]
            }

Complete Production Example

from copilotkit import CrewAIAgent
from crewai import Crew, Agent, Task, Process, Flow
from crewai.flow import start, listen, router
from crewai.flow.persistence import SQLitePersistence
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from pydantic import BaseModel

# Initialize LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Define tools
@tool
def search_inventory(product_name: str) -> dict:
    """Search product inventory"""
    return {"in_stock": True, "quantity": 50}

@tool
def create_order(product_id: str, quantity: int) -> dict:
    """Create a new order"""
    return {"order_id": "ORD-123", "status": "created"}

# Option 1: Crew-based approach
order_specialist = Agent(
    role="Order Specialist",
    goal="Help customers place orders",
    backstory="Expert in product catalog and order processing",
    tools=[search_inventory, create_order],
    llm=llm,
    verbose=True
)

inventory_agent = Agent(
    role="Inventory Manager",
    goal="Check and manage inventory",
    backstory="Maintains accurate inventory information",
    tools=[search_inventory],
    llm=llm,
    verbose=True
)

check_inventory = Task(
    description="Check inventory for: {product}",
    agent=inventory_agent,
    expected_output="Inventory status"
)

place_order = Task(
    description="Place order based on inventory check",
    agent=order_specialist,
    expected_output="Order confirmation",
    context=[check_inventory]
)

order_crew = Crew(
    agents=[inventory_agent, order_specialist],
    tasks=[check_inventory, place_order],
    process=Process.sequential,
    verbose=True,
    chat_llm=llm
)

# Option 2: Flow-based approach
class OrderFlowState(BaseModel):
    product: str = ""
    quantity: int = 0
    inventory_checked: bool = False
    order_placed: bool = False

class OrderFlow(Flow[OrderFlowState]):
    def __init__(self):
        super().__init__()
        self._persistence = SQLitePersistence("orders.db")
    
    @start()
    def check_inventory(self):
        """Check if product is in stock"""
        product = self.state.get("product")
        inventory = search_inventory(product)
        
        return {
            "inventory_checked": True,
            "in_stock": inventory["in_stock"]
        }
    
    @router("check_inventory")
    def can_order(self):
        """Determine if order can proceed"""
        if self.state.get("in_stock"):
            return "place_order"
        return "notify_out_of_stock"
    
    @listen("can_order")
    def place_order(self):
        """Place the order"""
        product = self.state.get("product")
        quantity = self.state.get("quantity")
        
        order = create_order(product, quantity)
        
        return {
            "order_placed": True,
            "order_id": order["order_id"]
        }
    
    @listen("can_order")
    def notify_out_of_stock(self):
        """Handle out of stock scenario"""
        return {
            "messages": [{
                "role": "assistant",
                "content": "Sorry, this product is currently out of stock."
            }]
        }

# Create agents
crew_agent = CrewAIAgent(
    name="order_crew",
    description="Order processing crew with inventory management",
    crew=order_crew
)

flow_agent = CrewAIAgent(
    name="order_flow",
    description="Streamlined order processing flow",
    flow=OrderFlow()
)

# Use in your application
from copilotkit.integrations.fastapi import add_fastapi_endpoint
from fastapi import FastAPI

app = FastAPI()

add_fastapi_endpoint(
    app,
    agents=[crew_agent, flow_agent],
    endpoint="/copilotkit"
)

Best Practices

1. Choose the Right Pattern

  • Use Crews for collaborative multi-agent scenarios with delegation
  • Use Flows for deterministic, state-machine-like workflows

2. Set chat_llm for Crews

crew = Crew(
    agents=[...],
    tasks=[...],
    chat_llm=llm  # Required for CopilotKit integration
)

3. Enable Persistence for Flows

from crewai.flow.persistence import SQLitePersistence

flow._persistence = SQLitePersistence("flow_state.db")

4. Handle Errors Gracefully

Always wrap critical operations in try-except blocks and return meaningful error messages.

5. Use Meaningful Agent Names

Agent names should be descriptive and follow naming conventions (alphanumeric, underscores, hyphens only).

API Reference

See the CrewAIAgent source code for complete implementation details.

Next Steps

BuiltInAgent

Use the built-in agent for simple cases

LangGraph Integration

Build complex workflows with LangGraph

Custom Agents

Create custom agent implementations

CrewAI Docs

Official CrewAI documentation