crewAI에서 조건부 작업을 사용하는 방법을 알아보세요
from typing import List
from pydantic import BaseModel
from crewai import Agent, Crew
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.task import Task
from crewai_tools import SerperDevTool
# 조건부 작업을 위한 조건 함수 정의
# 조건이 거짓이면 작업을 건너뛰고, 참이면 작업을 실행한다.
def is_data_missing(output: TaskOutput) -> bool:
return len(output.pydantic.events) < 10 # 이 조건이 참이면 작업을 건너뛴다
# 에이전트 정의
data_fetcher_agent = Agent(
role="데이터 수집자",
goal="Serper 도구를 사용해 온라인에서 데이터를 수집한다",
backstory="배경 이야기 1",
verbose=True,
tools=[SerperDevTool()]
)
data_processor_agent = Agent(
role="데이터 처리자",
goal="수집한 데이터를 처리한다",
backstory="배경 이야기 2",
verbose=True
)
summary_generator_agent = Agent(
role="요약 생성자",
goal="수집한 데이터에서 요약을 생성한다",
backstory="배경 이야기 3",
verbose=True
)
class EventOutput(BaseModel):
events: List[str]
task1 = Task(
description="Serper 도구를 사용해 샌프란시스코의 이벤트 데이터를 수집한다",
expected_output="이번 주 샌프란시스코에서 할 만한 10가지 활동 목록",
agent=data_fetcher_agent,
output_pydantic=EventOutput,
)
conditional_task = ConditionalTask(
description="""
데이터가 부족한지 확인한다. 10개 미만의 이벤트가 있다면,
Serper 도구를 사용해 추가 이벤트를 수집하여
이번 주 샌프란시스코에서 할 만한 10가지 활동을 완성한다.
""",
expected_output="이번 주 샌프란시스코에서 할 만한 10가지 활동 목록",
condition=is_data_missing,
agent=data_processor_agent,
)
task3 = Task(
description="수집한 데이터에서 샌프란시스코의 이벤트 요약을 생성한다",
expected_output="고객과 그들의 고객 및 경쟁사에 대한 완전한 보고서. 인구통계, 선호도, 시장 포지셔닝, 청중 참여 등이 포함된다.",
agent=summary_generator_agent,
)
# 작업을 포함한 크루 생성
crew = Crew(
agents=[data_fetcher_agent, data_processor_agent, summary_generator_agent],
tasks=[task1, conditional_task, task3],
verbose=True,
planning=True
)
# 크루 실행
result = crew.kickoff()
print("결과", result)