aevaluate
- async langsmith.evaluation._arunner.aevaluate(target: ATARGET_T | AsyncIterable[dict] | Runnable | str | uuid.UUID | schemas.TracerSession, /, data: DATA_T | AsyncIterable[schemas.Example] | Iterable[schemas.Example] | None = None, evaluators: Sequence[EVALUATOR_T | AEVALUATOR_T] | None = None, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] | None = None, metadata: dict | None = None, experiment_prefix: str | None = None, description: str | None = None, max_concurrency: int | None = 0, num_repetitions: int = 1, client: langsmith.Client | None = None, blocking: bool = True, experiment: schemas.TracerSession | str | uuid.UUID | None = None, upload_results: bool = True, **kwargs: Any) → AsyncExperimentResults [source]
Evaluate an async target system on a given dataset.
- Parameters:
target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]) – The target system or experiment(s) to evaluate. Can be an async function that takes a dict and returns a dict, a langchain Runnable, an existing experiment ID, or a two-tuple of experiment IDs.
data (Union[DATA_T, AsyncIterable[schemas.Example]]) – The dataset to evaluate on. Can be a dataset name, a list of examples, an async generator of examples, or an async iterable of examples.
evaluators (Optional[Sequence[EVALUATOR_T]]) – A list of evaluators to run on each example. Defaults to None.
summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]) – A list of summary evaluators to run on the entire dataset. Defaults to None.
metadata (Optional[dict]) – Metadata to attach to the experiment. Defaults to None.
experiment_prefix (Optional[str]) – A prefix to provide for your experiment name. Defaults to None.
description (Optional[str]) – A description of the experiment.
max_concurrency (int | None) – The maximum number of concurrent evaluations to run. If None then no limit is set. If 0 then no concurrency. Defaults to 0.
num_repetitions (int) – The number of times to run the evaluation. Each item in the dataset will be run and evaluated this many times. Defaults to 1.
client (Optional[langsmith.Client]) – The LangSmith client to use. Defaults to None.
blocking (bool) – Whether to block until the evaluation is complete. Defaults to True.
experiment (Optional[schemas.TracerSession]) – An existing experiment to extend. If provided, experiment_prefix is ignored. For advanced usage only.
load_nested – Whether to load all child runs for the experiment. Default is to only load the top-level root runs. Should only be specified when evaluating an existing experiment.
upload_results (bool)
kwargs (Any)
- Returns:
An async iterator over the experiment results.
- Return type:
AsyncIterator[ExperimentResultRow]
- Environment:
- LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save time and cost during testing. Recommended to commit the cache files to your repository for faster CI/CD runs. Requires the 'langsmith[vcr]' package to be installed.
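As a minimal sketch, the cache can be enabled before running the examples below; this assumes LANGSMITH_TEST_CACHE takes a path to a cache directory, and tests/cassettes is an illustrative location:
>>> import os
>>> os.environ["LANGSMITH_TEST_CACHE"] = "tests/cassettes"  # assumed: directory for cached API calls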
Examples
>>> from typing import Sequence
>>> from langsmith import Client, aevaluate
>>> from langsmith.schemas import Example, Run
>>> client = Client()
>>> dataset = client.clone_public_dataset(
...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
... )
>>> dataset_name = "Evaluate Examples"
Basic usage:
>>> def accuracy(run: Run, example: Example):
...     # Row-level evaluator for accuracy.
...     pred = run.outputs["output"]
...     expected = example.outputs["answer"]
...     return {"score": expected.lower() == pred.lower()}
>>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
...     # Experiment-level evaluator for precision.
...     # TP / (TP + FP)
...     predictions = [run.outputs["output"].lower() for run in runs]
...     expected = [example.outputs["answer"].lower() for example in examples]
...     # yes and no are the only possible answers
...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
...     return {"score": tp / (tp + fp)}
>>> import asyncio
>>> async def apredict(inputs: dict) -> dict:
...     # This can be any async function or just an API call to your app.
...     await asyncio.sleep(0.1)
...     return {"output": "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment",
...         description="Evaluate the accuracy of the model asynchronously.",
...         metadata={
...             "my-prompt-version": "abcd-1234",
...         },
...     )
... )
View the evaluation results for experiment:...
Evaluating over only a subset of the examples using an async generator:
>>> async def example_generator():
...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
...     for example in examples:
...         yield example
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=example_generator(),
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Subset Experiment",
...         description="Evaluate a subset of examples asynchronously.",
...     )
... )
View the evaluation results for experiment:...
Streaming each prediction to debug more easily and quickly:
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Streaming Experiment",
...         description="Streaming predictions for debugging.",
...         blocking=False,
...     )
... )
View the evaluation results for experiment:...
>>> async def aenumerate(iterable):
...     async for elem in iterable:
...         print(elem)
>>> asyncio.run(aenumerate(results))
Running without concurrency:
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         experiment_prefix="My Experiment Without Concurrency",
...         description="This was run without concurrency.",
...         max_concurrency=0,
...     )
... )
View the evaluation results for experiment:...
Using async evaluators:
>>> async def helpfulness(run: Run, example: Example):
...     # Row-level evaluator for helpfulness.
...     await asyncio.sleep(5)  # Replace with your LLM API call
...     return {"score": run.outputs["output"] == "Yes"}
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[helpfulness],
...         summary_evaluators=[precision],
...         experiment_prefix="My Helpful Experiment",
...         description="Applying async evaluators example.",
...     )
... )
View the evaluation results for experiment:...
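Repeating each example with num_repetitions can help average out nondeterminism in the target. A minimal sketch reusing apredict and the evaluators defined above; the experiment prefix and description strings are illustrative:
>>> results = asyncio.run(
...     aevaluate(
...         apredict,
...         data=dataset_name,
...         evaluators=[accuracy],
...         summary_evaluators=[precision],
...         num_repetitions=3,  # run and evaluate each example 3 times
...         experiment_prefix="My Repeated Experiment",
...         description="Each example is run and evaluated 3 times.",
...     )
... )
View the evaluation results for experiment:...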
Changed in version 0.2.0: The 'max_concurrency' default was updated from None (no limit on concurrency) to 0 (no concurrency at all).