고객 지원 이메일 에이전트를 개별 단계로 분해하여 LangGraph로 에이전트를 구축하는 사고 방식을 배워보세요
LangGraph는 여러분이 구축하는 에이전트에 대한 사고 방식을 바꿀 수 있습니다. LangGraph로 에이전트를 구축할 때, 먼저 노드라고 하는 개별 단계로 분해합니다. 그런 다음 각 노드에 대한 다양한 결정과 전환을 설명합니다. 마지막으로 각 노드가 읽고 쓸 수 있는 공유 상태를 통해 노드들을 연결합니다. 이 튜토리얼에서는 LangGraph로 고객 지원 이메일 에이전트를 구축하는 사고 과정을 안내합니다.
프로세스의 개별 단계를 식별하는 것부터 시작합니다. 각 단계는 노드(특정 작업을 수행하는 함수)가 됩니다. 그런 다음 이러한 단계들이 서로 어떻게 연결되는지 스케치합니다.화살표는 가능한 경로를 보여주지만, 어떤 경로를 선택할지에 대한 실제 결정은 각 노드 내부에서 발생합니다.이제 워크플로우의 구성 요소를 식별했으므로 각 노드가 수행해야 하는 작업을 이해해 봅시다:
Read Email: 이메일 내용 추출 및 파싱
Classify Intent: LLM을 사용하여 긴급도와 주제를 분류한 다음 적절한 작업으로 라우팅
Doc Search: 관련 정보를 위해 지식 베이스 쿼리
Bug Track: 추적 시스템에서 이슈 생성 또는 업데이트
Draft Reply: 적절한 응답 생성
Human Review: 승인 또는 처리를 위해 담당자에게 에스컬레이션
Send Reply: 이메일 응답 발송
일부 노드는 다음에 갈 위치에 대한 결정을 내리고(Classify Intent, Draft Reply, Human Review), 다른 노드는 항상 동일한 다음 단계로 진행합니다(Read Email은 항상 Classify Intent로, Doc Search는 항상 Draft Reply로 이동).
각 노드를 간단한 함수로 구현합니다. 기억하세요: 노드는 상태를 받아 작업을 수행하고 업데이트를 반환합니다.
읽기 및 분류 노드
Copy
from typing import Literalfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import interrupt, Command, RetryPolicyfrom langchain_openai import ChatOpenAIfrom langchain_core.messages import HumanMessagellm = ChatOpenAI(model="gpt-4")def read_email(state: EmailAgentState) -> dict: """Extract and parse email content""" # In production, this would connect to your email service return { "messages": [HumanMessage(content=f"Processing email: {state['email_content']}")] }def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]: """Use LLM to classify email intent and urgency, then route accordingly""" # Create structured LLM that returns EmailClassification dict structured_llm = llm.with_structured_output(EmailClassification) # Format the prompt on-demand, not stored in state classification_prompt = f""" Analyze this customer email and classify it: Email: {state['email_content']} From: {state['sender_email']} Provide classification including intent, urgency, topic, and summary. """ # Get structured response directly as dict classification = structured_llm.invoke(classification_prompt) # Determine next node based on classification if classification['intent'] == 'billing' or classification['urgency'] == 'critical': goto = "human_review" elif classification['intent'] in ['question', 'feature']: goto = "search_documentation" elif classification['intent'] == 'bug': goto = "bug_tracking" else: goto = "draft_response" # Store classification as a single dict in state return Command( update={"classification": classification}, goto=goto )
검색 및 추적 노드
Copy
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]: """Search knowledge base for relevant information""" # Build search query from classification classification = state.get('classification', {}) query = f"{classification.get('intent', '')} {classification.get('topic', '')}" try: # Implement your search logic here # Store raw search results, not formatted text search_results = [ "Reset password via Settings > Security > Change Password", "Password must be at least 12 characters", "Include uppercase, lowercase, numbers, and symbols" ] except SearchAPIError as e: # For recoverable search errors, store error and continue search_results = [f"Search temporarily unavailable: {str(e)}"] return Command( update={"search_results": search_results}, # Store raw results or error goto="draft_response" )def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]: """Create or update bug tracking ticket""" # Create ticket in your bug tracking system ticket_id = "BUG-12345" # Would be created via API return Command( update={ "search_results": [f"Bug ticket {ticket_id} created"], "current_step": "bug_tracked" }, goto="draft_response" )
응답 노드
Copy
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]: """Generate response using context and route based on quality""" classification = state.get('classification', {}) # Format context from raw state data on-demand context_sections = [] if state.get('search_results'): # Format search results for the prompt formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']]) context_sections.append(f"Relevant documentation:\n{formatted_docs}") if state.get('customer_history'): # Format customer data for the prompt context_sections.append(f"Customer tier: {state['customer_history'].get('tier', 'standard')}") # Build the prompt with formatted context draft_prompt = f""" Draft a response to this customer email: {state['email_content']} Email intent: {classification.get('intent', 'unknown')} Urgency level: {classification.get('urgency', 'medium')} {chr(10).join(context_sections)} Guidelines: - Be professional and helpful - Address their specific concern - Use the provided documentation when relevant """ response = llm.invoke(draft_prompt) # Determine if human review needed based on urgency and intent needs_review = ( classification.get('urgency') in ['high', 'critical'] or classification.get('intent') == 'complex' ) # Route to appropriate next node goto = "human_review" if needs_review else "send_reply" return Command( update={"draft_response": response.content}, # Store only the raw response goto=goto )def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]: """Pause for human review using interrupt and route based on decision""" classification = state.get('classification', {}) # interrupt() must come first - any code before it will re-run on resume human_decision = interrupt({ "email_id": state.get('email_id',''), "original_email": state.get('email_content',''), "draft_response": state.get('draft_response',''), "urgency": classification.get('urgency'), "intent": classification.get('intent'), "action": "Please review and approve/edit this response" }) # Now process the human's decision if human_decision.get("approved"): return Command( update={"draft_response": human_decision.get("edited_response", state.get('draft_response',''))}, goto="send_reply" ) else: # Rejection means human will handle directly return Command(update={}, goto=END)def send_reply(state: EmailAgentState) -> dict: """Send the email response""" # Integrate with email service print(f"Sending reply: {state['draft_response'][:100]}...") return {}
이제 노드들을 작동하는 그래프로 연결합니다. 노드가 자체 라우팅 결정을 처리하므로 몇 가지 필수 엣지만 필요합니다.interrupt()를 사용한 human-in-the-loop를 활성화하려면 실행 간 상태를 저장하기 위해 checkpointer로 컴파일해야 합니다:
그래프 컴파일 코드
Copy
from langgraph.checkpoint.memory import MemorySaverfrom langgraph.types import RetryPolicy# Create the graphworkflow = StateGraph(EmailAgentState)# Add nodes with appropriate error handlingworkflow.add_node("read_email", read_email)workflow.add_node("classify_intent", classify_intent)# Add retry policy for nodes that might have transient failuresworkflow.add_node( "search_documentation", search_documentation, retry_policy=RetryPolicy(max_attempts=3))workflow.add_node("bug_tracking", bug_tracking)workflow.add_node("draft_response", draft_response)workflow.add_node("human_review", human_review)workflow.add_node("send_reply", send_reply)# Add only the essential edgesworkflow.add_edge(START, "read_email")workflow.add_edge("read_email", "classify_intent")workflow.add_edge("send_reply", END)# Compile with checkpointer for persistence, in case run graph with Local_Server --> Please compile without checkpointermemory = MemorySaver()app = workflow.compile(checkpointer=memory)
그래프 구조는 최소한입니다. 라우팅이 Command 객체를 통해 노드 내부에서 발생하기 때문입니다. 각 노드는 Command[Literal["node1", "node2"]]와 같은 타입 힌트를 사용하여 갈 수 있는 위치를 선언하므로 흐름이 명시적이고 추적 가능합니다.
# Test with an urgent billing issueinitial_state = { "email_content": "I was charged twice for my subscription! This is urgent!", "sender_email": "[email protected]", "email_id": "email_123", "messages": []}# Run with a thread_id for persistenceconfig = {"configurable": {"thread_id": "customer_123"}}result = app.invoke(initial_state, config)# The graph will pause at human_reviewprint(f"Draft ready for review: {result['draft_response'][:100]}...")# When ready, provide human input to resumefrom langgraph.types import Commandhuman_response = Command( resume={ "approved": True, "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund..." })# Resume executionfinal_result = app.invoke(human_response, config)print(f"Email sent successfully!")
그래프는 interrupt()에 도달하면 일시 중지되고, 모든 것을 checkpointer에 저장하고 대기합니다. 며칠 후에도 재개할 수 있으며, 중단된 지점에서 정확히 이어집니다. thread_id는 이 대화의 모든 상태가 함께 보존되도록 보장합니다.
이 섹션에서는 노드 세분성 설계의 트레이드오프를 탐구합니다. 대부분의 애플리케이션은 이를 건너뛰고 위에 표시된 패턴을 사용할 수 있습니다.
궁금할 수 있습니다: Read Email과 Classify Intent를 하나의 노드로 결합하지 않는 이유는 무엇일까요?또는 Doc Search를 Draft Reply와 분리하는 이유는 무엇일까요?답은 복원력과 관찰 가능성 간의 트레이드오프와 관련이 있습니다.복원력 고려사항: LangGraph의 내구성 있는 실행은 노드 경계에서 체크포인트를 생성합니다. 중단 또는 실패 후 워크플로우가 재개되면 실행이 중지된 노드의 시작 부분부터 시작됩니다. 노드가 작을수록 체크포인트가 더 자주 생성되며, 이는 문제가 발생했을 때 반복해야 할 작업이 줄어든다는 것을 의미합니다. 여러 작업을 하나의 큰 노드로 결합하면 끝 부분 근처에서 실패가 발생할 경우 해당 노드의 시작 부분부터 모든 것을 다시 실행해야 합니다.이메일 에이전트에 대해 이러한 분해를 선택한 이유:
외부 서비스의 격리: Doc Search와 Bug Track은 외부 API를 호출하기 때문에 별도의 노드입니다. 검색 서비스가 느리거나 실패하면 LLM 호출과 격리하고 싶습니다. 다른 노드에 영향을 주지 않고 이러한 특정 노드에 재시도 정책을 추가할 수 있습니다.
중간 가시성:Classify Intent를 자체 노드로 두면 조치를 취하기 전에 LLM이 결정한 내용을 검사할 수 있습니다. 이는 디버깅 및 모니터링에 유용합니다. 에이전트가 언제 왜 사람 검토로 라우팅되는지 정확히 볼 수 있습니다.
다른 실패 모드: LLM 호출, 데이터베이스 조회, 이메일 발송은 서로 다른 재시도 전략을 가집니다. 별도의 노드를 사용하면 이를 독립적으로 구성할 수 있습니다.
재사용성 및 테스트: 작은 노드는 격리된 상태에서 테스트하고 다른 워크플로우에서 재사용하기가 더 쉽습니다.
다른 유효한 접근 방식: Read Email과 Classify Intent를 단일 노드로 결합할 수 있습니다. 분류 전에 원시 이메일을 검사하는 기능을 잃게 되고 해당 노드의 실패 시 두 작업을 모두 반복하게 됩니다. 대부분의 애플리케이션에서는 별도 노드의 관찰 가능성 및 디버깅 이점이 트레이드오프의 가치가 있습니다.애플리케이션 수준 고려사항: 2단계의 캐싱 논의(검색 결과를 캐시할지 여부)는 LangGraph 프레임워크 기능이 아닌 애플리케이션 수준 결정입니다. 특정 요구사항에 따라 노드 함수 내에서 캐싱을 구현합니다. LangGraph는 이를 규정하지 않습니다.성능 고려사항: 노드가 많다고 해서 실행이 느려지는 것은 아닙니다. LangGraph는 기본적으로 백그라운드에서 체크포인트를 작성하므로(비동기 내구성 모드), 그래프는 체크포인트가 완료될 때까지 기다리지 않고 계속 실행됩니다. 이는 최소한의 성능 영향으로 빈번한 체크포인트를 얻을 수 있음을 의미합니다. 필요한 경우 이 동작을 조정할 수 있습니다. 완료 시에만 체크포인트하려면 "exit" 모드를 사용하거나, 각 체크포인트가 작성될 때까지 실행을 차단하려면 "sync" 모드를 사용합니다.