LangGraph SDK를 사용하면 LangGraph Server API에서 출력을 스트리밍할 수 있습니다.
LangGraph SDK와 LangGraph Server는 LangSmith의 일부입니다.

기본 사용법

기본 사용 예제:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
이것은 LangGraph API 서버에서 실행할 수 있는 예제 graph입니다. 자세한 내용은 LangSmith 빠른 시작을 참조하세요.
# graph.py
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
실행 중인 LangGraph API 서버가 있으면 LangGraph SDK를 사용하여 상호작용할 수 있습니다.
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(  # (1)!
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"  # (2)!
):
    print(chunk.data)
  1. client.runs.stream() 메서드는 스트리밍된 출력을 생성하는 iterator를 반환합니다. 2. stream_mode="updates"로 설정하면 각 node 이후 graph state에 대한 업데이트만 스트리밍합니다. 다른 stream mode도 사용할 수 있습니다. 자세한 내용은 지원되는 stream mode를 참조하세요.
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

지원되는 stream mode

Mode설명LangGraph Library Method
valuessuper-step 이후 전체 graph state를 스트리밍합니다..stream() / .astream() with stream_mode="values"
updatesgraph의 각 step 이후 state에 대한 업데이트를 스트리밍합니다. 동일한 step에서 여러 업데이트가 발생하면(예: 여러 node가 실행됨) 해당 업데이트는 별도로 스트리밍됩니다..stream() / .astream() with stream_mode="updates"
messages-tupleLLM이 호출되는 graph node에 대한 LLM token과 metadata를 스트리밍합니다(채팅 앱에 유용)..stream() / .astream() with stream_mode="messages"
debuggraph 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다..stream() / .astream() with stream_mode="debug"
customgraph 내부에서 사용자 정의 데이터를 스트리밍합니다..stream() / .astream() with stream_mode="custom"
events모든 event(graph의 state 포함)를 스트리밍합니다. 주로 대규모 LCEL 앱을 마이그레이션할 때 유용합니다..astream_events()

여러 mode 스트리밍

stream_mode 매개변수에 list를 전달하여 여러 mode를 동시에 스트리밍할 수 있습니다. 스트리밍된 출력은 (mode, chunk) 튜플이 되며, 여기서 mode는 stream mode의 이름이고 chunk는 해당 mode에서 스트리밍된 데이터입니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)

Graph state 스트리밍

stream mode updatesvalues를 사용하여 실행 중인 graph의 state를 스트리밍합니다.
  • updates는 graph의 각 step 이후 state에 대한 업데이트를 스트리밍합니다.
  • values는 graph의 각 step 이후 state의 전체 값을 스트리밍합니다.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
  topic: str
  joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
Stateful run 아래 예제는 streaming run의 출력을 유지하고 checkpointer DB에 저장하며 thread를 생성했다고 가정합니다. thread를 생성하려면:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
run의 출력을 유지할 필요가 없다면 스트리밍할 때 thread_id 대신 None을 전달할 수 있습니다.

Stream Mode: updates

각 step 이후 node에서 반환된 state 업데이트만 스트리밍하려면 이것을 사용하세요. 스트리밍된 출력에는 node의 이름과 업데이트가 포함됩니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"
):
    print(chunk.data)

Stream Mode: values

각 step 이후 graph의 전체 state를 스트리밍하려면 이것을 사용하세요.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="values"
):
    print(chunk.data)

Subgraph

스트리밍된 출력에 subgraph의 출력을 포함하려면 parent graph의 .stream() 메서드에서 subgraphs=True로 설정할 수 있습니다. 이렇게 하면 parent graph와 모든 subgraph의 출력이 스트리밍됩니다.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. stream_subgraphs=True로 설정하여 subgraph의 출력을 스트리밍합니다.
이것은 LangGraph API 서버에서 실행할 수 있는 예제 graph입니다. 자세한 내용은 LangSmith 빠른 시작을 참조하세요.
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
실행 중인 LangGraph API 서버가 있으면 LangGraph SDK를 사용하여 상호작용할 수 있습니다.
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. stream_subgraphs=True로 설정하여 subgraph의 출력을 스트리밍합니다.
node 업데이트뿐만 아니라 어떤 graph(또는 subgraph)에서 스트리밍하고 있는지 알려주는 namespace도 받고 있다는 점에 유의하세요.

디버깅

debug streaming mode를 사용하여 graph 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다. 스트리밍된 출력에는 node의 이름과 전체 state가 포함됩니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    print(chunk.data)

LLM token

messages-tuple streaming mode를 사용하여 node, tool, subgraph 또는 task를 포함한 graph의 모든 부분에서 Large Language Model(LLM) 출력을 token 단위로 스트리밍합니다. messages-tuple mode에서 스트리밍된 출력은 튜플 (message_chunk, metadata)이며, 여기서:
  • message_chunk: LLM의 token 또는 message segment입니다.
  • metadata: graph node 및 LLM 호출에 대한 세부 정보를 포함하는 dictionary입니다.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="openai:gpt-4o-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    model_response = model.invoke( # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)
  1. LLM이 stream이 아닌 invoke를 사용하여 실행되는 경우에도 message event가 발생합니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
  1. “messages-tuple” stream mode는 튜플 (message_chunk, metadata)의 iterator를 반환합니다. 여기서 message_chunk는 LLM에서 스트리밍된 token이고 metadata는 LLM이 호출된 graph node에 대한 정보 및 기타 정보가 포함된 dictionary입니다.

LLM token 필터링

사용자 정의 데이터 스트리밍

사용자 정의 데이터를 전송하려면:
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"query": "example"},
    stream_mode="custom"
):
    print(chunk.data)

Event 스트리밍

graph의 state를 포함한 모든 event를 스트리밍하려면:
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="events"
):
    print(chunk.data)

Stateless run

streaming run의 출력을 유지하지 않고 checkpointer DB에 저장하지 않으려면 thread를 생성하지 않고 stateless run을 생성할 수 있습니다:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.stream(
    None,  # (1)!
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
  1. thread_id UUID 대신 None을 전달합니다.

Join 및 stream

LangSmith를 사용하면 활성 background run에 join하고 출력을 스트리밍할 수 있습니다. 이를 위해 LangGraph SDK의 client.runs.join_stream 메서드를 사용할 수 있습니다:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.join_stream(
    thread_id,
    run_id,  # (1)!
):
    print(chunk)
  1. 이것은 join하려는 기존 run의 run_id입니다.
출력이 버퍼링되지 않음 .join_stream을 사용하면 출력이 버퍼링되지 않으므로 join하기 전에 생성된 출력은 수신되지 않습니다.

API reference

API 사용법 및 구현에 대해서는 API reference를 참조하세요.
Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I