traceable#

langsmith.run_helpers.traceable(func: Callable[[P], R]) SupportsLangsmithExtra[P, R][source]#
langsmith.run_helpers.traceable(run_type: Literal['tool', 'chain', 'llm', 'retriever', 'embedding', 'prompt', 'parser'] = 'chain', *, name: str | None = None, metadata: Mapping[str, Any] | None = None, tags: List[str] | None = None, client: Client | None = None, reduce_fn: Callable[[Sequence], dict] | None = None, project_name: str | None = None, process_inputs: Callable[[dict], dict] | None = None, process_outputs: Callable[[...], dict] | None = None, process_chunk: Callable | None = None, _invocation_params_fn: Callable[[dict], dict] | None = None, dangerously_allow_filesystem: bool = False) Callable[[Callable[[P], R]], SupportsLangsmithExtra[P, R]]

使用 LangSmith 追踪函数。

参数:
  • run_type – 要创建的运行(span)类型。示例:llm、chain、tool、prompt、retriever 等。默认为“chain”。

  • name – 运行的名称。默认为函数名称。

  • metadata – 要添加到运行的元数据。默认为 None。

  • tags – 要添加到运行的标签。默认为 None。

  • client – 用于将运行日志记录到 LangSmith 的客户端。默认为 None,这将使用默认客户端。

  • reduce_fn – 如果函数返回生成器,则用于减少函数输出的函数。默认为 None,这意味着这些值将记录为列表。注意:如果迭代器永远不会耗尽(例如,函数返回无限生成器),则永远不会调用此函数,并且运行本身将卡在挂起状态。

  • project_name – 用于记录运行的项目名称。默认为 None,这将使用默认项目。

  • process_inputs – 输入的自定义序列化/处理函数。默认为 None。

  • process_outputs – 输出的自定义序列化/处理函数。默认为 None。

  • dangerously_allow_filesystem

    是否允许文件系统访问附件。默认为 False。

    引用本地文件路径的追踪将被上传到 LangSmith。一般来说,网络托管的应用程序不应使用此功能,因为引用的文件通常在用户的机器上,而不是主机上。

返回:

被装饰的函数。

返回类型:

Union[Callable, Callable[[Callable], Callable]]

注意

  • 要求在环境中将 LANGSMITH_TRACING_V2 设置为 ‘true’。

示例

基本用法

@traceable
def my_function(x: float, y: float) -> float:
    return x + y


my_function(5, 6)


@traceable
async def my_async_function(query_params: dict) -> dict:
    async with httpx.AsyncClient() as http_client:
        response = await http_client.get(
            "https://api.example.com/data",
            params=query_params,
        )
        return response.json()


asyncio.run(my_async_function({"param": "value"}))

使用生成器流式传输数据

@traceable
def my_generator(n: int) -> Iterable:
    for i in range(n):
        yield i


for item in my_generator(5):
    print(item)

异步流式传输数据

@traceable
async def my_async_generator(query_params: dict) -> Iterable:
    async with httpx.AsyncClient() as http_client:
        response = await http_client.get(
            "https://api.example.com/data",
            params=query_params,
        )
        for item in response.json():
            yield item


async def async_code():
    async for item in my_async_generator({"param": "value"}):
        print(item)


asyncio.run(async_code())

指定运行类型和名称

@traceable(name="CustomName", run_type="tool")
def another_function(a: float, b: float) -> float:
    return a * b


another_function(5, 6)

使用自定义元数据和标签进行日志记录

@traceable(
    metadata={"version": "1.0", "author": "John Doe"}, tags=["beta", "test"]
)
def tagged_function(x):
    return x**2


tagged_function(5)

指定自定义客户端和项目名称

custom_client = Client(api_key="your_api_key")


@traceable(client=custom_client, project_name="My Special Project")
def project_specific_function(data):
    return data


project_specific_function({"data": "to process"})

手动传递 langsmith_extra

@traceable
def manual_extra_function(x):
    return x**2


manual_extra_function(5, langsmith_extra={"metadata": {"version": "1.0"}})