跳转到主要内容

查询追踪

推荐阅读

在深入阅读本文档之前,阅读以下内容可能会有所帮助

注意

如果您希望导出大量的追踪数据,我们建议您使用批量数据导出功能,因为它能更好地处理大量数据,并支持跨分区的自动重试和并行化。

查询运行(LangSmith 追踪中的 span 数据)的推荐方法是使用 SDK 中的 list_runs 方法或 API 中的 /runs/query 端点。

LangSmith 将追踪数据存储在 运行(span)数据格式中指定的简单格式中。

使用过滤器参数

对于简单查询,您不必依赖我们的查询语法。您可以使用 过滤器参数参考中指定的过滤器参数。

前提条件

在运行以下代码片段之前,请初始化客户端。

from langsmith import Client

client = Client()

以下是一些使用关键字参数列出运行的示例

列出项目中的所有运行

project_runs = client.list_runs(project_name="<your_project>")

列出过去 24 小时内的 LLM 和 Chat 运行

todays_llm_runs = client.list_runs(
project_name="<your_project>",
start_time=datetime.now() - timedelta(days=1),
run_type="llm",
)

列出项目中的根运行

根运行是没有父级的运行。这些运行的 is_root 值被赋值为 True。您可以使用此项来过滤根运行。

root_runs = client.list_runs(
project_name="<your_project>",
is_root=True
)

列出没有错误的运行

correct_runs = client.list_runs(project_name="<your_project>", error=False)

按运行 ID 列出运行

忽略其他参数

如果您以上述方式提供运行 ID 列表,它将忽略所有其他过滤参数,如 project_namerun_type 等,并直接返回与给定 ID 匹配的运行。

如果您有运行 ID 列表,您可以直接列出它们

run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

使用过滤器查询语言

对于更复杂的查询,您可以使用 过滤器查询语言参考中描述的查询语言。

列出对话线程中的所有根运行

这是获取对话线程中运行的方法。有关设置线程的更多信息,请参阅我们的关于设置线程的操作指南。线程通过设置共享的线程 ID 进行分组。LangSmith UI 允许您使用以下三个元数据键中的任何一个:session_idconversation_idthread_id。以下查询匹配其中任何一个。

group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
project_name="<your_project>",
filter=filter_string,
is_root=True
)

列出所有名为 “extractor” 的运行,其追踪的根被分配了反馈 “user_score” 分数为 1

client.list_runs(
project_name="<your_project>",
filter='eq(name, "extractor")',
trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

列出 “star_rating” 键的分数大于 4 的运行

client.list_runs(
project_name="<your_project>",
filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

列出耗时超过 5 秒完成的运行

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

列出所有 total_tokens 大于 5000 的运行

client.list_runs(project_name="<your_project>", filter='gt(total_tokens, 5000)')

列出所有 “error” 不等于 null 的运行

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

列出所有 start_time 大于特定时间戳的运行

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

列出所有包含字符串 “substring” 的运行

client.list_runs(project_name="<your_project>", filter='search("substring")')

列出所有标记为 git 哈希 “2aa1cf4” 的运行

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

列出所有类型为 “chain” 且耗时超过 10 秒的运行,并且

total_tokens 大于 5000

client.list_runs(
project_name="<your_project>",
filter='and(eq(run_type, "chain"), gt(latency, 10), gt(total_tokens, 5000))'
)

列出所有在特定时间戳之后启动的运行,并且

“error” 不等于 null 或 “Correctness” 反馈分数等于 0

client.list_runs(
project_name="<your_project>",
filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

复杂查询:列出所有 tags 包含 “experimental” 或 “beta” 且

latency 大于 2 秒的运行

client.list_runs(
project_name="<your_project>",
filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

通过全文搜索追踪树 您可以使用 search() 函数,而无需

任何特定字段即可对运行中的所有字符串字段进行全文搜索。这使您可以快速找到与搜索词匹配的追踪。

client.list_runs(
project_name="<your_project>",
filter='search("image classification")'
)

检查元数据的存在

如果您想检查元数据的存在,可以使用 eq 运算符,可以选择与 and 语句一起使用以按值匹配。如果您想记录有关运行的更多结构化信息,这将非常有用。


to_search = {
"user_id": ""
}

# Check for any run with the "user_id" metadata key
client.list_runs(
project_name="default",
filter="eq(metadata_key, 'user_id')"
)
# Check for runs with user_id=4070f233-f61e-44eb-bff1-da3c163895a3
client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

检查元数据中的环境详细信息。

一种常见的模式是通过元数据将环境信息添加到您的追踪中。如果您想过滤包含环境元数据的运行,可以使用与上述相同的模式

client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

检查元数据中的会话 ID

另一种关联同一会话中追踪的常用方法是使用共享的会话 ID。如果您想基于这种方式过滤运行,可以在元数据中搜索该 ID。

client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

对键值对进行负面过滤

您可以使用元数据、输入和输出键值对的负面过滤来从结果中排除特定的运行。以下是一些元数据键值对的示例,但相同的逻辑适用于输入和输出键值对。

# Find all runs where the metadata does not contain a "conversation_id" key
client.list_runs(
project_name="default",
filter="and(neq(metadata_key, 'conversation_id'))"
)

# Find all runs where the conversation_id in metadata is not "a1b2c3d4-e5f6-7890"

client.list_runs(
project_name="default",
filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where there is no "conversation_id" metadata key and the "a1b2c3d4-e5f6-7890" value is not present

client.list_runs(
project_name="default",
filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where the conversation_id metadata key is not present but the "a1b2c3d4-e5f6-7890" value is present

client.list_runs(
project_name="default",
filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

组合多个过滤器

如果您想组合多个条件来优化搜索,可以使用 and 运算符以及其他过滤函数。以下是如何搜索名为 “ChatOpenAI” 且在其元数据中具有特定 conversation_id 的运行

client.list_runs(
project_name="default",
filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

树形过滤器

列出所有名为 “RetrieveDocs” 的运行,其根运行具有 “user_score” 反馈为 1,并且完整追踪中的任何运行都名为 “ExpandQuery”。

如果您想提取特定的运行,并以追踪中达到的各种状态或步骤为条件,则此类查询非常有用。

client.list_runs(
project_name="<your_project>",
filter='eq(name, "RetrieveDocs")',
trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
tree_filter='eq(name, "ExpandQuery")'
)

高级:导出包含子工具使用情况的扁平化追踪视图

以下 Python 示例演示了如何导出追踪的扁平化视图,包括有关代理在每个追踪中使用的工具(来自嵌套运行)的信息。这可以用于分析您的代理在多个追踪中的行为。

此示例查询指定天数内的所有工具运行,并按其父级(根)运行 ID 对其进行分组。然后,它获取每个根运行的相关信息,例如运行名称、输入、输出,并将该信息与子运行信息结合起来。

为了优化查询,该示例

  1. 在查询工具运行时仅选择必要的字段,以减少查询时间。
  2. 在并发处理工具运行的同时,批量获取根运行。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# List all tool runs
tool_runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(days=num_days),
run_type="tool",
# We don't need to fetch inputs, outputs, and other values that # may increase the query time
select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
# Group tool runs by parent run ID
for run in tqdm(tool_runs):
# Collect all tools invoked within a given trace
tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
# maybe send a batch of parent run IDs to the server
# this lets us query for the root runs in batches
# while still processing the tool runs
if len(tool_runs_by_parent) % trace_batch_size == 0:
if this_batch := list(tool_runs_by_parent.keys())[
trace_cursor : trace_cursor + trace_batch_size
]:
trace_cursor += trace_batch_size
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=["name", "inputs", "outputs", "run_type"],
)
)
if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=["name", "inputs", "outputs", "run_type"],
)
)

for future in tqdm(futures):
root_runs = future.result()
for root_run in root_runs:
root_data = tool_runs_by_parent[root_run.id]
data.append(
{
"run_id": root_run.id,
"run_name": root_run.name,
"run_type": root_run.run_type,
"inputs": root_run.inputs,
"outputs": root_run.outputs,
"tools_involved": list(root_data["tools_involved"]),
}
)

# (Optional): Convert to a pandas DataFrame

import pandas as pd

df = pd.DataFrame(data)
df.head()

高级:导出带有反馈的追踪的检索器 IO

如果您想根据检索器行为微调嵌入或诊断端到端系统性能问题,此查询非常有用。以下 Python 示例演示了如何在具有特定反馈分数的追踪中导出检索器输入和输出。

from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# List all tool runs
retriever_runs = client.list_runs(
project_name=project_name,
start_time=datetime.now() - timedelta(days=num_days),
run_type="retriever",
# This time we do want to fetch the inputs and outputs, since they
# may be adjusted by query expansion steps.
select=["trace_id", "name", "run_type", "inputs", "outputs"],
trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
# Group retriever runs by parent run ID
for run in tqdm(retriever_runs):
# Collect all retriever calls invoked within a given trace
for k, v in run.inputs.items():
retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
for k, v in (run.outputs or {}).items():
# Extend the docs
retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
# maybe send a batch of parent run IDs to the server
# this lets us query for the root runs in batches
# while still processing the retriever runs
if len(retriever_runs_by_parent) % trace_batch_size == 0:
if this_batch := list(retriever_runs_by_parent.keys())[
trace_cursor : trace_cursor + trace_batch_size
]:
trace_cursor += trace_batch_size
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=[
"name",
"inputs",
"outputs",
"run_type",
"feedback_stats",
],
)
)
if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
futures.append(
executor.submit(
client.list_runs,
project_name=project_name,
run_ids=this_batch,
select=["name", "inputs", "outputs", "run_type"],
)
)

for future in tqdm(futures):
root_runs = future.result()
for root_run in root_runs:
root_data = retriever_runs_by_parent[root_run.id]
feedback = {
f"feedback.{k}": v.get("avg")
for k, v in (root_run.feedback_stats or {}).items()
}
inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
data.append(
{
"run_id": root_run.id,
"run_name": root_run.name,
**inputs,
**outputs,
**feedback,
**root_data,
}
)

# (Optional): Convert to a pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

此页是否对您有帮助?


您可以留下详细的反馈 在 GitHub 上.