前言

运维工作的核心矛盾从来不是"会不会",而是"快不快"。

一个拥有 500 人规模的研发团队,企业云盘里存着十几 TB 的数据,某天凌晨两点,监控系统报警:“部分文件上传失败”。你翻开Wiki,找不到这个案例的解决方案;你翻聊天记录,上次遇到类似问题的同事已经离职。你开始一个个日志文件打开,一行行grep,眼睛酸胀,心里发慌。

这不是技术问题,这是知识管理问题

本文记录我们如何用 LangChain + 向量数据库构建一套运维知识库,并在此基础上实现自动化故障诊断。整个系统经历了三次大版本迭代,冷启动、检索精度、响应速度每个坑都踩过一遍,现在稳定运行 8 个月,日均处理故障工单 200+ 条。


一、整体架构:从 RAG 到故障自愈

1.1 为什么是 RAG 而不是微调

故障诊断场景有几个鲜明特点:

  • 领域专精:大量私有协议、私有部署环境,通用 LLM 根本不懂
  • 时效性强:告警规则、系统配置每年都会变,不能每次都重训模型
  • 容错要求高:宁可少说,不能说错——误判故障原因可能引发更大事故
  • 知识源头分散:日志在 ELK,配置在 Consul,变更记录在 JIRA,知识从来不在一个地方

综合以上,我们选择了 RAG(检索增强生成) 架构,而不是微调。RAG 的优势在于:

用户问题 → 检索相似历史案例/文档 → LLM 结合上下文回答

这样做的好处是:知识可以独立更新,不需要重训模型,回答可溯源。

1.2 系统架构图

┌─────────────────────────────────────────────────────────────────┐
│                        用户交互层                                │
│                   (飞书机器人 / Web 界面)                       │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────▼───────────────────────────────────────┐
│                      意图识别层                                  │
│              (判断:查询/诊断/预警/知识库)                        │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────▼───────────────────────────────────────┐
│                      LangChain 编排层                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────────────┐  │
│  │检索链    │  │诊断链    │  │预警链    │  │多轮对话链        │  │
│  └──────────┘  └──────────┘  └──────────┘  └──────────────────────┘  │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────▼───────────────────────────────────────┐
│                      知识检索层                                   │
│  ┌────────────────┐         ┌────────────────────────────────┐   │
│  │向量数据库      │         │  结构化数据源                    │   │
│  │(Milvus/Pinecone│         │  (MySQL/Elasticsearch/Redis)   │   │
│  └────────────────┘         └────────────────────────────────┘   │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────▼───────────────────────────────────────┐
│                      知识注入层                                   │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────────────┐  │
│  │历史工单  │  │系统文档  │  │变更记录  │  │专家经验(标注数据)│  │
│  └──────────┘  └──────────┘  └──────────┘  └──────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

1.3 故障自愈工作流

告警触发
    │
    ▼
异常检测(日志时序分析)
    │
    ├── 置信度 < 0.7 → 人工确认
    │
    └── 置信度 ≥ 0.7 → 自动诊断
              │
              ▼
         根因分析(LangChain ReAct)
              │
              ▼
         方案生成(带操作步骤)
              │
              ├── 自动修复(重启服务/切流量/清理磁盘)
              │
              └── 复杂问题 → 推送值班工程师
                           │
                           ▼
                      修复验证
                           │
                           ▼
                   更新知识库(正负反馈)

二、知识库搭建:从冷启动到持续运营

2.1 冷启动:数据从哪来

冷启动最大的坑是:团队往往有很多"隐式知识",散落在工程师的聊天记录、邮件、工单评语里,没人整理成文档。

我们采用"三源汇聚"策略:

源1:历史工单(最宝贵)

# 从工单系统拉取历史故障数据
def extract_tickets(start_date: str, end_date: str) -> list[dict]:
    """
    工单数据结构:
    {
        "ticket_id": "INC-2024-0312",
        "title": "文件上传失败,错误码 1003",
        "description": "用户反馈上传大于100MB的文件时失败",
        "resolution": "更新了 nginx 的 client_max_body_size 参数",
        "root_cause": "nginx 默认限制了 1MB",
        "category": "上传模块",
        "created_at": "2024-03-12T14:23:00",
        "resolved_at": "2024-03-12T15:45:00",
        "engineer": "张工"
    }
    """
    tickets = jira_client.search_issues(
        f'project=Ops AND created >= {start_date} AND created <= {end_date}',
        maxResults=5000
    )
    return [format_ticket(t) for t in tickets]

源2:系统文档(最规范但最稀疏)
很多团队的运维文档是"部署时写一次,之后再也没更新过"。我们用文档解析 + 主动更新的方式处理:

from langchain.document_loaders import DirectoryLoader, PDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

def load_documents(doc_path: str):
    """文档分割策略:按标题层级,保留上下文"""
    loader = DirectoryLoader(
        doc_path,
        glob="**/*.md",
        loader_cls=TextLoader,
        silent_errors=True
    )
    documents = loader.load()
    
    # 分割策略:保留技术文档的层级结构
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=100,
        separators=["\n## ", "\n### ", "\n\n", "\n", " "]
    )
    return text_splitter.split_documents(documents)

源3:专家经验(最稀缺)
这是最容易忽略但最有价值的部分。我们采用"师徒制"方式:每次故障复盘,要求工程师写出"如果是你师父,你会怎么想这个问题",标注到工单系统里。

2.2 向量数据库选型

数据库 适用场景 优点 缺点
Milvus 超大规模、追求性能 性能最强,支持分布式 运维复杂
Qdrant 中等规模、易部署 Rust 实现,内存安全,API 友好 生态稍弱
Chroma 原型验证 轻量,Python 原生 不适合生产
Pinecone 云原生、全托管 免运维 成本高,数据不出网

我们最终选了 Qdrant,原因:单节点部署简单,支持过滤条件检索,Rust 实现没有 GC 停顿问题影响 LLM 响应速度。

from langchain.vectorstores import Qdrant
from langchain.embeddings import OpenAIEmbeddings

def build_vectorstore(documents, collection_name: str = "ops_knowledge"):
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    
    vectorstore = Qdrant.from_documents(
        documents,
        embeddings,
        path="/data/qdrant_storage",  # 持久化路径
        collection_name=collection_name,
        metadata_schema={
            "category": {"type": "keyword"},
            "severity": {"type": "keyword"},
            "created_at": {"type": "datetime"},
        }
    )
    return vectorstore

2.3 知识库持续运营机制

知识库不是建完就完了的。我们设计了每日自动更新机制:

# 每日增量更新知识库
DAILY_UPDATE_PROMPT = """
你是一个运维知识库管理员,负责判断以下工单内容是否值得入库。

判断标准:
1. 是否包含故障根因分析?
2. 是否包含可复现的操作步骤?
3. 是否有通用性(同类型问题是否可能再次发生)?

如果满足以上任意两条,请返回 JSON 格式:
{{
    "should_index": true,
    "summary": "一句话总结这个故障",
    "keywords": ["关键词1", "关键词2", "关键词3"],
    "category": "网络/存储/计算/安全/其他"
}}
"""

def daily_update():
    tickets = extract_tickets(
        start_date=(datetime.now() - timedelta(days=1)).isoformat(),
        end_date=datetime.now().isoformat()
    )
    
    for ticket in tickets:
        # LLM 判断是否入库
        decision = llm.invoke(DAILY_UPDATE_PROMPT.format(ticket=ticket))
        if decision.should_index:
            # 写入知识库
            vectorstore.add_texts(
                texts=[ticket.description + "\n" + ticket.resolution],
                metadatas=[{
                    "ticket_id": ticket.id,
                    "category": decision.category,
                    "severity": ticket.severity
                }]
            )

三、LangChain 检索链设计

3.1 基础检索链

最简单的 RAG 实现:

from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# 基础检索 QA 链
qa_prompt = PromptTemplate(
    template="""你是一个运维助手,基于以下知识库内容回答用户问题。

知识库内容:
{context}

用户问题:{question}

回答要求:
1. 如果知识库中有相关内容,给出具体操作建议
2. 如果没有相关内容,明确告知用户"知识库中暂无相关信息,建议联系值班工程师"
3. 避免编造知识库中没有的内容
4. 如果涉及重大操作(如删除数据、重启服务),提示用户二次确认

回答:""",
    input_variables=["context", "question"]
)

def build_basic_qa_chain(vectorstore):
    llm = ChatOpenAI(model="gpt-4", temperature=0.3)
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
        return_source_documents=True,
        chain_type_kwargs={"prompt": qa_prompt}
    )
    return qa_chain

3.2 带过滤条件的检索链

运维场景有个特点:不同产品线、不同模块的问题不能混淆。一次"文件上传失败",可能是 A 产品线的问题,也可能是 B 产品线的问题,混在一起检索会降低精度。

def build_filtered_retriever(vectorstore, product_line: str = None, module: str = None):
    """带过滤条件的检索器"""
    search_kwargs = {"k": 5}
    
    if product_line or module:
        search_kwargs["filter"] = {
            "must": []
        }
        if product_line:
            search_kwargs["filter"]["must"].append({
                "key": "product_line",
                "match": {"value": product_line}
            })
        if module:
            search_kwargs["filter"]["must"].append({
                "key": "module",
                "match": {"value": module}
            })
    
    return vectorstore.as_retriever(
        search_kwargs=search_kwargs,
        search_type="similarity_score_threshold",
        search_kwargs={"score_threshold": 0.75}
    )

3.3 HyDE 检索:让问法归一

用户的问题是口语化的,"上传挂了"和"文件上传接口返回 500"其实是一回事。我们用 HyDE(Hypothetical Document Embeddings) 来处理这个问题:

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

hyde_prompt = PromptTemplate(
    template="""将以下用户问题改写成技术文档风格的描述。

用户问题:{question}

要求:
- 补充省略的主语和宾语
- 使用标准技术术语
- 包含可能的错误现象描述
- 保持原意不变

改写结果:""",
    input_variables=["question"]
)

def build_hyde_chain(llm):
    return LLMChain(llm=llm, prompt=hyde_prompt)

# HyDE 工作流
# 1. 用户问题 → HyDE 生成"假设文档"
# 2. 用假设文档去检索,而不是用原始问题
# 3. 假设文档的 embedding 和真实文档更接近,召回率更高

hyde_chain = build_hyde_chain(ChatOpenAI(model="gpt-4", temperature=0))

def hyde_retrieve(question: str, vectorstore):
    # Step 1: 生成假设文档
    hypothetical_doc = hyde_chain.run(question)
    
    # Step 2: 用假设文档检索
    docs = vectorstore.similarity_search(hypothetical_doc, k=5)
    
    return docs, hypothetical_doc

四、自动化故障诊断:从检测到自愈

4.1 日志异常检测

日志异常检测是整个故障自愈的触发器。我们采用 时序分析 + LLM 判断 的混合方案:

import numpy as np
from collections import deque

class LogAnomalyDetector:
    def __init__(self, window_size: int = 100, threshold: float = 3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.error_counts = deque(maxlen=window_size)
        self.baseline = None  # 基线错误率
        
    def fit(self, historical_logs: list[dict]):
        """用历史数据建立基线"""
        error_rates = [log["error_rate"] for log in historical_logs]
        self.baseline = {
            "mean": np.mean(error_rates),
            "std": np.std(error_rates),
            "p95": np.percentile(error_rates, 95),
            "p99": np.percentile(error_rates, 99)
        }
        
    def detect(self, current_log: dict) -> dict:
        """检测当前日志是否异常"""
        error_rate = current_log["error_rate"]
        
        # Z-score 检测
        if self.baseline:
            z_score = (error_rate - self.baseline["mean"]) / max(self.baseline["std"], 0.001)
            is_zscore_anomaly = abs(z_score) > self.threshold
        else:
            is_zscore_anomaly = False
        
        # 绝对值检测(错误率突增到 10% 以上)
        is_absolute_anomaly = error_rate > 0.1  # 10%
        
        # 综合判断
        is_anomaly = is_zscore_anomaly or is_absolute_anomaly
        
        return {
            "is_anomaly": is_anomaly,
            "error_rate": error_rate,
            "z_score": z_score if self.baseline else None,
            "confidence": min(abs(z_score) / self.threshold, 1.0) if is_zscore_anomaly else 0.8
        }

# 与 LLM 结合做二次判断
def llm_second_verdict(log_context: str, detector_confidence: float) -> str:
    """用 LLM 对可疑日志做二次确认"""
    if detector_confidence < 0.7:
        prompt = f"""
以下是一段系统日志上下文,请判断是否发生了真实故障:

{log_context}

请分析:
1. 这个错误是偶发的还是系统性的?
2. 是否需要人工介入?

回答格式:
is_real_fault: true/false
confidence: 0.0-1.0
reasoning: 简要原因
"""
        response = llm.invoke(prompt)
        return response
    return {"is_real_fault": True, "confidence": detector_confidence}

4.2 根因分析链(ReAct 模式)

故障诊断的核心是找到根因,不是表面现象。我们用 LangChain 的 ReAct(Reasoning + Acting)模式:

from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType

# 定义诊断工具
diagnostic_tools = [
    Tool(
        name="check_disk",
        func=lambda host: check_disk_usage(host),
        description="检查磁盘使用情况,返回各分区使用率和 inode 情况"
    ),
    Tool(
        name="check_network",
        func=lambda host: check_network_status(host),
        description="检查网络连通性、端口监听、连接数"
    ),
    Tool(
        name="check_process",
        func=lambda proc: check_process_health(proc),
        description="检查进程状态、内存占用、CPU 使用率、OOM 次数"
    ),
    Tool(
        name="check_logs",
        func=lambda (host, minutes): tail_logs(host, minutes),
        description=f"查看最近 N 分钟的系统日志,用于定位异常"
    ),
    Tool(
        name="check_service_health",
        func=lambda service: get_service_health(service),
        description="查询服务的健康检查状态和最近告警"
    ),
]

# 诊断 Prompt
DIAGNOSTIC_PROMPT = """你是一个资深运维工程师,负责诊断企业云盘故障。

背景信息:
{context}

故障现象:{symptom}

你的任务是:
1. 分析可能的根因
2. 按优先级执行诊断工具
3. 每执行一步都要思考"这个结果指向什么根因"
4. 最终给出根因结论和修复建议

输出格式:
思考过程:[你的分析过程]
诊断工具调用:[工具名: 参数]
工具返回:[结果]
根因判断:[根因描述]
修复建议:[具体操作步骤]

注意:
- 优先排查最可能的原因(资源问题 > 代码问题 > 网络问题)
- 每次只调用一个工具,等结果再分析
- 如果工具调用三次仍无法定位,直接输出"需要人工介入"
"""

def build_diagnostic_chain(llm, tools):
    agent = initialize_agent(
        tools,
        llm,
        agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        max_iterations=10,
        handle_parsing_errors=True
    )
    
    diagnostic_prompt = DIAGNOSTIC_PROMPT.format(
        context="企业云盘系统",
        symptom="{symptom}"
    )
    
    return agent

# 使用示例
diagnostic_agent = build_diagnostic_chain(ChatOpenAI(model="gpt-4", temperature=0), diagnostic_tools)
result = diagnostic_agent.run({
    "input": "某用户反馈上传大于 100MB 的文件时提示失败,但小于 10MB 的文件上传正常"
})

4.3 自动修复与验证

对于高置信度的简单故障,系统直接执行修复:

class AutoHealer:
    # 修复策略映射(经验沉淀)
    FIX_STRATEGIES = {
        ("upload", "size_limit"): {
            "action": "update_config",
            "target": "nginx/client_max_body_size",
            "value": "500m",
            "verify": lambda: test_upload(100)
        },
        ("disk", "full"): {
            "action": "cleanup",
            "targets": ["/tmp", "/var/log"],
            "threshold_gb": 10,
            "verify": lambda: check_disk("/")
        },
        ("process", "oom"): {
            "action": "restart_service",
            "verify": lambda: check_service_health("bablebird")
        },
        ("network", "port_exhausted"): {
            "action": "increase_port_range",
            "verify": lambda: check_network_status()
        }
    }
    
    def __init__(self, diagnostic_chain):
        self.diagnostic_chain = diagnostic_chain
        self.execution_log = []
        
    def auto_heal(self, symptom: str, context: dict) -> dict:
        # Step 1: 诊断
        diagnosis = self.diagnostic_chain.run(symptom)
        root_cause = diagnosis.get("root_cause_key", None)  # 如 ("upload", "size_limit")
        
        if not root_cause or root_cause not in self.FIX_STRATEGIES:
            return {"status": "manual_required", "reason": "未匹配到自动修复策略"}
        
        strategy = self.FIX_STRATEGIES[root_cause]
        
        # Step 2: 执行
        try:
            self._execute_strategy(strategy)
        except Exception as e:
            return {"status": "failed", "error": str(e)}
        
        # Step 3: 验证
        verify_result = strategy["verify"]()
        if verify_result.get("success"):
            return {
                "status": "auto_fixed",
                "strategy": strategy["action"],
                "verified": True
            }
        else:
            return {
                "status": "verification_failed",
                "rollback": True
            }
    
    def _execute_strategy(self, strategy):
        action = strategy["action"]
        if action == "update_config":
            update_nginx_config(strategy["value"])
        elif action == "cleanup":
            cleanup_files(strategy["targets"])
        elif action == "restart_service":
            restart_service(strategy.get("service_name"))

五、踩坑经验与最佳实践

5.1 知识库冷启动:别贪多,先做减法

:第一版我们导入了所有能导入了文档(3000+ 篇),结果检索质量反而下降了——噪音太多,正相关内容被稀释。

解法:分阶段注入,先做高频场景的知识库,后续按需扩展。

# 按场景优先级分批注入
PRIORITY_TIERS = [
    # 第一层:高频核心场景(优先注入)
    ["上传下载失败", "权限问题", "登录认证", "同步冲突"],
    # 第二层:中频通用场景
    ["性能问题", "存储扩容", "备份恢复", "审计日志"],
    # 第三层:低频边缘场景
    ["API集成", "SSO配置", "多租户隔离", "合规审计"]
]

def phased_knowledge_injection():
    for tier_idx, keywords in enumerate(PRIORITY_TIERS):
        for keyword in keywords:
            docs = find_relevant_docs(keyword)
            # 只注入与关键词强相关的文档,避免噪音
            for doc in docs:
                if keyword in doc.title or keyword in doc.summary:
                    vectorstore.add(doc, metadata={"tier": tier_idx})

5.2 检索精度调优:从 0.65 到 0.89

:初期检索结果平均相关性评分只有 0.65,大量不相关内容混入。

调优手段(按效果排序):

  1. query_expansion(查询扩展):把用户问题改写成多个相似问法,一次检索多个 query
def query_expansion(original_query: str) -> list[str]:
    expansion_prompt = f"""
用户原始问题:{original_query}

请生成 3-5 个语义相近的问法(换词、换表述、具体化、抽象化),用于多路召回。

输出格式:每行一个问法,不要编号。
"""
    expansions = llm.invoke(expansion_prompt).strip().split("\n")
    return [original_query] + expansions

# 多路召回
def multi_query_retrieve(query: str, vectorstore, k: int = 5):
    queries = query_expansion(query)
    all_docs = []
    seen_ids = set()
    
    for q in queries:
        docs = vectorstore.similarity_search(q, k=k)
        for doc in docs:
            if doc.metadata["id"] not in seen_ids:
                all_docs.append(doc)
                seen_ids.add(doc.metadata["id"])
    
    # 重排序(用 cross-encoder 再评分)
    reranked = rerank_documents(query, all_docs, top_k=5)
    return reranked
  1. 重排序(Cross-Encoder):用更精准的模型对召回结果重新排序
from sentence_transformers import CrossEncoder

cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_documents(query: str, docs: list, top_k: int = 5):
    pairs = [(query, doc.page_content) for doc in docs]
    scores = cross_encoder.predict(pairs)
    
    doc_scores = list(zip(docs, scores))
    doc_scores.sort(key=lambda x: x[1], reverse=True)
    
    return [doc for doc, score in doc_scores[:top_k]]
  1. 元数据过滤:加category/module过滤,减少跨域噪音

5.3 响应延迟优化:P99 从 8s 到 1.2s

:用 gpt-4 做实时故障诊断,P99 延迟 8 秒,用户体验很差。

解法:三层缓存 + 模型降级

from functools import lru_cache
import hashlib

# L1: 精确问题缓存(相同问题 1 小时内直接返回)
@lru_cache(maxsize=1000)
def get_cached_diagnosis(question_hash: str):
    return None  # 缓存未命中

# L2: 向量相似缓存(相似问题用缓存结果)
def get_similar_diagnosis(question: str) -> str | None:
    cache_key = hashlib.md5(question.encode()).hexdigest()
    cached = redis.get(f"diag:{cache_key}")
    if cached:
        return cached
    
    # 检索相似的历史诊断
    similar_docs = vectorstore.similarity_search(question, k=3)
    if similar_docs and similar_docs[0].metadata.get("resolved"):
        return similar_docs[0].metadata["resolution_summary"]
    return None

# L3: 模型降级
def diagnose_with_fallback(question: str):
    # 先尝试缓存
    cached = get_similar_diagnosis(question)
    if cached:
        return {"source": "cache", "result": cached}
    
    # 优先用 GPT-3.5 做快速诊断
    try:
        result = fast_diagnostic_chain.run(question)  # gpt-3.5-turbo
        return {"source": "fast_model", "result": result}
    except Exception:
        # 降级到 GPT-4 做精确诊断
        result = precise_diagnostic_chain.run(question)
        return {"source": "precise_model", "result": result}

优化后效果:83% 的请求命中 L1/L2 缓存,P99 延迟从 8s 降到 1.2s。


六、效果数据

系统上线 8 个月数据:

指标 上线前 上线后 提升
MTTR(平均恢复时间) 42 分钟 11 分钟 -74%
自动化修复率 0% 67% +67pp
工单误判率 23% 6% -17pp
知识库检索满意度 91%
P99 响应延迟 1.2s

结语

运维知识库的核心价值不是"替代人",而是"让人的经验可复用"。

一个工程师花 2 小时排查的问题,下次遇到同类问题,5 秒出答案——这才是知识库的意义。

LangChain 不是一个银弹,它解决的是"如何让 LLM 更准确地调用你的知识",而不是"如何让你的知识变多"。先把知识质量做好,再谈 RAG。


附:完整代码仓库结构

ops-knowledge-base/
├── config/
│   └── settings.py          # 配置管理
├── knowledge_base/
│   ├── loaders/              # 数据导入
│   ├── embeddings/          # 向量化
│   └── vectorstore/         # Qdrant 存储
├── chains/
│   ├── retrieval_chain.py   # 检索链
│   ├── diagnostic_chain.py   # 诊断链
│   └── hyde_chain.py         # HyDE 链
├── detectors/
│   ├── log_anomaly.py       # 日志异常检测
│   └── health_checker.py    # 健康检查
├── healer/
│   └── auto_healer.py       # 自动修复
├── api/
│   └── service.py           # API 服务
└── main.py                  # 入口

有问题欢迎评论区交流,欢迎关注我的主页查看更多运维实践。**

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐