跳到主要内容

查询追踪

推荐阅读

在深入了解本内容之前,阅读以下资料可能会有所帮助

注意

如果您希望导出大量追踪数据,我们建议您使用批量数据导出功能,因为它能更好地处理大量数据,并支持自动重试和跨分区并行化。

查询运行(LangSmith 追踪中的 span 数据)的推荐方法是使用 SDK 中的 list_runs 方法或 API 中的 /runs/query 端点。

LangSmith 以一种简单格式存储追踪数据,该格式在运行(span)数据格式中指定。

使用筛选参数

对于简单查询,您无需依赖我们的查询语法。您可以使用筛选参数参考中指定的筛选参数。

先决条件

在运行以下代码片段之前初始化客户端。

from langsmith import Client

client = Client()

以下是一些使用关键字参数列出运行的示例

列出项目中所有运行

project_runs = client.list_runs(project_name="<your_project>")

列出过去 24 小时内的 LLM 和聊天运行

todays_llm_runs = client.list_runs(
project_name="<your_project>",
start_time=datetime.now() - timedelta(days=1),
run_type="llm",
)

列出项目中的根运行

根运行是没有父级的运行。它们被分配 is_root 值为 True。您可以使用此值来筛选根运行。

root_runs = client.list_runs(
project_name="<your_project>",
is_root=True
)

列出没有错误的运行

correct_runs = client.list_runs(project_name="<your_project>", error=False)

按运行 ID 列出运行

忽略其他参数

如果您按照上述方式提供运行 ID 列表,它将忽略所有其他筛选参数,例如 project_namerun_type 等,并直接返回与给定 ID 匹配的运行。

如果您有运行 ID 列表,可以直接列出它们

run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

使用筛选查询语言

对于更复杂的查询,您可以使用筛选查询语言参考中描述的查询语言。

列出对话线程中的所有根运行

这是获取对话线程中运行的方法。有关设置线程的更多信息,请参阅我们的关于设置线程的操作指南。线程通过设置共享线程 ID 进行分组。LangSmith UI 允许您使用以下三个元数据键中的任何一个:session_idconversation_idthread_id。以下查询匹配其中任何一个。

group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
project_name="<your_project>",
filter=filter_string,
is_root=True
)

列出所有名为“extractor”的运行,其追踪根被分配了反馈“user_score”分数为 1

client.list_runs(
project_name="<your_project>",
filter='eq(name, "extractor")',
trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

列出“star_rating”键的值大于 4 的运行

client.list_runs(
project_name="<your_project>",
filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

列出完成时间超过 5 秒的运行

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

列出所有 total_tokens 大于 5000 的运行

client.list_runs(project_name="<your_project>", filter='gt(total_tokens, 5000)')

列出所有“error”不为 null 的运行

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

列出所有 start_time 大于特定时间戳的运行

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

列出所有包含字符串“substring”的运行

client.list_runs(project_name="<your_project>", filter='search("substring")')

列出所有标记有 Git 哈希值“2aa1cf4”的运行

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

列出所有耗时超过 10 秒的“chain”类型运行,并且

total_tokens 大于 5000

client.list_runs(
project_name="<your_project>",
filter='and(eq(run_type, "chain"), gt(latency, 10), gt(total_tokens, 5000))'
)

列出所有在特定时间戳后开始的运行,并且

“error”不为 null 或“Correctness”反馈分数为 0

client.list_runs(
project_name="<your_project>",
filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

复杂查询:列出所有 tags 包含“experimental”或“beta”且

latency 大于 2 秒的运行

client.list_runs(
project_name="<your_project>",
filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

按全文搜索追踪树 您可以使用 search() 函数,无需

任何特定字段即可对运行中的所有字符串字段进行全文搜索。这使您能够快速找到与搜索词匹配的追踪。

client.list_runs(
project_name="<your_project>",
filter='search("image classification")'
)

检查元数据是否存在

如果您想检查元数据是否存在,可以使用 eq 运算符,并可选地结合 and 语句进行值匹配。这对于记录更结构化的运行信息很有用。


to_search = {
"user_id": ""
}

# Check for any run with the "user_id" metadata key
client.list_runs(
project_name="default",
filter="eq(metadata_key, 'user_id')"
)
# Check for runs with user_id=4070f233-f61e-44eb-bff1-da3c163895a3
client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

检查元数据中的环境详细信息。

一种常见模式是通过元数据向追踪添加环境信息。如果您想筛选包含环境元数据的运行,可以使用与上述相同的模式

client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

检查元数据中的对话 ID

另一种关联同一对话中追踪的常见方法是使用共享对话 ID。如果您想通过这种方式基于对话 ID 筛选运行,可以在元数据中搜索该 ID。

client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

键值对的负向筛选

您可以使用对元数据、输入和输出键值对的负向筛选,以从结果中排除特定运行。以下是元数据键值对的一些示例,但相同的逻辑适用于输入和输出键值对。

# Find all runs where the metadata does not contain a "conversation_id" key
client.list_runs(
project_name="default",
filter="and(neq(metadata_key, 'conversation_id'))"
)

# Find all runs where the conversation_id in metadata is not "a1b2c3d4-e5f6-7890"

client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where there is no "conversation_id" metadata key and the "a1b2c3d4-e5f6-7890" value is not present

client.list_runs(
project_name="default",
filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where the conversation_id metadata key is not present but the "a1b2c3d4-e5f6-7890" value is present

client.list_runs(
project_name="default",
filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

组合多个筛选器

如果您想组合多个条件来细化搜索,可以使用 and 运算符以及其他筛选函数。以下是您如何搜索名为“ChatOpenAI”且其元数据中包含特定 conversation_id 的运行示例

client.list_runs(
project_name="default",
filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

树形筛选器

列出所有名为“RetrieveDocs”的运行,其根运行具有“user_score”反馈评分为 1,并且完整追踪中的任何运行名为“ExpandQuery”。

如果您想根据追踪中达到的各种状态或步骤来提取特定运行,这种类型的查询很有用。

client.list_runs(
project_name="<your_project>",
filter='eq(name, "RetrieveDocs")',
trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
tree_filter='eq(name, "ExpandQuery")'
)

高级:导出包含子工具使用情况的扁平化追踪视图

以下 Python 示例演示了如何导出追踪的扁平化视图,包括代理在每个追踪中使用的工具(来自嵌套运行)信息。这可用于分析代理在多个追踪中的行为。

此示例查询指定天数内的所有工具运行,并按其父(根)运行 ID 进行分组。然后,它获取每个根运行的相关信息,例如运行名称、输入、输出,并将该信息与子运行信息相结合。

为了优化查询,该示例

  1. 在查询工具运行时仅选择必要的字段以减少查询时间。
  2. 在并发处理工具运行的同时批量获取根运行。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# List all tool runs
tool_runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(days=num_days),
run_type="tool",
# We don't need to fetch inputs, outputs, and other values that # may increase the query time
select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
# Group tool runs by parent run ID
for run in tqdm(tool_runs):
# Collect all tools invoked within a given trace
tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
# maybe send a batch of parent run IDs to the server
# this lets us query for the root runs in batches
# while still processing the tool runs
if len(tool_runs_by_parent) % trace_batch_size == 0:
if this_batch := list(tool_runs_by_parent.keys())[
trace_cursor : trace_cursor + trace_batch_size
]:
trace_cursor += trace_batch_size
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=["name", "inputs", "outputs", "run_type"],
)
)
if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=["name", "inputs", "outputs", "run_type"],
)
)

for future in tqdm(futures):
root_runs = future.result()
for root_run in root_runs:
root_data = tool_runs_by_parent[root_run.id]
data.append(
{
"run_id": root_run.id,
"run_name": root_run.name,
"run_type": root_run.run_type,
"inputs": root_run.inputs,
"outputs": root_run.outputs,
"tools_involved": list(root_data["tools_involved"]),
}
)

# (Optional): Convert to a pandas DataFrame

import pandas as pd

df = pd.DataFrame(data)
df.head()

高级:导出带有反馈的追踪的检索器 IO

如果您想根据检索器行为微调嵌入或诊断端到端系统性能问题,此查询很有用。以下 Python 示例演示了如何在具有特定反馈分数的追踪中导出检索器输入和输出。

from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# List all tool runs
retriever_runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(days=num_days),
run_type="retriever",
# This time we do want to fetch the inputs and outputs, since they
# may be adjusted by query expansion steps.
select=["trace_id", "name", "run_type", "inputs", "outputs"],
trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
# Group retriever runs by parent run ID
for run in tqdm(retriever_runs):
# Collect all retriever calls invoked within a given trace
for k, v in run.inputs.items():
retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
for k, v in (run.outputs or {}).items():
# Extend the docs
retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
# maybe send a batch of parent run IDs to the server
# this lets us query for the root runs in batches
# while still processing the retriever runs
if len(retriever_runs_by_parent) % trace_batch_size == 0:
if this_batch := list(retriever_runs_by_parent.keys())[
trace_cursor : trace_cursor + trace_batch_size
]:
trace_cursor += trace_batch_size
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=[
"name",
"inputs",
"outputs",
"run_type",
"feedback_stats",
],
)
)
if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=["name", "inputs", "outputs", "run_type"],
)
)

for future in tqdm(futures):
root_runs = future.result()
for root_run in root_runs:
root_data = retriever_runs_by_parent[root_run.id]
feedback = {
f"feedback.{k}": v.get("avg")
for k, v in (root_run.feedback_stats or {}).items()
}
inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
data.append(
{
"run_id": root_run.id,
"run_name": root_run.name,
**inputs,
**outputs,
**feedback,
**root_data,
}
)

# (Optional): Convert to a pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

此页面有帮助吗?


您可以提供详细反馈 在 GitHub 上.