CrewAIAgent integrates both Crew-based and Flow-based CrewAI patterns into your application.
Installation
pip install "copilotkit[crewai]"
Quick Start
Using a CrewAI Crew
from copilotkit import CrewAIAgent
from crewai import Crew, Agent, Task
# Define your crew
researcher = Agent(
role="Research Analyst",
goal="Research and analyze information",
backstory="Expert at finding and synthesizing information"
)
writer = Agent(
role="Content Writer",
goal="Write engaging content",
backstory="Skilled writer who crafts compelling narratives"
)
research_task = Task(
description="Research the topic: {topic}",
agent=researcher,
expected_output="A comprehensive research report"
)
writing_task = Task(
description="Write an article based on the research",
agent=writer,
expected_output="A well-written article"
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
verbose=True
)
# Create CopilotKit agent
agent = CrewAIAgent(
name="content_crew",
description="A research and writing crew",
crew=crew
)
Using a CrewAI Flow
from copilotkit import CrewAIAgent
from crewai import Flow
from crewai.flow import start, listen
class ContentFlow(Flow):
@start()
def research_phase(self):
# Research logic
results = self.research(self.state.get("topic"))
return {"research": results}
@listen("research_phase")
def writing_phase(self):
# Writing logic based on research
research = self.state.get("research")
article = self.write_article(research)
return {"article": article}
flow = ContentFlow()
agent = CrewAIAgent(
name="content_flow",
description="A content creation flow",
flow=flow
)
Creating a CrewAI Agent
Configuration
The unique identifier for your agent. Must contain only alphanumeric characters, underscores, and hyphens.
A CrewAI Crew instance. Either
crew or flow must be provided.A CrewAI Flow instance. Either
crew or flow must be provided.Description of the agent’s purpose. Used for dynamic routing when multiple agents are available.
Advanced CopilotKit configuration for customizing state merging.
Crew-Based Agents
Complete Crew Example
from copilotkit import CrewAIAgent
from crewai import Crew, Agent, Task, Process
from langchain_openai import ChatOpenAI
# Initialize LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# Define specialized agents
data_analyst = Agent(
role="Data Analyst",
goal="Analyze data and extract insights",
backstory="Expert data scientist with years of experience",
llm=llm,
verbose=True
)
report_writer = Agent(
role="Report Writer",
goal="Create clear, actionable reports",
backstory="Professional report writer with business acumen",
llm=llm,
verbose=True
)
# Define tasks
analysis_task = Task(
description="Analyze the provided data for trends and patterns: {data}",
agent=data_analyst,
expected_output="A detailed analysis with key findings and metrics"
)
report_task = Task(
description="Write a comprehensive report based on the analysis",
agent=report_writer,
expected_output="A professional report with recommendations",
context=[analysis_task] # Depends on analysis
)
# Create crew with sequential process
analytics_crew = Crew(
agents=[data_analyst, report_writer],
tasks=[analysis_task, report_task],
process=Process.sequential,
verbose=True,
chat_llm=llm # Required for crew-based agents
)
# Create CopilotKit agent
agent = CrewAIAgent(
name="analytics_crew",
description="Data analytics and reporting crew",
crew=analytics_crew
)
Hierarchical Process
from crewai import Process
# Define manager agent
manager = Agent(
role="Project Manager",
goal="Coordinate team and ensure quality",
backstory="Experienced project manager",
llm=llm,
allow_delegation=True
)
crew = Crew(
agents=[manager, data_analyst, report_writer],
tasks=[analysis_task, report_task],
process=Process.hierarchical,
manager_agent=manager,
verbose=True,
chat_llm=llm
)
Flow-Based Agents
Basic Flow Structure
from crewai import Flow
from crewai.flow import start, listen, router
from pydantic import BaseModel
class EmailFlowState(BaseModel):
recipient: str = ""
subject: str = ""
draft: str = ""
approved: bool = False
class EmailFlow(Flow[EmailFlowState]):
@start()
def draft_email(self):
"""Draft an email based on user input"""
messages = self.state.get("messages", [])
# Your drafting logic
return {
"draft": draft_text,
"subject": subject
}
@listen("draft_email")
def review_draft(self):
"""Review the draft"""
draft = self.state.get("draft")
# Review logic
return {"approved": is_approved}
@router("review_draft")
def check_approval(self):
"""Route based on approval"""
if self.state.get("approved"):
return "send_email"
return "draft_email"
@listen("check_approval")
def send_email(self):
"""Send the approved email"""
draft = self.state.get("draft")
recipient = self.state.get("recipient")
# Send logic
return {"sent": True}
flow = EmailFlow()
agent = CrewAIAgent(
name="email_flow",
description="Email drafting and sending flow",
flow=flow
)
Multi-Path Flow
class CustomerServiceFlow(Flow):
@start()
def categorize_request(self):
"""Categorize the customer request"""
messages = self.state.get("messages", [])
category = self.categorize(messages)
return {"category": category}
@router("categorize_request")
def route_request(self):
"""Route to appropriate handler"""
category = self.state.get("category")
if category == "technical":
return "handle_technical"
elif category == "billing":
return "handle_billing"
return "handle_general"
@listen("route_request")
def handle_technical(self):
"""Handle technical issues"""
return {"resolution": "..."}
@listen("route_request")
def handle_billing(self):
"""Handle billing issues"""
return {"resolution": "..."}
@listen("route_request")
def handle_general(self):
"""Handle general inquiries"""
return {"resolution": "..."}
Working with Tools
Adding Tools to Agents
from crewai import Agent, Task, Crew
from langchain_core.tools import tool
@tool
def search_database(query: str) -> str:
"""Search the internal database"""
# Your search implementation
return results
@tool
def send_notification(user_id: str, message: str) -> dict:
"""Send a notification to a user"""
# Your notification implementation
return {"success": True}
# Add tools to agents
support_agent = Agent(
role="Support Agent",
goal="Help customers with their issues",
backstory="Experienced customer support specialist",
tools=[search_database, send_notification],
llm=llm,
verbose=True
)
Dynamic Tool Access in Flows
class ToolFlow(Flow):
@start()
def use_tools(self):
"""Use tools from CopilotKit actions"""
# Access actions passed from CopilotKit
actions = self.state.get("copilotkit", {}).get("actions", [])
# Your tool usage logic
for action in actions:
tool_name = action["function"]["name"]
# Use the tool
return {"result": "..."}
State Management
Flow State with Persistence
from crewai import Flow
from crewai.flow.persistence import SQLitePersistence
class PersistentFlow(Flow):
def __init__(self):
super().__init__()
# Configure persistence
self._persistence = SQLitePersistence("flow_state.db")
flow = PersistentFlow()
agent = CrewAIAgent(
name="persistent_flow",
flow=flow
)
# State is automatically persisted across sessions
Custom State Merging
def custom_merge_state(*, state, messages, actions, agent_name, flow):
"""Custom state merge logic for CrewAI flows"""
return {
**state,
"messages": messages,
"copilotkit": {
"actions": actions
},
"custom_field": "value"
}
agent = CrewAIAgent(
name="custom_flow",
flow=flow,
copilotkit_config={
"merge_state": custom_merge_state
}
)
Streaming and Progress Updates
Emit Progress from Flow
from copilotkit.crewai.crewai_sdk import copilotkit_stream
class ProgressFlow(Flow):
@start()
async def long_running_task(self):
"""A task that reports progress"""
for i in range(100):
# Do work
await self.emit_progress(i)
return {"completed": True}
async def emit_progress(self, progress: int):
"""Helper to emit progress updates"""
# Update state to trigger frontend update
self.state["progress"] = progress
Exit Flow Early
from copilotkit.crewai.crewai_sdk import copilotkit_exit
class CancellableFlow(Flow):
@start()
async def main_flow(self):
# Check if user wants to cancel
if self.should_cancel():
await copilotkit_exit()
return {"cancelled": True}
# Continue with flow
return {"completed": True}
Multi-Agent Collaboration
Delegating Between Agents
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="...",
llm=llm
)
analyst = Agent(
role="Analyst",
goal="Analyze research findings",
backstory="...",
llm=llm,
allow_delegation=True # Can delegate to other agents
)
writer = Agent(
role="Writer",
goal="Write final report",
backstory="...",
llm=llm
)
# Tasks can be delegated within the crew
research_task = Task(
description="Research topic: {topic}",
agent=researcher,
expected_output="Research findings"
)
analysis_task = Task(
description="Analyze the research and provide insights",
agent=analyst,
expected_output="Analysis report",
context=[research_task]
)
writing_task = Task(
description="Write final report",
agent=writer,
expected_output="Final report",
context=[analysis_task]
)
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.sequential,
verbose=True,
chat_llm=llm
)
Using with FastAPI
from fastapi import FastAPI
from copilotkit import CrewAIAgent
from copilotkit.integrations.fastapi import add_fastapi_endpoint
app = FastAPI()
# Create your crew or flow
agent = CrewAIAgent(
name="my_crew",
crew=crew # or flow=flow
)
# Add endpoint
add_fastapi_endpoint(
app,
agents=[agent],
endpoint="/copilotkit"
)
Error Handling
class RobustFlow(Flow):
@start()
def safe_operation(self):
try:
# Your flow logic
result = self.perform_operation()
return {"result": result}
except Exception as e:
# Handle errors gracefully
return {
"error": str(e),
"messages": [{
"role": "assistant",
"content": f"I encountered an error: {str(e)}"
}]
}
Complete Production Example
from copilotkit import CrewAIAgent
from crewai import Crew, Agent, Task, Process, Flow
from crewai.flow import start, listen, router
from crewai.flow.persistence import SQLitePersistence
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from pydantic import BaseModel
# Initialize LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# Define tools
@tool
def search_inventory(product_name: str) -> dict:
"""Search product inventory"""
return {"in_stock": True, "quantity": 50}
@tool
def create_order(product_id: str, quantity: int) -> dict:
"""Create a new order"""
return {"order_id": "ORD-123", "status": "created"}
# Option 1: Crew-based approach
order_specialist = Agent(
role="Order Specialist",
goal="Help customers place orders",
backstory="Expert in product catalog and order processing",
tools=[search_inventory, create_order],
llm=llm,
verbose=True
)
inventory_agent = Agent(
role="Inventory Manager",
goal="Check and manage inventory",
backstory="Maintains accurate inventory information",
tools=[search_inventory],
llm=llm,
verbose=True
)
check_inventory = Task(
description="Check inventory for: {product}",
agent=inventory_agent,
expected_output="Inventory status"
)
place_order = Task(
description="Place order based on inventory check",
agent=order_specialist,
expected_output="Order confirmation",
context=[check_inventory]
)
order_crew = Crew(
agents=[inventory_agent, order_specialist],
tasks=[check_inventory, place_order],
process=Process.sequential,
verbose=True,
chat_llm=llm
)
# Option 2: Flow-based approach
class OrderFlowState(BaseModel):
product: str = ""
quantity: int = 0
inventory_checked: bool = False
order_placed: bool = False
class OrderFlow(Flow[OrderFlowState]):
def __init__(self):
super().__init__()
self._persistence = SQLitePersistence("orders.db")
@start()
def check_inventory(self):
"""Check if product is in stock"""
product = self.state.get("product")
inventory = search_inventory(product)
return {
"inventory_checked": True,
"in_stock": inventory["in_stock"]
}
@router("check_inventory")
def can_order(self):
"""Determine if order can proceed"""
if self.state.get("in_stock"):
return "place_order"
return "notify_out_of_stock"
@listen("can_order")
def place_order(self):
"""Place the order"""
product = self.state.get("product")
quantity = self.state.get("quantity")
order = create_order(product, quantity)
return {
"order_placed": True,
"order_id": order["order_id"]
}
@listen("can_order")
def notify_out_of_stock(self):
"""Handle out of stock scenario"""
return {
"messages": [{
"role": "assistant",
"content": "Sorry, this product is currently out of stock."
}]
}
# Create agents
crew_agent = CrewAIAgent(
name="order_crew",
description="Order processing crew with inventory management",
crew=order_crew
)
flow_agent = CrewAIAgent(
name="order_flow",
description="Streamlined order processing flow",
flow=OrderFlow()
)
# Use in your application
from copilotkit.integrations.fastapi import add_fastapi_endpoint
from fastapi import FastAPI
app = FastAPI()
add_fastapi_endpoint(
app,
agents=[crew_agent, flow_agent],
endpoint="/copilotkit"
)
Best Practices
1. Choose the Right Pattern
- Use Crews for collaborative multi-agent scenarios with delegation
- Use Flows for deterministic, state-machine-like workflows
2. Set chat_llm for Crews
crew = Crew(
agents=[...],
tasks=[...],
chat_llm=llm # Required for CopilotKit integration
)
3. Enable Persistence for Flows
from crewai.flow.persistence import SQLitePersistence
flow._persistence = SQLitePersistence("flow_state.db")
4. Handle Errors Gracefully
Always wrap critical operations in try-except blocks and return meaningful error messages.5. Use Meaningful Agent Names
Agent names should be descriptive and follow naming conventions (alphanumeric, underscores, hyphens only).API Reference
See the CrewAIAgent source code for complete implementation details.Next Steps
BuiltInAgent
Use the built-in agent for simple cases
LangGraph Integration
Build complex workflows with LangGraph
Custom Agents
Create custom agent implementations
CrewAI Docs
Official CrewAI documentation
