Skip to content
Go back

Agent 可观测性 (Observability)

Edit page

tags: [LLM, Agent, Observability, Monitoring, Tracing] created: 2026-05-22

Agent 可观测性 (Observability)

核心问题:当 Agent 出错时,能在 5 分钟内定位到是哪个环节的问题吗?
LLM 系统的非确定性使得传统监控手段不够用——需要专门的 Trace 体系。


一、为什么 LLM 系统的可观测性更难

传统服务LLM Agent 系统
输入输出确定输出非确定,同一输入结果可能不同
失败有明确异常”幻觉”不会抛异常,但结果是错的
延迟稳定TTFT/TPS 波动大,受 KV Cache 影响
成本固定Token 消耗随输入/输出长度变化
调用链简单Thought→Tool→Observation 多层嵌套

二、Span 数据模型

graph TD
    A["AgentRun Span<br/>(整次 Agent 执行)"]
    A --> B["LLM Call Span<br/>(Thought 生成)"]
    A --> C["Tool Call Span<br/>(search_web)"]
    A --> D["LLM Call Span<br/>(最终回答生成)"]
    C --> E["HTTP Request Span<br/>(实际网络请求)"]

    style A fill:#4a9eff,color:#fff
    style B fill:#7bc8a4,color:#fff
    style C fill:#f4a261,color:#fff
    style D fill:#7bc8a4,color:#fff
    style E fill:#e76f51,color:#fff
import time
import uuid
import json
from dataclasses import dataclass, field
from typing import Any
from enum import Enum
from contextlib import contextmanager


class SpanType(str, Enum):
    AGENT_RUN = "agent_run"
    LLM_CALL = "llm_call"
    TOOL_CALL = "tool_call"
    RETRIEVAL = "retrieval"
    RERANK = "rerank"
    HTTP_REQUEST = "http_request"


@dataclass
class Span:
    """单个追踪 Span(代表一次操作的记录)"""
    span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    parent_id: str | None = None
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4())[:16])
    span_type: SpanType = SpanType.LLM_CALL
    name: str = ""
    start_time: float = field(default_factory=time.monotonic)
    end_time: float | None = None
    duration_ms: float | None = None
    # LLM 相关
    model: str | None = None
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    # 输入输出(可选,可能包含敏感信息,生产中可关闭)
    input_preview: str = ""   # 截断后的输入预览
    output_preview: str = ""  # 截断后的输出预览
    # 工具相关
    tool_name: str | None = None
    tool_args: dict = field(default_factory=dict)
    # 状态
    status: str = "ok"   # ok | error | timeout
    error_msg: str | None = None
    # 自定义标签(用于过滤和聚合)
    tags: dict[str, str] = field(default_factory=dict)

    def finish(self, status: str = "ok", error: str | None = None):
        self.end_time = time.monotonic()
        self.duration_ms = (self.end_time - self.start_time) * 1000
        self.status = status
        self.error_msg = error

    def to_dict(self) -> dict:
        return {
            "span_id": self.span_id,
            "trace_id": self.trace_id,
            "parent_id": self.parent_id,
            "type": self.span_type.value,
            "name": self.name,
            "duration_ms": round(self.duration_ms or 0, 2),
            "tokens": self.total_tokens,
            "status": self.status,
            "error": self.error_msg,
            "tags": self.tags,
        }

三、AgentTracer — 完整追踪实现

import threading
from collections import defaultdict


class AgentTracer:
    """
    轻量级 Agent 追踪器
    
    线程安全,支持嵌套 Span(通过 thread-local 维护当前 Span 栈)
    生产中可对接 OpenTelemetry 或 LangSmith
    """

    def __init__(self):
        self._spans: list[Span] = []
        self._lock = threading.Lock()
        self._local = threading.local()  # 线程本地变量,存储当前 Span 栈

    def _current_span(self) -> Span | None:
        stack = getattr(self._local, "stack", [])
        return stack[-1] if stack else None

    @contextmanager
    def span(
        self,
        name: str,
        span_type: SpanType = SpanType.LLM_CALL,
        **tags,
    ):
        """上下文管理器:自动记录 Span 的开始和结束"""
        parent = self._current_span()
        s = Span(
            parent_id=parent.span_id if parent else None,
            trace_id=parent.trace_id if parent else str(uuid.uuid4())[:16],
            span_type=span_type,
            name=name,
            tags={k: str(v) for k, v in tags.items()},
        )

        # 入栈
        if not hasattr(self._local, "stack"):
            self._local.stack = []
        self._local.stack.append(s)

        try:
            yield s
            s.finish(status="ok")
        except Exception as e:
            s.finish(status="error", error=str(e))
            raise
        finally:
            # 出栈,记录 Span
            self._local.stack.pop()
            with self._lock:
                self._spans.append(s)

    def record_llm_call(
        self,
        span: Span,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        input_text: str = "",
        output_text: str = "",
    ):
        """记录 LLM 调用的详细信息"""
        span.model = model
        span.prompt_tokens = prompt_tokens
        span.completion_tokens = completion_tokens
        span.total_tokens = prompt_tokens + completion_tokens
        span.input_preview = input_text[:200]   # 截断,避免日志过大
        span.output_preview = output_text[:200]

    def get_trace(self, trace_id: str) -> list[Span]:
        """获取一次 Agent 执行的完整 Trace"""
        with self._lock:
            return [s for s in self._spans if s.trace_id == trace_id]

    def print_trace(self, trace_id: str):
        """打印可读的 Trace 树(调试用)"""
        spans = self.get_trace(trace_id)
        if not spans:
            print("未找到 Trace")
            return

        print(f"\n{'='*60}")
        print(f"Trace: {trace_id}")
        for span in sorted(spans, key=lambda s: s.start_time):
            indent = "  " if span.parent_id else ""
            status_emoji = "" if span.status == "ok" else ""
            token_info = f"[{span.total_tokens}tok]" if span.total_tokens > 0 else ""
            print(
                f"{indent}{status_emoji} [{span.span_type.value}] {span.name} "
                f"- {span.duration_ms:.0f}ms {token_info}"
            )
            if span.error_msg:
                print(f"{indent}  └─ 错误: {span.error_msg}")
        print('='*60)


# 全局 Tracer 实例
tracer = AgentTracer()

四、@trace_span 装饰器

import functools
from openai import OpenAI

client = OpenAI()


def trace_span(name: str = "", span_type: SpanType = SpanType.LLM_CALL, **static_tags):
    """
    自动追踪装饰器
    在函数前后自动创建/结束 Span,记录执行时间和异常
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            span_name = name or func.__name__
            with tracer.span(span_name, span_type=span_type, **static_tags) as span:
                return func(*args, span=span, **kwargs)
        return wrapper
    return decorator


# 使用示例
@trace_span("llm_generate", span_type=SpanType.LLM_CALL)
def generate_with_tracing(prompt: str, span: Span = None) -> str:
    """带追踪的 LLM 调用"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    reply = response.choices[0].message.content

    # 记录 token 信息
    if span:
        tracer.record_llm_call(
            span=span,
            model="gpt-4o-mini",
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            input_text=prompt,
            output_text=reply,
        )
    return reply

五、Token 成本追踪器

from dataclasses import dataclass, field
from threading import Lock


# 各模型定价(美元/1M tokens)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-opus-4": {"input": 15.00, "output": 75.00},
    "claude-sonnet-4": {"input": 3.00, "output": 15.00},
    "qwen-max": {"input": 0.04, "output": 0.12},  # 阿里云百炼
}


@dataclass
class TokenCostTracker:
    """
    Token 消耗与成本追踪器
    线程安全,支持多模型、多会话的成本汇总
    """
    budget_usd: float = 10.0   # 预算上限(美元)
    alert_threshold: float = 0.8  # 达到 80% 预算时告警

    _records: list[dict] = field(default_factory=list)
    _total_cost: float = field(default=0.0, init=False)
    _lock: Lock = field(default_factory=Lock, init=False)

    def record(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        session_id: str = "default",
    ) -> float:
        """记录一次调用的 Token 消耗,返回本次成本(美元)"""
        pricing = MODEL_PRICING.get(model, {"input": 1.0, "output": 3.0})
        cost = (
            prompt_tokens * pricing["input"] / 1_000_000
            + completion_tokens * pricing["output"] / 1_000_000
        )

        with self._lock:
            self._total_cost += cost
            self._records.append({
                "model": model,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "cost_usd": cost,
                "session_id": session_id,
                "timestamp": time.time(),
            })

        # 预算告警
        if self._total_cost >= self.budget_usd * self.alert_threshold:
            print(
                f"⚠ 成本告警:已消耗 ${self._total_cost:.4f} "
                f"({self._total_cost/self.budget_usd:.0%} 预算)"
            )
        if self._total_cost >= self.budget_usd:
            raise BudgetExceededError(
                f"已超出预算上限 ${self.budget_usd}!当前消耗 ${self._total_cost:.4f}"
            )

        return cost

    def summary(self) -> dict:
        """生成成本汇总报告"""
        with self._lock:
            by_model: dict[str, dict] = defaultdict(lambda: {"cost": 0, "tokens": 0, "calls": 0})
            for r in self._records:
                m = r["model"]
                by_model[m]["cost"] += r["cost_usd"]
                by_model[m]["tokens"] += r["prompt_tokens"] + r["completion_tokens"]
                by_model[m]["calls"] += 1

            return {
                "total_cost_usd": round(self._total_cost, 6),
                "budget_usd": self.budget_usd,
                "budget_used_pct": f"{self._total_cost/self.budget_usd:.1%}",
                "total_calls": len(self._records),
                "by_model": dict(by_model),
            }

    def print_summary(self):
        s = self.summary()
        print(f"\n💰 成本报告")
        print(f"  总消耗: ${s['total_cost_usd']:.6f} / ${s['budget_usd']} ({s['budget_used_pct']})")
        for model, data in s["by_model"].items():
            print(f"  {model}: ${data['cost']:.6f} ({data['calls']} 次, {data['tokens']} tokens)")


class BudgetExceededError(Exception):
    pass


# 全局成本追踪器
cost_tracker = TokenCostTracker(budget_usd=5.0)

六、失败归因分析器

@dataclass
class FailureReport:
    trace_id: str
    root_cause_type: str   # intent_error | retrieval_miss | tool_failure | llm_hallucination
    failed_span: str
    analysis: str
    recommendations: list[str]


def analyze_failure(trace_id: str) -> FailureReport | None:
    """
    分析失败的 Trace,自动归因
    
    归因逻辑:
    1. 找出所有 status=error 的 Span
    2. 按 Span 类型判断根因类别
    3. 给出改进建议
    """
    spans = tracer.get_trace(trace_id)
    failed_spans = [s for s in spans if s.status == "error"]

    if not failed_spans:
        return None  # 没有明显失败,可能是幻觉(需要 Eval 检测)

    # 按时间排序,找最早的失败
    first_failure = min(failed_spans, key=lambda s: s.start_time)

    # 根据失败 Span 类型归因
    cause_map = {
        SpanType.RETRIEVAL: (
            "retrieval_miss",
            "检索阶段未能找到相关文档",
            ["检查 Embedding 模型是否匹配语言", "增加 BM25 混合检索", "扩大 Top-K 召回数量"],
        ),
        SpanType.TOOL_CALL: (
            "tool_failure",
            f"工具 '{first_failure.tool_name}' 调用失败",
            ["检查工具参数格式", "增加工具调用重试", "验证外部服务可用性"],
        ),
        SpanType.LLM_CALL: (
            "llm_error",
            "LLM 调用失败",
            ["检查 API Key 和 Rate Limit", "增加请求重试(指数退避)", "切换备用模型"],
        ),
    }

    span_type = first_failure.span_type
    if span_type in cause_map:
        cause_type, analysis, recommendations = cause_map[span_type]
    else:
        cause_type = "unknown"
        analysis = f"未知错误:{first_failure.error_msg}"
        recommendations = ["查看完整 Trace 日志", "联系 on-call 工程师"]

    return FailureReport(
        trace_id=trace_id,
        root_cause_type=cause_type,
        failed_span=first_failure.name,
        analysis=analysis,
        recommendations=recommendations,
    )


# 测试
def test_failure_analysis():
    with tracer.span("agent_run", span_type=SpanType.AGENT_RUN) as root_span:
        trace_id = root_span.trace_id
        with tracer.span("search_docs", span_type=SpanType.RETRIEVAL) as s:
            s.finish(status="error", error="向量数据库连接超时")

    report = analyze_failure(trace_id)
    if report:
        print(f"\n🔍 故障归因报告")
        print(f"  类型: {report.root_cause_type}")
        print(f"  失败环节: {report.failed_span}")
        print(f"  分析: {report.analysis}")
        print(f"  建议:")
        for rec in report.recommendations:
            print(f"    - {rec}")

test_failure_analysis()

七、关键 Metrics 滑动窗口统计

from collections import deque
import statistics


class SlidingWindowMetrics:
    """
    滑动时间窗口内的指标统计
    用于实时监控 TTFT、TPS、成功率等
    """

    def __init__(self, window_seconds: float = 60.0):
        self.window = window_seconds
        self._ttft_records: deque[tuple[float, float]] = deque()   # (timestamp, ms)
        self._tps_records: deque[tuple[float, float]] = deque()    # (timestamp, tokens/s)
        self._success_records: deque[tuple[float, bool]] = deque() # (timestamp, success)
        self._lock = Lock()

    def _cleanup(self, records: deque):
        """移除窗口外的旧数据"""
        cutoff = time.monotonic() - self.window
        while records and records[0][0] < cutoff:
            records.popleft()

    def record_request(self, ttft_ms: float, tps: float, success: bool):
        now = time.monotonic()
        with self._lock:
            self._ttft_records.append((now, ttft_ms))
            self._tps_records.append((now, tps))
            self._success_records.append((now, success))

    def get_stats(self) -> dict:
        with self._lock:
            for records in [self._ttft_records, self._tps_records, self._success_records]:
                self._cleanup(records)

            ttft_values = [v for _, v in self._ttft_records]
            tps_values = [v for _, v in self._tps_records]
            success_values = [v for _, v in self._success_records]

        def safe_stats(values):
            if not values:
                return {"p50": 0, "p95": 0, "p99": 0, "mean": 0}
            sorted_v = sorted(values)
            n = len(sorted_v)
            return {
                "p50": sorted_v[int(n * 0.5)],
                "p95": sorted_v[int(n * 0.95)],
                "p99": sorted_v[int(n * 0.99)],
                "mean": statistics.mean(sorted_v),
            }

        return {
            "window_seconds": self.window,
            "request_count": len(success_values),
            "success_rate": sum(success_values) / len(success_values) if success_values else 0,
            "ttft_ms": safe_stats(ttft_values),
            "tps": safe_stats(tps_values),
        }

    def print_dashboard(self):
        stats = self.get_stats()
        print(f"\n📊 实时监控(最近 {stats['window_seconds']}s)")
        print(f"  请求数: {stats['request_count']}")
        print(f"  成功率: {stats['success_rate']:.1%}")
        print(f"  TTFT(ms): p50={stats['ttft_ms']['p50']:.0f} | p95={stats['ttft_ms']['p95']:.0f}")
        print(f"  TPS: p50={stats['tps']['p50']:.1f} | p95={stats['tps']['p95']:.1f}")


metrics = SlidingWindowMetrics(window_seconds=60.0)
# 模拟记录
for i in range(10):
    metrics.record_request(ttft_ms=150 + i*10, tps=30 + i, success=i != 7)
metrics.print_dashboard()

八、告警规则速查表

指标告警阈值严重级别处理策略
TTFT p99> 5000msCritical检查 KV Cache 命中率;扩容推理实例
成功率< 95%Critical触发熔断;切换备用模型
Tool 调用成功率< 90%Warning检查外部服务;增加重试逻辑
Faithfulness< 0.7Warning检索质量下降;Review Chunking 策略
Token 成本/小时> 预算 20%Warning检查是否有异常请求;优化 Prompt 长度
Context 长度 p95> 80% max_tokensWarning触发摘要压缩;检查 Sliding Window 配置
并发数> Semaphore 上限 80%Info考虑扩容;调整限流参数
错误率(连续 5 次)100%Critical触发熔断器;规则引擎降级

← [[agent-05-infra-performance]] | 返回索引 → [[agent-engineering-system]]


Edit page
Share this post on:

Next Post
How to configure AstroPaper theme