권장 읽기 자료이 내용을 살펴보기 전에 다음 문서를 읽어보시면 도움이 됩니다:
대량의 trace를 내보내려는 경우, 대량 데이터 내보내기 기능을 사용하는 것을 권장합니다. 이 기능은 대용량 데이터를 더 잘 처리하며 파티션 간 자동 재시도 및 병렬화를 지원합니다.
run(LangSmith trace의 span 데이터)을 쿼리하는 권장 방법은 SDK의 list_runs 메서드 또는 API의 /runs/query endpoint를 사용하는 것입니다. LangSmith는 Run (span) 데이터 형식에 명시된 간단한 형식으로 trace를 저장합니다.

filter argument 사용하기

간단한 쿼리의 경우 쿼리 구문에 의존할 필요가 없습니다. filter argument 참조에 명시된 filter argument를 사용할 수 있습니다.
사전 요구사항아래 코드 스니펫을 실행하기 전에 client를 초기화하세요.
from langsmith import Client

client = Client()
다음은 keyword argument를 사용하여 run을 나열하는 몇 가지 예시입니다:

프로젝트의 모든 run 나열하기

project_runs = client.list_runs(project_name="<your_project>")

최근 24시간 내의 LLM 및 Chat run 나열하기

todays_llm_runs = client.list_runs(
    project_name="<your_project>",
    start_time=datetime.now() - timedelta(days=1),
    run_type="llm",
)

프로젝트의 root run 나열하기

Root run은 부모가 없는 run입니다. 이들은 is_rootTrue 값이 할당됩니다. 이를 사용하여 root run을 필터링할 수 있습니다.
root_runs = client.list_runs(
    project_name="<your_project>",
    is_root=True
)

오류가 없는 run 나열하기

correct_runs = client.list_runs(project_name="<your_project>", error=False)

run ID로 run 나열하기

다른 Argument 무시위에서 설명한 방식으로 run ID 목록을 제공하면 project_name, run_type 등과 같은 다른 모든 필터링 argument를 무시하고 주어진 ID와 일치하는 run을 직접 반환합니다.
run ID 목록이 있는 경우 직접 나열할 수 있습니다:
run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

filter 쿼리 언어 사용하기

더 복잡한 쿼리의 경우 filter 쿼리 언어 참조에 설명된 쿼리 언어를 사용할 수 있습니다.

대화 thread의 모든 root run 나열하기

이것은 대화 thread에서 run을 가져오는 방법입니다. thread 설정에 대한 자세한 내용은 thread 설정 방법 가이드를 참조하세요. Thread는 공유 thread ID를 설정하여 그룹화됩니다. LangSmith UI에서는 다음 세 가지 metadata key 중 하나를 사용할 수 있습니다: session_id, conversation_id, 또는 thread_id. session ID는 tracing project ID라고도 합니다. 다음 쿼리는 이들 중 하나와 일치합니다.
group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
    project_name="<your_project>",
    filter=filter_string,
    is_root=True
)

trace의 root에 “user_score” feedback 점수 1이 할당된 “extractor”라는 모든 run 나열하기

client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "extractor")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

점수가 4보다 큰 “star_rating” key를 가진 run 나열하기

client.list_runs(
    project_name="<your_project>",
    filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

완료하는 데 5초 이상 걸린 run 나열하기

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

”error”가 null이 아닌 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

start_time이 특정 timestamp보다 큰 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

”substring” 문자열을 포함하는 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='search("substring")')

git hash “2aa1cf4”로 태그된 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

특정 timestamp 이후에 시작되었고 “error”가 null이 아니거나 “Correctness” feedback 점수가 0인 모든 run 나열하기

client.list_runs(
  project_name="<your_project>",
  filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

복잡한 쿼리: tag에 “experimental” 또는 “beta”가 포함되고 latency가 2초보다 큰 모든 run 나열하기

client.list_runs(
  project_name="<your_project>",
  filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

전체 텍스트로 trace tree 검색하기

특정 필드 없이 search() function을 사용하여 run의 모든 문자열 필드에 대해 전체 텍스트 검색을 수행할 수 있습니다. 이를 통해 검색어와 일치하는 trace를 빠르게 찾을 수 있습니다.
client.list_runs(
  project_name="<your_project>",
  filter='search("image classification")'
)

metadata 존재 여부 확인하기

metadata의 존재 여부를 확인하려면 eq operator를 사용할 수 있으며, 선택적으로 and 문을 사용하여 값으로 일치시킬 수 있습니다. 이는 run에 대한 보다 구조화된 정보를 기록하려는 경우 유용합니다.
to_search = {
    "user_id": ""
}

# Check for any run with the "user_id" metadata key
client.list_runs(
  project_name="default",
  filter="eq(metadata_key, 'user_id')"
)
# Check for runs with user_id=4070f233-f61e-44eb-bff1-da3c163895a3
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

metadata의 환경 세부 정보 확인하기

일반적인 패턴은 metadata를 통해 trace에 환경 정보를 추가하는 것입니다. 환경 metadata를 포함하는 run을 필터링하려면 위와 동일한 패턴을 사용할 수 있습니다:
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

metadata의 conversation ID 확인하기

동일한 대화에서 trace를 연결하는 또 다른 일반적인 방법은 공유 conversation ID를 사용하는 것입니다. 이러한 방식으로 conversation ID를 기반으로 run을 필터링하려면 metadata에서 해당 ID를 검색할 수 있습니다.
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

key-value 쌍에 대한 부정 필터링

metadata, input, output key-value 쌍에 대해 부정 필터링을 사용하여 결과에서 특정 run을 제외할 수 있습니다. 다음은 metadata key-value 쌍에 대한 몇 가지 예시이지만 동일한 논리가 input 및 output key-value 쌍에도 적용됩니다.
# Find all runs where the metadata does not contain a "conversation_id" key
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'))"
)

# Find all runs where the conversation_id in metadata is not "a1b2c3d4-e5f6-7890"
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where there is no "conversation_id" metadata key and the "a1b2c3d4-e5f6-7890" value is not present
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where the conversation_id metadata key is not present but the "a1b2c3d4-e5f6-7890" value is present
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

여러 filter 결합하기

검색을 세분화하기 위해 여러 조건을 결합하려면 다른 필터링 function과 함께 and operator를 사용할 수 있습니다. 다음은 “ChatOpenAI”라는 이름의 run을 검색하고 metadata에 특정 conversation_id가 있는 run을 찾는 방법입니다:
client.list_runs(
  project_name="default",
  filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

Tree Filter

root run에 “user_score” feedback이 1이고 전체 trace의 어떤 run이 “ExpandQuery”라는 이름을 가진 “RetrieveDocs”라는 모든 run을 나열합니다. 이러한 유형의 쿼리는 trace 내에서 다양한 상태나 단계에 도달한 것을 조건으로 특정 run을 추출하려는 경우 유용합니다.
client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "RetrieveDocs")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    tree_filter='eq(name, "ExpandQuery")'
)

고급: 하위 tool 사용을 포함한 평면화된 trace view 내보내기

다음 Python 예제는 각 trace 내에서 agent가 사용한 tool(중첩된 run에서)에 대한 정보를 포함하여 평면화된 trace view를 내보내는 방법을 보여줍니다. 이는 여러 trace에서 agent의 동작을 분석하는 데 사용할 수 있습니다. 이 예제는 지정된 일수 내의 모든 tool run을 쿼리하고 부모(root) run ID별로 그룹화합니다. 그런 다음 각 root run에 대한 관련 정보(run 이름, input, output 등)를 가져오고 해당 정보를 하위 run 정보와 결합합니다. 쿼리를 최적화하기 위해 이 예제는:
  1. tool run을 쿼리할 때 필요한 필드만 선택하여 쿼리 시간을 줄입니다.
  2. tool run을 동시에 처리하면서 root run을 배치로 가져옵니다.
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# List all tool runs
tool_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="tool",
    # We don't need to fetch inputs, outputs, and other values that # may increase the query time
    select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
    # Group tool runs by parent run ID
    for run in tqdm(tool_runs):
        # Collect all tools invoked within a given trace
        tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
        # maybe send a batch of parent run IDs to the server
        # this lets us query for the root runs in batches
        # while still processing the tool runs
        if len(tool_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(tool_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["name", "inputs", "outputs", "run_type"],
                    )
                )
    if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = tool_runs_by_parent[root_run.id]
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                "run_type": root_run.run_type,
                "inputs": root_run.inputs,
                "outputs": root_run.outputs,
                "tools_involved": list(root_data["tools_involved"]),
            }
        )

# (Optional): Convert to a pandas DataFrame
import pandas as pd

df = pd.DataFrame(data)
df.head()

고급: feedback이 있는 trace에 대한 retriever IO 내보내기

이 쿼리는 embedding을 미세 조정하거나 retriever 동작을 기반으로 end-to-end 시스템 성능 문제를 진단하려는 경우 유용합니다. 다음 Python 예제는 특정 feedback 점수가 있는 trace 내에서 retriever input 및 output을 내보내는 방법을 보여줍니다.
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# List all tool runs
retriever_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="retriever",
    # This time we do want to fetch the inputs and outputs, since they
    # may be adjusted by query expansion steps.
    select=["trace_id", "name", "run_type", "inputs", "outputs"],
    trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
    # Group retriever runs by parent run ID
    for run in tqdm(retriever_runs):
        # Collect all retriever calls invoked within a given trace
        for k, v in run.inputs.items():
            retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
        for k, v in (run.outputs or {}).items():
            # Extend the docs
            retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
        # maybe send a batch of parent run IDs to the server
        # this lets us query for the root runs in batches
        # while still processing the retriever runs
        if len(retriever_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(retriever_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=[
                            "name",
                            "inputs",
                            "outputs",
                            "run_type",
                            "feedback_stats",
                        ],
                    )
                )
    if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = retriever_runs_by_parent[root_run.id]
        feedback = {
            f"feedback.{k}": v.get("avg")
            for k, v in (root_run.feedback_stats or {}).items()
        }
        inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
        outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                **inputs,
                **outputs,
                **feedback,
                **root_data,
            }
        )

# (Optional): Convert to a pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I