LangGraph는 실시간 업데이트를 제공하기 위한 streaming 시스템을 구현합니다. Streaming은 LLM 기반 애플리케이션의 응답성을 향상시키는 데 매우 중요합니다. 완전한 응답이 준비되기 전에도 출력을 점진적으로 표시함으로써, streaming은 특히 LLM의 지연 시간을 처리할 때 사용자 경험(UX)을 크게 개선합니다. LangGraph streaming으로 가능한 것들:
  • Graph state streamupdatesvalues 모드로 state 업데이트 / 값을 가져옵니다.
  • Subgraph 출력 stream — 부모 graph와 중첩된 subgraph의 출력을 모두 포함합니다.
  • LLM token stream — node, subgraph 또는 tool 내부 어디에서든 token stream을 캡처합니다.
  • 사용자 정의 데이터 stream — tool function에서 직접 사용자 정의 업데이트 또는 진행 신호를 전송합니다.
  • 여러 streaming 모드 사용values (전체 state), updates (state 델타), messages (LLM token + metadata), custom (임의의 사용자 데이터), 또는 debug (상세한 trace) 중에서 선택합니다.

지원되는 stream 모드

다음 stream 모드 중 하나 이상을 리스트로 stream 또는 astream 메서드에 전달하세요:
모드설명
valuesgraph의 각 단계 후 state의 전체 값을 stream합니다.
updatesgraph의 각 단계 후 state에 대한 업데이트를 stream합니다. 동일한 단계에서 여러 업데이트가 발생하면(예: 여러 node가 실행됨), 해당 업데이트는 개별적으로 stream됩니다.
customgraph node 내부에서 사용자 정의 데이터를 stream합니다.
messagesLLM이 호출되는 모든 graph node에서 2-tuple (LLM token, metadata)을 stream합니다.
debuggraph 실행 전반에 걸쳐 가능한 한 많은 정보를 stream합니다.

기본 사용 예제

LangGraph graph는 stream (동기) 및 astream (비동기) 메서드를 노출하여 stream된 출력을 iterator로 생성합니다.
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# The stream() method returns an iterator that yields streamed outputs
for chunk in graph.stream(  
    {"topic": "ice cream"},
    # Set stream_mode="updates" to stream only the updates to the graph state after each node
    # Other stream modes are also available. See supported stream modes for details
    stream_mode="updates",  
):
    print(chunk)
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

여러 모드 stream

stream_mode 매개변수에 리스트를 전달하여 여러 모드를 동시에 stream할 수 있습니다. stream된 출력은 (mode, chunk) tuple이 되며, 여기서 mode는 stream 모드의 이름이고 chunk는 해당 모드에서 stream된 데이터입니다.
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)

Graph state stream

stream 모드 updatesvalues를 사용하여 graph가 실행될 때 state를 stream합니다.
  • updates는 graph의 각 단계 후 state에 대한 업데이트를 stream합니다.
  • values는 graph의 각 단계 후 state의 전체 값을 stream합니다.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
  topic: str
  joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
  • updates
  • values
각 단계 후 node에서 반환된 state 업데이트만 stream하려면 이것을 사용하세요. stream된 출력에는 node의 이름과 업데이트가 포함됩니다.
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",  
):
    print(chunk)

Subgraph 출력 stream

stream된 출력에 subgraph의 출력을 포함하려면, 부모 graph의 .stream() 메서드에서 subgraphs=True를 설정할 수 있습니다. 이렇게 하면 부모 graph와 모든 subgraph의 출력이 stream됩니다. 출력은 tuple (namespace, data)로 stream되며, 여기서 namespace는 subgraph가 호출되는 node의 경로를 가진 tuple입니다. 예: ("parent_node:<task_id>", "child_node:<task_id>").
for chunk in graph.stream(
    {"foo": "foo"},
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True,  
    stream_mode="updates",
):
    print(chunk)
from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True,  
):
    print(chunk)
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
참고: node 업데이트뿐만 아니라 어떤 graph(또는 subgraph)에서 stream하고 있는지 알려주는 namespace도 받고 있습니다.

Debugging

debug streaming 모드를 사용하여 graph 실행 전반에 걸쳐 가능한 한 많은 정보를 stream합니다. stream된 출력에는 node의 이름과 전체 state가 포함됩니다.
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",  
):
    print(chunk)

LLM token

messages streaming 모드를 사용하여 graph의 모든 부분(node, tool, subgraph 또는 task 포함)에서 Large Language Model (LLM) 출력을 token 단위로 stream합니다. messages 모드에서 stream된 출력은 tuple (message_chunk, metadata)이며, 여기서:
  • message_chunk: LLM의 token 또는 message segment입니다.
  • metadata: graph node 및 LLM 호출에 대한 세부 정보를 포함하는 dictionary입니다.
LLM이 LangChain integration으로 제공되지 않는 경우, 대신 custom 모드를 사용하여 출력을 stream할 수 있습니다. 자세한 내용은 모든 LLM과 함께 사용을 참조하세요.
Python < 3.11에서 async를 위한 수동 config 필요 Python < 3.11에서 async 코드를 사용할 때는 적절한 streaming을 활성화하기 위해 ainvoke()RunnableConfig를 명시적으로 전달해야 합니다. 자세한 내용은 Python < 3.11에서 Async를 참조하거나 Python 3.11+로 업그레이드하세요.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


model = init_chat_model(model="openai:gpt-4o-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    # Note that message events are emitted even when the LLM is run using .invoke rather than .stream
    model_response = model.invoke(  
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# The "messages" stream mode returns an iterator of tuples (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",  
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)

LLM 호출별로 필터링

LLM 호출과 tags를 연결하여 LLM 호출별로 stream된 token을 필터링할 수 있습니다.
from langchain.chat_models import init_chat_model

# model_1 is tagged with "joke"
model_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke'])
# model_2 is tagged with "poem"
model_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem'])

graph = ... # define a graph that uses these LLMs

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",  
):
    # Filter the streamed tokens by the tags field in the metadata to only include
    # the tokens from the LLM invocation with the "joke" tag
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

# The joke_model is tagged with "joke"
joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"])
# The poem_model is tagged with "poem"
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"])


class State(TypedDict):
      topic: str
      joke: str
      poem: str


async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # Note: Passing the config through explicitly is required for python < 3.11
      # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      # The config is passed through explicitly to ensure the context vars are propagated correctly
      # This is required for Python < 3.11 when using async code. Please see the async section for more details
      joke_response = await joke_model.ainvoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}],
            config,
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}],
            config,
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
)

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
      {"topic": "cats"},
      stream_mode="messages",
):
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)

Node별로 필터링

특정 node에서만 token을 stream하려면, stream_mode="messages"를 사용하고 stream된 metadata의 langgraph_node 필드로 출력을 필터링하세요:
# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages",  
):
    # Filter the streamed tokens by the langgraph_node field in the metadata
    # to only include the tokens from the specified node
    if msg.content and metadata["langgraph_node"] == "some_node_name":
        ...
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
      topic: str
      joke: str
      poem: str


def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # write both the joke and the poem concurrently
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
)

# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
    {"topic": "cats"},
    stream_mode="messages",  
):
    # Filter the streamed tokens by the langgraph_node field in the metadata
    # to only include the tokens from the write_poem node
    if msg.content and metadata["langgraph_node"] == "write_poem":
        print(msg.content, end="|", flush=True)

사용자 정의 데이터 stream

LangGraph node 또는 tool 내부에서 사용자 정의 데이터를 전송하려면 다음 단계를 따르세요:
  1. get_stream_writer를 사용하여 stream writer에 액세스하고 사용자 정의 데이터를 emit합니다.
  2. .stream() 또는 .astream()을 호출할 때 stream_mode="custom"을 설정하여 stream에서 사용자 정의 데이터를 가져옵니다. 여러 모드를 결합할 수 있지만(예: ["updates", "custom"]), 최소한 하나는 "custom"이어야 합니다.
Python < 3.11에서 async에서는 get_stream_writer 사용 불가 Python < 3.11에서 실행되는 async 코드에서는 get_stream_writer가 작동하지 않습니다. 대신, node 또는 tool에 writer 매개변수를 추가하고 수동으로 전달하세요. 사용 예제는 Python < 3.11에서 Async를 참조하세요.
  • node
  • tool
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    # Get the stream writer to send custom data
    writer = get_stream_writer()
    # Emit a custom key-value pair (e.g., progress update)
    writer({"custom_key": "Generating custom data inside node"})
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)

모든 LLM과 함께 사용

stream_mode="custom"을 사용하여 모든 LLM API에서 데이터를 stream할 수 있습니다 — 해당 API가 LangChain chat model interface를 구현하지 않더라도 가능합니다. 이를 통해 자체 streaming interface를 제공하는 raw LLM client 또는 외부 서비스를 통합할 수 있어, LangGraph를 사용자 정의 설정에 매우 유연하게 만듭니다.
from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """Example node that calls an arbitrary model and streams the output"""
    # Get the stream writer to send custom data
    writer = get_stream_writer()  
    # Assume you have a streaming client that yields chunks
    # Generate LLM tokens using your custom streaming client
    for chunk in your_custom_streaming_client(state["topic"]):
        # Use the writer to send custom data to the stream
        writer({"custom_llm_chunk": chunk})  
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # Add other nodes and edges as needed
    .compile()
)
# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",  

):
    # The chunk will contain the custom data streamed from the llm
    print(chunk)
import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
tool call을 포함하는 AIMessage로 graph를 호출해 봅시다:
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)

특정 chat model에 대해 streaming 비활성화

애플리케이션이 streaming을 지원하는 model과 지원하지 않는 model을 혼합하는 경우, streaming을 지원하지 않는 model에 대해 명시적으로 streaming을 비활성화해야 할 수 있습니다. model을 초기화할 때 disable_streaming=True를 설정하세요.
  • init_chat_model
  • chat model interface
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "anthropic:claude-sonnet-4-5",
    # Set disable_streaming=True to disable streaming for the chat model
    disable_streaming=True

)

Python < 3.11에서 Async

Python 버전 < 3.11에서는 asyncio taskcontext 매개변수를 지원하지 않습니다. 이는 LangGraph의 context 자동 전파 기능을 제한하며, LangGraph의 streaming 메커니즘에 두 가지 주요 방식으로 영향을 미칩니다:
  1. async LLM 호출(예: ainvoke())에 RunnableConfig를 명시적으로 전달해야 합니다. callback이 자동으로 전파되지 않기 때문입니다.
  2. async node 또는 tool에서 get_stream_writer를 사용할 수 없습니다writer 인수를 직접 전달해야 합니다.
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model

model = init_chat_model(model="openai:gpt-4o-mini")

class State(TypedDict):
    topic: str
    joke: str

# Accept config as an argument in the async node function
async def call_model(state, config):
    topic = state["topic"]
    print("Generating joke...")
    # Pass config to model.ainvoke() to ensure proper context propagation
    joke_response = await model.ainvoke(  
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# Set stream_mode="messages" to stream LLM tokens
async for chunk, metadata in graph.astream(
    {"topic": "ice cream"},
    stream_mode="messages",  
):
    if chunk.content:
        print(chunk.content, end="|", flush=True)
from typing import TypedDict
from langgraph.types import StreamWriter

class State(TypedDict):
      topic: str
      joke: str

# Add writer as an argument in the function signature of the async node or tool
# LangGraph will automatically pass the stream writer to the function
async def generate_joke(state: State, writer: StreamWriter):  
      writer({"custom_key": "Streaming custom data while generating a joke"})
      return {"joke": f"This is a joke about {state['topic']}"}

graph = (
      StateGraph(State)
      .add_node(generate_joke)
      .add_edge(START, "generate_joke")
      .compile()
)

# Set stream_mode="custom" to receive the custom data in the stream  #
async for chunk in graph.astream(
      {"topic": "ice cream"},
      stream_mode="custom",
):
      print(chunk)

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I