소개

crewAI의 조건부 작업은 이전 작업의 결과에 따라 동적으로 워크플로를 조정할 수 있게 한다. 이 강력한 기능을 활용하면 AI 기반 프로세스의 유연성과 효율성을 높이며, 작업을 선택적으로 실행하고 결정을 내릴 수 있다.

사용 예제

from typing import List
from pydantic import BaseModel
from crewai import Agent, Crew
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.task import Task
from crewai_tools import SerperDevTool

# 조건부 작업을 위한 조건 함수 정의
# 조건이 거짓이면 작업을 건너뛰고, 참이면 작업을 실행한다.
def is_data_missing(output: TaskOutput) -> bool:
    return len(output.pydantic.events) < 10  # 이 조건이 참이면 작업을 건너뛴다

# 에이전트 정의
data_fetcher_agent = Agent(
    role="데이터 수집자",
    goal="Serper 도구를 사용해 온라인에서 데이터를 수집한다",
    backstory="배경 이야기 1",
    verbose=True,
    tools=[SerperDevTool()]
)

data_processor_agent = Agent(
    role="데이터 처리자",
    goal="수집한 데이터를 처리한다",
    backstory="배경 이야기 2",
    verbose=True
)

summary_generator_agent = Agent(
    role="요약 생성자",
    goal="수집한 데이터에서 요약을 생성한다",
    backstory="배경 이야기 3",
    verbose=True
)

class EventOutput(BaseModel):
    events: List[str]

task1 = Task(
    description="Serper 도구를 사용해 샌프란시스코의 이벤트 데이터를 수집한다",
    expected_output="이번 주 샌프란시스코에서 할 만한 10가지 활동 목록",
    agent=data_fetcher_agent,
    output_pydantic=EventOutput,
)

conditional_task = ConditionalTask(
    description="""
        데이터가 부족한지 확인한다. 10개 미만의 이벤트가 있다면,
        Serper 도구를 사용해 추가 이벤트를 수집하여
        이번 주 샌프란시스코에서 할 만한 10가지 활동을 완성한다.
        """,
    expected_output="이번 주 샌프란시스코에서 할 만한 10가지 활동 목록",
    condition=is_data_missing,
    agent=data_processor_agent,
)

task3 = Task(
    description="수집한 데이터에서 샌프란시스코의 이벤트 요약을 생성한다",
    expected_output="고객과 그들의 고객 및 경쟁사에 대한 완전한 보고서. 인구통계, 선호도, 시장 포지셔닝, 청중 참여 등이 포함된다.",
    agent=summary_generator_agent,
)

# 작업을 포함한 크루 생성
crew = Crew(
    agents=[data_fetcher_agent, data_processor_agent, summary_generator_agent],
    tasks=[task1, conditional_task, task3],
    verbose=True,
    planning=True
)

# 크루 실행
result = crew.kickoff()
print("결과", result)