从零搭建 AI 网络安全 Agent:LangGraph + DeepSeek-R1 + 安全工具链全栈实战

摘要:本文详细记录了如何基于 LangGraph 状态图引擎 + DeepSeek-R1 32B 大模型 + 本地安全工具链(nmap/nuclei/subfinder 等),构建一个具备侦察、漏洞扫描、漏洞验证、人工审批、报告生成全链路能力的 AI 网络安全 Agent。项目支持 CVE RAG 知识库增强、ELK Stack 审计日志、等保/ISO27001 合规检查。附完整源码、部署脚本和用例文档。


文章目录

一、项目背景与目标

1.1 为什么需要 AI 安全 Agent?

传统的渗透测试流程高度依赖安全工程师的手动操作:

  • 侦察阶段:手动运行 nmap、subfinder、httpx,逐个分析输出
  • 漏洞扫描:手动配置 nuclei 模板、nikto 参数,人工筛选误报
  • 报告编写:手动整理发现、计算 CVSS 评分、撰写修复建议

整个流程耗时数小时甚至数天,且高度依赖个人经验。

AI Agent 的价值

  • 自动编排多工具执行顺序
  • 智能分析工具输出,减少误报
  • RAG 增强:自动关联 CVE 知识库
  • 结构化审计日志,满足合规要求
  • 一键生成安全评估报告

1.2 技术选型

组件 选型 理由
推理框架 LangGraph 状态管理最稳,支持条件分支、循环、HITL(人机回环)
底层模型 DeepSeek-R1-Distill-Qwen-32B 逻辑链极强,能理解复杂混淆流量和利用链
执行引擎 vLLM / Ollama 本地部署,数据不出境,兼容 OpenAI API
向量存储 ChromaDB 轻量级,存放 CVE 知识库和攻击指纹,实现 RAG 增强
审计日志 ELK Stack Elasticsearch + Logstash + Kibana,满足等保合规
CLI 框架 Click + Rich 命令行交互,丰富的终端输出

1.3 系统架构

┌─────────────────────────────────────────────────────────┐
│                    用户接口 (CLI)                         │
│  sec-agent scan <target>                                │
└──────────────────────┬──────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────┐
│               LangGraph 状态图引擎                        │
│                                                          │
│  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐        │
│  │ Planner│→│  Recon │→│VulnScan│→│ Report │         │
│  └───┬────┘  └───┬────┘  └───┬────┘  └───┬────┘        │
│      │           │           │           │               │
│   [LLM规划]  [HITL审批]  [RAG增强]   [生成报告]          │
└──────┼───────────┼───────────┼───────────┼───────────────┘
       │           │           │           │
┌──────▼───────────▼───────────▼───────────▼───────────────┐
│              工具执行层 (subprocess 沙箱)                   │
│  nmap │ subfinder │ httpx │ nuclei │ nikto │ shodan      │
└──────────────────────┬───────────────────────────────────┘
                       │
┌──────────────────────▼───────────────────────────────────┐
│       模型推理: DeepSeek-R1 32B (vLLM/Ollama)            │
│       知识库: ChromaDB (CVE RAG + 攻击指纹)               │
│       审计层: JSONL + Elasticsearch (等保合规)             │
└──────────────────────────────────────────────────────────┘

二、环境准备

2.1 系统要求

  • 操作系统:Windows 10/11(本文环境)、Linux、macOS
  • Python:3.11+
  • Docker:用于部署 ELK Stack(可选)
  • Go:用于安装安全工具(subfinder/httpx/nuclei)
  • GPU:运行 32B 模型需要 24GB+ 显存(或使用量化版本)

2.2 安装 Python 环境

# 创建项目目录
mkdir D:\网络安全agent\sec-agent
cd D:\网络安全agent\sec-agent

# 创建虚拟环境
python -m venv .venv

# 激活虚拟环境(Windows)
.venv\Scripts\activate

# 激活虚拟环境(Linux/macOS)
# source .venv/bin/activate

2.3 安装安全工具

nmap(端口扫描)
# Windows: 使用 winget
winget install Insecure.Nmap

# 或从官网下载安装包
# https://nmap.org/download.html

# 验证安装
nmap --version
Go 工具链(subfinder / httpx / nuclei)
# 先安装 Go: https://go.dev/dl/

# 安装 subfinder(子域名枚举)
go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest

# 安装 httpx(Web 指纹识别)
go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest

# 安装 nuclei(模板化漏洞扫描)
go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest

# 初始化 nuclei 模板库
nuclei -update-templates

# 验证安装
subfinder -version
httpx -version
nuclei -version
nikto(Web 服务器漏洞扫描)
# 方式一:本地安装(需要 Perl 环境)
# 下载: https://github.com/sullo/nikto

# 方式二:使用 Docker(推荐)
docker run --rm sullo/nikto -h <target>

2.4 配置环境变量

# 复制环境变量模板
copy .env.example .env   # Windows
# cp .env.example .env   # Linux/macOS

编辑 .env 文件:

# ========== 模型推理 ==========
# vLLM 本地部署(推荐生产环境)
VLLM_API_BASE=http://localhost:8000/v1
VLLM_API_KEY=not-needed
MODEL_NAME=deepseek-ai/DeepSeek-R1-Distill-Qwen-32B

# Ollama 本地部署(推荐开发调试)
# OLLAMA_API_BASE=http://localhost:11434/v1
# MODEL_NAME=deepseek-r1:32b

# ========== 安全工具 ==========
# Shodan API Key(可选,用于互联网资产搜索)
# 获取地址: https://account.shodan.io
SHODAN_API_KEY=

# ========== 审计 / ELK ==========
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_INDEX=sec-agent-audit
OPERATOR_ID=admin

# ========== Agent 配置 ==========
MAX_ITERATIONS=50
TOOL_TIMEOUT=300
REQUIRE_APPROVAL=true

2.5 安装 Python 依赖

# 安装项目依赖
pip install -e .

# 开发模式(包含测试依赖)
pip install -e ".[dev]"

核心依赖说明

包名 版本 用途
langgraph >=0.2.0 状态图引擎
langgraph-checkpoint-sqlite >=2.0.0 持久化 checkpointer
openai >=1.50.0 连接 vLLM/Ollama(OpenAI 兼容 API)
chromadb >=0.5.0 向量数据库(CVE RAG)
elasticsearch >=8.16.0 审计日志推送到 ES
pydantic >=2.9.0 数据模型验证
click >=8.1.0 CLI 框架
rich >=13.9.0 终端美化输出
httpx >=0.28.0 HTTP 请求(Shodan API)
xmltodict >=0.14.0 解析 nmap XML 输出

三、项目结构详解

sec-agent/
├── cli.py                          # CLI 入口(7 个命令)
├── pyproject.toml                  # 依赖配置
├── .env.example                    # 环境变量模板
├── USAGE.md                        # 用例文档
│
├── config/                         # 配置层
│   ├── settings.py                 # 全局配置(Pydantic Settings)
│   └── prompts.py                  # 系统提示词 + 报告模板
│
├── agent/                          # Agent 核心层
│   ├── state.py                    # LangGraph 状态定义
│   ├── graph.py                    # 状态图编排
│   ├── nodes.py                    # 8 个节点实现
│   └── edges.py                    # 条件路由逻辑
│
├── tools/                          # 工具层
│   ├── registry.py                 # 工具注册中心(自动发现)
│   ├── recon/                      # 侦察工具
│   │   ├── nmap_tool.py            # 端口扫描
│   │   ├── subfinder_tool.py       # 子域名枚举
│   │   ├── httpx_tool.py           # Web 指纹识别
│   │   └── shodan_tool.py          # 互联网资产搜索
│   └── vuln/                       # 漏洞扫描工具
│       ├── nuclei_tool.py          # 模板化漏洞检测
│       └── nikto_tool.py           # Web 服务器漏洞
│
├── memory/                         # 知识库层
│   ├── vector_store.py             # ChromaDB 向量存储
│   ├── cve_loader.py               # NVD 数据加载器
│   └── session_memory.py           # 会话级内存
│
├── audit/                          # 审计层
│   ├── logger.py                   # 结构化审计日志
│   └── compliance.py               # 等保/ISO27001 合规检查
│
├── sandbox/                        # 执行沙箱
│   ├── executor.py                 # subprocess 执行器
│   └── validator.py                # 输出解析验证
│
├── elk/                            # ELK 部署
│   ├── docker-compose.yml          # 一键部署
│   └── logstash.conf               # 管道配置
│
└── tests/                          # 测试
    ├── conftest.py
    ├── test_registry.py
    ├── test_state.py
    └── test_validator.py


四、核心模块实现

4.1 工具注册中心(tools/registry.py)

这是整个工具链的基石。每个工具文件在导入时自动调用 registry.register() 注册自己。

@dataclass
class SecurityTool:
    name: str                    # 工具名称
    category: str                # 分类: recon | vuln | exploit | audit
    risk_level: str              # 风险等级: low | medium | high | critical
    requires_approval: bool      # 是否需要人工审批
    binary: str                  # 可执行文件名(API 工具留空)
    description: str = ""        # 工具描述
    timeout: int = 300           # 超时时间(秒)
    schema: dict = field(default_factory=dict)  # 参数 JSON Schema

关键设计:API 工具 vs CLI 工具的区分

def check_available(self, name: str) -> tuple[bool, str]:
    tool = self._tools.get(name)
    if not tool:
        return False, f"Tool '{name}' not registered"
    # API 类工具(如 Shodan)没有本地 binary,直接返回可用
    if not tool.binary:
        return True, ""
    # CLI 类工具需要检查 binary 是否在 PATH 中
    binary_path = shutil.which(tool.binary)
    if not binary_path:
        return False, f"Binary '{tool.binary}' not found in PATH"
    return True, ""

自动发现机制

def discover_tools() -> list[str]:
    """扫描 tools/ 下所有 *_tool.py 文件,自动导入注册"""
    tools_dir = Path(__file__).resolve().parent
    imported = []
    for subdir in ["recon", "vuln", "exploit", "audit", "report"]:
        pkg_dir = tools_dir / subdir
        if not pkg_dir.is_dir():
            continue
        for py_file in sorted(pkg_dir.glob("*_tool.py")):
            mod_name = f"tools.{subdir}.{py_file.stem}"
            importlib.import_module(mod_name)
            imported.append(mod_name)
    return imported

只需在 tools/recon/ 下创建 xxx_tool.py 并调用 registry.register(),无需手动注册。

4.2 工具执行器(sandbox/executor.py)

统一的 subprocess 封装,带超时控制和错误处理:

def run_command(cmd: list[str], timeout: int | None = None) -> ExecutionResult:
    timeout = timeout or settings.tool_timeout
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
        return ExecutionResult(
            success=proc.returncode == 0,
            stdout=proc.stdout,
            stderr=proc.stderr,
            returncode=proc.returncode,
        )
    except subprocess.TimeoutExpired:
        return ExecutionResult(
            success=False, stderr=f"Command timed out after {timeout}s",
            returncode=-1, timed_out=True,
        )

4.3 侦察工具示例(tools/recon/nmap_tool.py)

以 nmap 为例,展示工具的标准实现模式:

def nmap_scan(target: str, ports: str = "1-10000", scan_type: str = "sV") -> dict:
    # 构建命令
    cmd = ["nmap", f"-{scan_type}", "-p", ports, "--open", "-oX", "-"]
    cmd.append(target)

    # 执行
    result = run_command(cmd)
    if not result.success:
        return {"error": result.stderr, "success": False}

    # 解析 XML 输出
    parsed = xmltodict.parse(result.stdout)
    hosts = parsed.get("nmaprun", {}).get("host", [])
    # ... 解析端口和服务信息 ...

    return {
        "success": True,
        "target": target,
        "services": services,
        "total_open_ports": len(services),
    }

# 注册工具
registry.register(
    tool=SecurityTool(
        name="nmap_scan",
        category="recon",
        risk_level="low",
        requires_approval=False,
        binary="nmap",
        description="Port scanning and service detection using nmap.",
        schema={...},
    ),
    handler=nmap_scan,
)

4.4 LangGraph 状态图(agent/graph.py)

这是 Agent 的"大脑",定义了任务的执行流程:

def build_security_agent(checkpointer=None):
    discover_tools()

    graph = StateGraph(AgentState)

    # 注册节点
    graph.add_node("planner", plan_task)         # LLM 规划
    graph.add_node("recon", run_recon)            # 侦察
    graph.add_node("vuln_scan", run_vuln_scan)    # 漏洞扫描
    graph.add_node("validate", validate_vulns)    # 漏洞验证
    graph.add_node("human_approval", wait_approval)  # 人工审批
    graph.add_node("exploit", run_exploit)        # 漏洞利用
    graph.add_node("audit_check", run_audit)      # 安全审计
    graph.add_node("report", generate_report)     # 报告生成

    # 定义执行流程
    graph.set_entry_point("planner")

    # Planner → 根据任务类型路由
    graph.add_conditional_edges("planner", route_task, {
        "recon": "recon",
        "vuln_scan": "vuln_scan",
        "audit": "audit_check",
    })

    # Recon → VulnScan → 判断是否需要利用
    graph.add_edge("recon", "vuln_scan")
    graph.add_conditional_edges("vuln_scan", check_exploit_needed, {
        "human_approval": "human_approval",  # 有高危漏洞 → 请求审批
        "validate": "validate",               # 直接验证
        "report": "report",                   # 无高危 → 直接报告
    })

    # 审批 → 验证 → 利用 → 报告
    graph.add_edge("human_approval", "validate")
    graph.add_edge("validate", "exploit")
    graph.add_edge("exploit", "report")
    graph.add_edge("audit_check", "report")
    graph.add_edge("report", END)

    # interrupt_before 实现 HITL:exploit 节点执行前暂停等待人工确认
    return graph.compile(
        checkpointer=checkpointer or _build_checkpointer(),
        interrupt_before=["exploit"],
    )

状态定义(agent/state.py)

class AgentState(TypedDict):
    target: str                          # 目标
    task_type: Literal["recon", "vuln_scan", "exploit", "audit", "full"]
    messages: Annotated[list, add_messages]
    recon_results: dict                  # 侦察结果
    vuln_results: list                   # 漏洞列表
    exploit_results: list                # 利用结果
    risk_score: float                    # 综合风险评分
    requires_approval: bool              # 是否需要审批
    approval_granted: bool               # 是否已批准
    current_phase: str                   # 当前阶段
    report: str                          # 最终报告

4.5 节点实现(agent/nodes.py)

Planner 节点 — LLM 智能规划
def plan_task(state: AgentState) -> dict:
    """调用 DeepSeek-R1 决定下一步操作"""
    state_summary = f"Target: {target} | Task: {task_type} | Phase: {current_phase}"
    # ... 构建状态摘要 ...

    response = call_llm([
        {"role": "system", "content": PLANNER_PROMPT},
        {"role": "user", "content": f"Plan next step for target: {target}"},
    ], json_mode=True)

    plan = json.loads(response)
    return {"next_action": plan["next_action"], "current_phase": plan["next_action"]}

Recon 节点 — 带审计和 RAG 增强
def run_recon(state: AgentState) -> dict:
    audit = AuditLogger(session_id=state.get("audit_log_id"))
    target = state["target"]
    results = {}

    # 域名才跑 subfinder
    if not _is_ip(target):
        results["subfinder"] = _safe_execute("subfinder_enum", {"domain": target}, audit)

    # httpx 指纹 + nmap 端口扫描
    results["httpx"] = _safe_execute("httpx_probe", {"target": target}, audit)
    results["nmap"] = _safe_execute("nmap_scan", {"target": target}, audit)

    # RAG: 根据发现的服务查询相关 CVE
    vs = VectorStore()
    for svc in results["nmap"].get("services", [])[:5]:
        cve_hits = vs.search_cves(f"{svc['product']} {svc['version']}")
        results["rag_cves"].extend(cve_hits)

    return {"recon_results": results, "current_phase": "recon_done"}

关键设计:_safe_execute 容错包装

def _safe_execute(tool_name: str, args: dict, audit_logger=None) -> dict:
    """单个工具失败不会崩溃整个流程"""
    try:
        return registry.execute(tool_name, args, audit_logger=audit_logger, force=True)
    except ToolNotAvailable as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        return {"success": False, "error": str(e)}

4.6 CVE RAG 知识库(memory/vector_store.py)

使用 ChromaDB 存储 CVE 数据,实现语义检索:

class VectorStore:
    def __init__(self):
        self.client = chromadb.PersistentClient(path=str(persist_dir))
        self.cve_collection = self.client.get_or_create_collection(
            name="cve_knowledge",
            metadata={"hnsw:space": "cosine"},
        )

    def add_cves(self, cves: list[dict]) -> int:
        """批量写入 CVE 数据"""
        for cve in cves:
            doc_text = f"{cve['cve_id']}: {cve['description']} Severity: {cve['severity']}"
            self.cve_collection.upsert(
                ids=[cve["cve_id"]],
                documents=[doc_text],
                metadatas=[{"severity": cve["severity"], "cvss": str(cve["cvss"])}],
            )

    def search_cves(self, query: str, n_results: int = 5) -> list[dict]:
        """语义检索相关 CVE"""
        results = self.cve_collection.query(
            query_texts=[query],
            n_results=n_results,
            include=["documents", "metadatas", "distances"],
        )
        return [{"cve_id": doc_id, "document": doc, ...} for doc_id, doc in ...]

从 NVD 加载 CVE 数据

def fetch_cves_from_nvd(keyword: str) -> list[dict]:
    """调用 NVD API 2.0 获取 CVE 数据"""
    resp = httpx.get("https://services.nvd.nist.gov/rest/json/cves/2.0",
                     params={"keywordSearch": keyword, "resultsPerPage": 200})
    # 解析 CVSS 评分、影响产品、描述等
    # ...

4.7 审计日志层(audit/logger.py)

满足等保 2.0 和 ISO 27001 的审计要求:

class AuditLogger:
    def _write(self, doc: dict) -> None:
        doc["timestamp"] = datetime.now(timezone.utc).isoformat()
        doc["session_id"] = self.session_id
        doc["operator"] = settings.operator_id

        # 写入本地 JSONL 文件
        with open(self._log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(doc, ensure_ascii=False) + "\n")

        # 推送到 Elasticsearch
        es = self._get_es()
        if es:
            es.index(index=settings.elasticsearch_index, document=doc)

    def log_tool_call(self, tool_name, args, risk_level):
        self._write({"event_type": "tool_call", "tool_name": tool_name, ...})

    def log_approval(self, tool_name, approved, approver):
        self._write({"event_type": "human_approval", ...})

合规检查器

class ComplianceChecker:
    REQUIRED_EVENTS = ["tool_call", "tool_result", "human_approval", "phase_change", "error"]
    REQUIRED_FIELDS = ["timestamp", "session_id", "operator", "event_type"]

    def validate_log_completeness(self) -> dict:
        """检查日志是否包含所有必需事件和字段"""
        # ...
        return {"compliant": True/False, "missing_events": [...], "field_violations": [...]}


五、部署模型推理服务

5.1 方案一:vLLM 部署(推荐生产环境)

# 安装 vLLM
pip install vllm

# 启动服务(A100/H100 24GB+ 显存)
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-32B \
    --host 0.0.0.0 \
    --port 8000 \
    --max-model-len 32768 \
    --gpu-memory-utilization 0.9

# 如果显存不足,使用量化版本
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-32B-GPTQ \
    --host 0.0.0.0 \
    --port 8000 \
    --quantization gptq

5.2 方案二:Ollama 部署(推荐开发调试)

# 安装 Ollama: https://ollama.com

# 拉取模型
ollama pull deepseek-r1:32b

# 启动服务(默认端口 11434)
ollama serve

# .env 配置
# VLLM_API_BASE=http://localhost:11434/v1
# MODEL_NAME=deepseek-r1:32b

5.3 验证模型服务

# vLLM
curl http://localhost:8000/v1/models

# Ollama
curl http://localhost:11434/api/tags

# 测试推理
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-32B","messages":[{"role":"user","content":"Hello"}]}'


六、部署 ELK Stack(可选)

6.1 一键部署

cd elk
docker-compose up -d

6.2 服务地址

服务 地址 用途
Elasticsearch http://localhost:9200 数据存储与搜索
Kibana http://localhost:5601 可视化仪表板
Logstash localhost:5044 日志收集管道

6.3 验证部署

# 检查 Elasticsearch
curl http://localhost:9200/_cluster/health

# 使用 CLI 检查
python cli.py elk-status

6.4 Kibana 配置

  1. 打开 http://localhost:5601
  2. 进入 Stack Management → Index Patterns
  3. 创建索引模式:sec-agent-audit-*
  4. 进入 Discover 查看审计日志
  5. 可按 event_typetool_namerisk_leveloperator 等字段过滤

七、使用指南

7.1 查看工具状态

python cli.py tools

输出示例:

┌───────────── Available Security Tools ─────────────┐
│ Name            Category  Risk Level  Status       │
├─────────────────────────────────────────────────────┤
│ nmap_scan       recon     low         OK           │
│ subfinder_enum  recon     low         OK           │
│ httpx_probe     recon     low         OK           │
│ nuclei_scan     vuln      medium      OK           │
│ nikto_scan      vuln      medium      OK           │
│ shodan_search   recon     low         OK           │
└─────────────────────────────────────────────────────┘

7.2 全栈安全扫描

# 完整扫描(侦察 → 漏洞扫描 → 审批 → 报告)
python cli.py scan 192.168.1.1

# 指定任务类型
python cli.py scan example.com -t recon        # 仅侦察
python cli.py scan example.com -t vuln_scan     # 仅漏洞扫描
python cli.py scan example.com -t audit          # 仅安全审计

# 保存报告
python cli.py scan example.com -o report.md

# 跳过人工审批(自动化模式)
python cli.py scan example.com --no-approval

7.3 快捷命令

python cli.py recon example.com     # 快速侦察
python cli.py vuln example.com      # 快速漏洞扫描
python cli.py audit 192.168.1.1     # 快速安全审计

7.4 加载 CVE 知识库

# 从 NVD 在线加载(按关键词)
python cli.py load-cves -k apache -k nginx -k openssl -k log4j -l 200

# 注意:NVD API 有速率限制(无 API Key 时 5 req/30s)
# 代码中已内置 6 秒间隔限流

7.5 合规检查

# 检查当前会话的审计日志合规性
python cli.py compliance

# 检查指定会话
python cli.py compliance <session_id>

输出示例:

# 审计合规检查报告

## 检查结果: PASS

## 详情
- 总日志条目: 42
- 发现的事件类型: tool_call, tool_result, human_approval, phase_change, error
- 缺失的事件类型: 无

## 合规标准
- 等保 2.0: 要求所有安全操作可追溯
- ISO 27001: 要求完整的审计日志链


八、人工审批流程(HITL)

这是本项目的核心安全设计。高危操作(漏洞利用)必须经过人工确认。

8.1 工作原理

  1. Agent 执行侦察和漏洞扫描
  2. 发现高危漏洞(CVSS ≥ 7.0)时自动暂停
  3. 在终端展示漏洞列表,等待用户确认
  4. 用户批准后继续执行漏洞利用
  5. 用户拒绝则跳过利用,直接生成报告

8.2 代码流程

# cli.py 中的审批处理
if result.get("requires_approval") and not no_approval:
    # 展示高危漏洞
    for v in high_risk:
        table.add_row(v["severity"], v["name"], str(v["risk_score"]))
    console.print(table)

    # 等待用户确认
    if click.confirm("Approve exploitation?", default=False):
        result = agent.invoke({"approval_granted": True}, config=config)
        audit.log_approval("exploit", True, operator)
    else:
        audit.log_approval("exploit", False, operator)

8.3 LangGraph interrupt_before 机制

graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["exploit"],  # exploit 节点执行前暂停
)

暂停后通过 agent.invoke({"approval_granted": True}) 恢复执行。


九、Python API 调用

除了 CLI,也可以直接在 Python 中调用:

from agent.graph import run_agent

# 完整扫描
result = run_agent("192.168.1.1", task_type="full")

# 查看结果
print(result["report"])           # 安全报告
print(result["vuln_results"])     # 漏洞列表
print(result["risk_score"])       # 风险评分

# 单独调用工具
from tools.recon.nmap_tool import nmap_scan
from tools.vuln.nuclei_tool import nuclei_scan

nmap_result = nmap_scan("192.168.1.1", ports="1-1000")
nuclei_result = nuclei_scan("http://192.168.1.1", severity="critical,high")

# CVE RAG 检索
from memory.vector_store import VectorStore
vs = VectorStore()
cves = vs.search_cves("apache log4j remote code execution", n_results=5)

# 添加攻击指纹
vs.add_fingerprint(
    fp_id="sql_injection_union",
    description="SQL Injection via UNION SELECT",
    indicators=["UNION SELECT", "ORDER BY", "-- ", "OR 1=1"]
)


十、添加自定义工具

只需 3 步即可添加新工具:

步骤 1:创建工具文件

tools/recon/tools/vuln/ 下创建 my_tool.py

from sandbox.executor import run_command
from tools.registry import SecurityTool, registry

def my_scan(target: str, mode: str = "fast") -> dict:
    result = run_command(["my_tool", "--mode", mode, target])
    return {"success": result.success, "output": result.stdout}

registry.register(
    tool=SecurityTool(
        name="my_scan",
        category="recon",           # 分类
        risk_level="low",           # 风险等级
        requires_approval=False,    # 是否需要审批
        binary="my_tool",           # 可执行文件名(API 工具留空 "")
        description="My custom scanner",
        schema={
            "type": "object",
            "properties": {
                "target": {"type": "string", "description": "Scan target"},
                "mode": {"type": "string", "default": "fast"},
            },
            "required": ["target"],
        },
    ),
    handler=my_scan,
)

步骤 2:安装工具

确保 my_tool 在系统 PATH 中。

步骤 3:验证

python cli.py tools  # 应该能看到 my_scan

无需修改任何其他文件,自动发现机制会处理一切。


十一、项目源码文件清单

文件 行数 功能
cli.py 214 CLI 入口,7 个命令
config/settings.py 48 Pydantic Settings 配置
config/prompts.py 72 系统提示词 + 报告模板
agent/state.py 23 LangGraph 状态定义
agent/graph.py 124 状态图编排
agent/nodes.py 385 8 个节点实现
agent/edges.py 30 条件路由逻辑
tools/registry.py 142 工具注册中心
tools/recon/nmap_tool.py 89 nmap 端口扫描
tools/recon/subfinder_tool.py 63 子域名枚举
tools/recon/httpx_tool.py 90 Web 指纹识别
tools/recon/shodan_tool.py 143 Shodan 互联网资产搜索
tools/vuln/nuclei_tool.py 97 nuclei 漏洞扫描
tools/vuln/nikto_tool.py 95 nikto Web 漏洞扫描
memory/vector_store.py 145 ChromaDB 向量存储
memory/cve_loader.py 151 NVD 数据加载器
memory/session_memory.py 47 会话级内存
audit/logger.py 104 审计日志(JSONL + ES)
audit/compliance.py 103 等保/ISO27001 合规检查
sandbox/executor.py 74 subprocess 执行器
sandbox/validator.py 64 输出解析验证

十二、测试

# 运行全部测试
python -m pytest tests/ -v

# 运行单个测试文件
python -m pytest tests/test_registry.py -v

# 运行单个测试
python -m pytest tests/test_registry.py::TestToolRegistry::test_register_and_get -v

测试覆盖:

  • 工具注册与发现
  • API 工具 vs CLI 工具的可用性判断
  • 人工审批机制
  • 输出解析(JSON / NDJSON / 纯文本)
  • 目标验证(IP / 域名 / URL)
  • LangGraph 状态字段完整性

十三、常见问题

Q1: 工具未找到?

python cli.py tools  # 查看工具状态
# 确保工具已安装并在 PATH 中
# Windows 需要重启终端让 PATH 生效

Q2: 模型连接失败?

# 检查 vLLM
curl http://localhost:8000/v1/models

# 检查 Ollama
curl http://localhost:11434/api/tags

# 确认 .env 中的 VLLM_API_BASE 配置正确

Q3: ELK 连接失败?

python cli.py elk-status
# 确保 Docker 容器正在运行
docker ps | findstr sec-agent
# 查看容器日志
docker logs sec-agent-es

Q4: NVD API 被限流?

代码已内置 6 秒间隔限流。如果仍然被限流:

  • 申请 NVD API Key:https://nvd.nist.gov/developers/request-an-api-key
  • .env 中配置 NVD_API_KEY

Q5: 显存不足运行 32B 模型?

# 方案一:使用量化版本
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-32B-GPTQ --quantization gptq

# 方案二:使用更小的蒸馏版本
# MODEL_NAME=deepseek-ai/DeepSeek-R1-Distill-Qwen-7B

# 方案三:使用 Ollama 的 4bit 量化
ollama pull deepseek-r1:32b-q4_K_M


十四、总结

本文实现了一个完整的 AI 网络安全 Agent,核心特性:

  1. LangGraph 状态图:清晰定义侦察 → 扫描 → 验证 → 利用 → 报告的执行流程
  2. DeepSeek-R1 32B:强大的推理能力,智能规划任务和分析漏洞
  3. 自动工具发现:新增工具只需创建文件,无需修改任何其他代码
  4. CVE RAG 增强:扫描结果自动关联 CVE 知识库,提供上下文信息
  5. 人工审批(HITL):高危操作必须经过人工确认,安全可控
  6. 审计合规:完整的审计日志链,满足等保 2.0 和 ISO 27001 要求
  7. 容错设计:单个工具失败不影响整体流程

项目地址https//:github.com/ggjj-hub

下一步计划

  • 集成 sqlmap(SQL 注入自动化验证)
  • 集成 Metasploit(漏洞利用框架)
  • 支持并发扫描多目标
  • Web UI 仪表板
  • 定时扫描任务

免责声明:本项目仅用于授权的安全测试和研究目的。未经授权对他人系统进行安全测试是违法行为。请确保在使用前获得目标系统的明确授权。

求助:本人在网络安全大模型的自主训练有所准备,但由于相关的信息较为敏感以及封闭,所以目前还停留在数据集收集阶段以及框架搭建阶段,能否有大佬提供思路,介绍资源,在此,由衷感谢!

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐