试做基于GitHub Copilot的AI Agent

一、核心需求与本质

很多开发者会遇到一个痛点:GitHub Copilot仅能完成代码补全,却无法像Claude Code(类龙虾工具)那样具备工具调用和本地上下文感知能力。通俗来说,就是希望Copilot能实现“读本地文件、看编译错误、跑测试、自主决策下一步行动”,而不仅仅是“写代码”。

这个需求本质上已经进入AI工程系统设计层,我们可以通过一套简化版MCP(Model Context Protocol)架构实现,这套方案可落地、能对接现有Python脚本,且具备可扩展性,能逐步升级为完整的工程系统。

二、整体架构设计(简化版MCP)

架构核心是“Copilot做决策,Python做执行”,通过MCP控制器实现两者的联动,具体架构如下:

                ┌───────────────┐
                │  Copilot      │
                └──────┬────────┘
                       │(自然语言指令)
                ┌──────▼────────┐
                │ MCP Controller│ ← Python执行器
                └───────┬───────┘
      ┌─────────────────┼────────────────┐
      │                 │                │
┌─────▼──────┐   ┌──────▼───────┐ ┌──────▼───────┐
│ file\_tool │   │ compile\_tool│ │  test\_tool  │
└────────────┘   └──────────────┘ └──────────────┘

各模块职责:

  • Copilot:作为决策引擎,输出符合MCP协议的指令(JSON格式);

  • MCP Controller:Python编写的核心控制器,解析Copilot输出的指令,调用对应工具;

  • 工具层(file_tool/compile_tool/test_tool):实现具体的本地操作,如读写文件、编译代码、运行测试。

三、核心协议设计(JSON-based MCP协议)

MCP协议的核心是约束Copilot的输出格式,确保Python控制器能准确解析并执行,同时定义支持的工具集,实现标准化调用。

3.1 Copilot输出格式(关键约束)

强制Copilot仅输出JSON,格式如下(无任何多余解释或代码块):

{
  "action": "tool_name",  // 工具名称,如read_file、compile
  "args": {               // 工具参数,根据工具类型调整
    "param1": "xxx"       // 示例参数,如文件路径path
  }
}

3.2 第一版支持工具

优先实现核心工具,满足基础需求,后续可扩展:

  • read_file(path):读取指定路径的本地文件内容;

  • write_file(path, content):将指定内容写入指定路径文件;

  • compile():调用g++编译代码,生成可执行文件;

  • run_tests():运行编译生成的可执行文件,执行测试用例。

四、核心实现:Python控制器

Python控制器是整个系统的“大脑”,负责调用Copilot、解析指令、执行工具、循环迭代,以下是完整可运行代码,分为多个功能模块,结构清晰可维护。

4.1 环境准备(必做)

首先安装依赖工具,确保Copilot CLI可正常调用:

# 安装GitHub CLI
brew install gh

# 登录GitHub(关联Copilot权限)
gh auth login

# 安装Copilot CLI扩展
gh extension install github/gh-copilot

4.2 完整Python脚本(基础稳定版)

保存为copilot_mcp_agent.py,可直接对接C++自动生成工程:

import os
import json
import subprocess
from pathlib import Path

# ========================= 配置项(可根据需求调整) =========================
MAX_STEPS = 12  # 最大迭代步数,防止死循环
INTERFACE_FILE = "./interfaces/math_api.h"  # 接口文件路径
CPP_FILE = "./generated/impl.cpp"  # 生成的C++实现文件路径
TEST_FILE = "./tests/test.cpp"  # 测试文件路径
EXECUTABLE = "./app"  # 编译生成的可执行文件名称
LOG_FILE = "agent.log"  # 日志文件路径
# ===========================================================================

# ========================= Prompt定义(核心约束) =========================
SYSTEM_PROMPT = """
你是一个C++自动化工程Agent,你必须通过“调用工具”完成任务。
你不能直接输出代码或解释,只能输出JSON,格式如下:
{
  "action": "...",
  "args": { ... }
}
可用工具:
1. read_file(path)
2. write_file(path, content)
3. compile()
4. run_tests()
任务目标:
- 读取接口文件
- 生成实现代码
- 写入.cpp文件
- 编译
- 修复错误
- 运行测试
- 直到成功
策略:
- 每一步只做一个动作
- 出现错误必须修复
- 不要跳步骤
- 不要解释
"""

USER_PROMPT = f"""
任务:根据接口文件生成C++实现,并通过编译和测试
接口文件路径:{INTERFACE_FILE}
输出必须是JSON action
"""
# ===========================================================================

# ========================= 日志函数(兼容print,自动写入文件) =========================
import datetime
import sys

def log(*args, sep=' ', end='\n', file=None, flush=False, level="INFO"):
    """
    类似print的日志函数,支持:
    - 多参数、sep/end/file/flush参数
    - 自动添加时间戳和日志级别
    - 同时输出到控制台和日志文件
    """
    # 拼接日志内容
    message = sep.join(str(arg) for arg in args)
    # 生成时间戳
    ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    final_msg = f"[{ts}][{level}] {message}"
    
    # 输出到控制台
    print(final_msg, end=end, file=file or sys.stdout, flush=flush)
    
    # 写入日志文件(异常不影响主流程)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(final_msg + end)
    except Exception as e:
        print(f"[LOG ERROR] {e}", file=sys.stderr)
# ===========================================================================

# ========================= Copilot调用(适配最新CLI) =========================
def call_copilot(prompt):
    """调用GitHub Copilot CLI,获取决策指令(JSON)"""
    log("开始调用Copilot,获取决策指令")
    # 最新Copilot CLI用法,避免shell模式导致权限问题
    cmd = [
        "gh", "copilot", "suggest-command",
        prompt
    ]
    
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore"
    )
    
    output = result.stdout.strip()
    # 打印Copilot输入输出,方便调试
    log("Copilot输入Prompt(前2000字符):", prompt[:2000], level="INFO")
    log("Copilot输出(前2000字符):", output[:2000], level="INFO")
    
    if result.returncode != 0:
        log("Copilot调用失败,错误信息:", result.stderr, level="ERROR")
        return ""
    return output
# ===========================================================================

# ========================= 工具实现(核心操作) =========================
def read_file(path):
    """读取指定路径文件内容"""
    try:
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
        log(f"成功读取文件:{path},文件大小:{len(content)}字节")
        return content
    except Exception as e:
        log(f"读取文件{path}失败,错误:{str(e)}", level="ERROR")
        return str(e)

def write_file(path, content):
    """将内容写入指定路径文件,自动创建目录"""
    try:
        # 自动创建上级目录
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        log(f"成功写入文件:{path}")
        return "write ok"
    except Exception as e:
        log(f"写入文件{path}失败,错误:{str(e)}", level="ERROR")
        return str(e)

def compile_code():
    """调用g++编译代码,生成可执行文件"""
    cmd = [
        "g++",
        "-std=c++17",
        "-O2",
        CPP_FILE,
        TEST_FILE,
        "-o",
        EXECUTABLE
    ]
    log("开始编译,编译命令:", " ".join(cmd))
    
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore"
    )
    
    if result.returncode != 0:
        log("编译失败,错误信息:", result.stderr, level="ERROR")
        return result.stderr
    log("编译成功,生成可执行文件:", EXECUTABLE)
    return "compile success"

def run_tests():
    """运行测试用例,返回测试结果"""
    log("开始运行测试用例")
    try:
        result = subprocess.run(
            [EXECUTABLE],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
            timeout=5  # 超时保护,防止程序卡死
        )
        
        if result.returncode != 0:
            log("测试失败,输出信息:", result.stdout + result.stderr, level="ERROR")
            return result.stdout + result.stderr
        log("测试通过,输出信息:", result.stdout)
        return "测试通过\n" + result.stdout
    except subprocess.TimeoutExpired:
        log("测试超时(超过5秒)", level="ERROR")
        return "timeout"
# ===========================================================================

# ========================= JSON解析与校验(防止失控) =========================
def extract_json_strict(text):
    """严格提取包含action的JSON,过滤多余内容"""
    import re
    if not text:
        return None
    # 去除ANSI颜色码和代码块标记
    text = re.sub(r'\x1B[@-_][0-?]*[ -/]*[@-~]', '', text)
    text = re.sub(r"```[a-zA-Z]*", "", text)
    text = text.replace("```", "")
    # 只匹配包含"action"的JSON(关键,避免误解析代码)
    matches = re.findall(r"\{[\s\S]*?\}", text)
    for m in reversed(matches):
        if '"action"' in m:
            return m
    return None

def safe_json_load(json_str):
    """安全解析JSON,避免解析失败崩溃"""
    try:
        return json.loads(json_str)
    except Exception as e:
        log(f"JSON解析失败,错误:{str(e)},待解析内容:{json_str[:500]}", level="ERROR")
        return None

ALLOWED_ACTIONS = ["read_file", "write_file", "compile", "run_tests"]
def validate_action(action_json):
    """校验action合法性,防止非法指令"""
    if not action_json:
        return False
    action = action_json.get("action")
    if action not in ALLOWED_ACTIONS:
        log(f"非法action:{action},仅允许:{ALLOWED_ACTIONS}", level="ERROR")
        return False
    return True
# ===========================================================================

# ========================= MCP执行器(解析并执行指令) =========================
def execute_action(action_json):
    """根据Copilot输出的JSON指令,执行对应工具"""
    action = action_json.get("action")
    args = action_json.get("args", {})
    
    # 拦截compile和run_tests,由系统统一控制,避免Copilot越权
    if action in ["compile", "run_tests"]:
        log(f"拦截AI直接调用{action},由系统统一执行")
        return "系统自动执行,不允许AI直接调用"
    
    if action == "read_file":
        return read_file(args.get("path", ""))
    elif action == "write_file":
        return write_file(args.get("path", ""), args.get("content", ""))
    else:
        log(f"未知action:{action}", level="ERROR")
        return f"unknown action: {action}"
# ===========================================================================

# ========================= Agent主循环(核心逻辑) =========================
def agent_loop():
    """Agent主循环,实现多轮决策-执行-反馈"""
    context = SYSTEM_PROMPT + "\n" + USER_PROMPT
    log("Agent启动,开始执行任务,最大迭代步数:", MAX_STEPS)
    
    for step in range(MAX_STEPS):
        log(f"\n====== 迭代步骤 {step+1}/{MAX_STEPS} ======")
        
        # 调用Copilot获取决策指令
        raw_response = call_copilot(context)
        
        # 提取并校验JSON
        json_str = extract_json_strict(raw_response)
        if not json_str:
            log("未提取到有效JSON,提醒Copilot重新输出", level="WARNING")
            context += "\n你刚才没有输出JSON,请严格按照格式输出仅包含action和args的JSON,不要添加任何多余内容。"
            continue
        
        # 安全解析JSON
        action_json = safe_json_load(json_str)
        if not validate_action(action_json):
            log("JSON指令非法,重试", level="WARNING")
            continue
        
        # 执行工具指令
        tool_result = execute_action(action_json)
        
        # 系统主动执行编译和测试(核心优化,避免Copilot越权)
        compile_result = compile_code()
        test_result = run_tests()
        
        # 更新上下文,将工具结果和编译、测试结果反馈给Copilot
        context += f"""

上一步执行指令:
{json.dumps(action_json, ensure_ascii=False, indent=2)}

工具执行结果:
{tool_result[:2000]}

编译结果:
{compile_result[:2000]}

测试结果:
{test_result[:2000]}
"""
        
        # 任务完成条件:测试通过
        if "测试通过" in test_result:
            log("任务完成!所有测试通过", level="INFO")
            return True
    
    log("达到最大迭代步数,任务未完成", level="ERROR")
    return False
# ===========================================================================

# ========================= 主函数(程序入口) =========================
def main():
    """程序入口,检查依赖并启动Agent"""
    # 检查接口文件是否存在
    if not Path(INTERFACE_FILE).exists():
        log(f"接口文件不存在:{INTERFACE_FILE}", level="ERROR")
        return
    
    # 启动Agent循环
    success = agent_loop()
    
    if success:
        log("系统运行成功,任务完成!", level="INFO")
    else:
        log("系统运行失败,任务未完成", level="ERROR")

if __name__ == "__main__":
    main()

实际运行结果

[2026-04-20 15:16:18] 
====== Step 1 ======
[2026-04-20 15:16:18] 
================ Copilot Prompt ================

[2026-04-20 15:16:18] 
你是一个“工具调用决策器(Tool Agent)”。

:warning: 强制规则(必须遵守):

1. 你不能执行任何操作
2. 你不能描述过程
3. 你不能写解释性文字
4. 你不能输出 shell / PowerShell / 命令
5. 你不能说“Reading file…”之类的话

你唯一能做的是:

:point_right: 输出一个 JSON

格式:

{
  "action": "tool_name",
  "args": { ... }
}

可用工具:

- read_file(path)
- write_file(path, content)
- compile()
- run_tests()

策略:

1. 第一步:read_file
2. 第二步:write_file(生成完整.cpp)
3. 第三步:compile
4. 第四步:run_tests
5. 失败则修复

:warning: 如果你输出任何非JSON内容,系统将失败
:warning: 不允许多步操作
:warning: 一次只能调用一个工具
你不是IDE,不允许模拟文件操作
你只能返回JSON,不允许描述任何行为


任务:

根据接口文件生成完整C++实现,并通过编译和测试。

接口路径:
./interfaces/math_api.h

必须生成完整.cpp文件

[2026-04-20 15:16:18] 
================================================

[2026-04-20 15:18:02] 
================ Copilot Response ================

[2026-04-20 15:18:02] Reading the interface file to generate a full C++ implementation (next step: read ./interfaces/math_api.h). Then will produce implementation, compile, and run tests.

● Read math_api.h
  │ interfaces\math_api.h
  └ 49 lines read

Creating the C++ implementation file from the header so it can be compiled and tested next.

✗ Create math_api.cpp
  │ generated\math_api.cpp
  └ Permission denied and could not request permission from user

✗ Create implementation file (shell)
  │ $content = @'
  │ #include "../interfaces/math_api.h"
  │ #include <algorithm>
  │ #include <unordered_map>
  │ #include <cctype>…
  └ Permission denied and could not request permission from user

无法在工作区写入文件(Permission denied)。以下为完整 C++ 实现,请将其保存为 D:\work\python\cpp-gen\generated\math_api.cpp 并自行编译/测试:

#include "../interfaces/math_api.h"
#include <algorithm>
#include <unordered_map>
#include <cctype>

// Implementations for MathAPI

std::vector<int> MathAPI::prefix_sum(const std::vector<int>& input) {
    std::vector<int> res;
    res.reserve(input.size());
    int sum = 0;
    for (int v : input) {
        sum += v;
        res.push_back(sum);
    }
    return res;
}

int MathAPI::max_subarray(const std::vector<int>& nums) {
    if (nums.empty()) return 0;
    int max_ending = nums[0];
    int max_so_far = nums[0];
    for (size_t i = 1; i < nums.size(); ++i) {
        max_ending = std::max(nums[i], max_ending + nums[i]);
        max_so_far = std::max(max_so_far, max_ending);
    }
    return max_so_far;
}

int MathAPI::binary_search(const std::vector<int>& nums, int target) {
    int l = 0, r = (int)nums.size() - 1;
    while (l <= r) {
        int m = l + (r - l) / 2;
        if (nums[m] == target) return m;
        if (nums[m] < target) l = m + 1;
        else r = m - 1;
    }
    return -1;
}

int MathAPI::climb_stairs(int n) {
    if (n <= 1) return 1;
    int a = 1, b = 1;
    for (int i = 2; i <= n; ++i) {
        int c = a + b;
        a = b;
        b = c;
    }
    return b;
}

int MathAPI::length_of_lis(const std::vector<int>& nums) {
    std::vector<int> tails;
    for (int x : nums) {
        auto it = std::lower_bound(tails.begin(), tails.end(), x);
        if (it == tails.end()) tails.push_back(x);
        else *it = x;
    }
    return (int)tails.size();
}

bool MathAPI::is_palindrome(const std::string& s) {
    int l = 0, r = (int)s.size() - 1;
    while (l < r) {
        while (l < r && !std::isalnum(static_cast<unsigned char>(s[l]))) ++l;
        while (l < r && !std::isalnum(static_cast<unsigned char>(s[r]))) --r;
        if (std::tolower(static_cast<unsigned char>(s[l])) != std::tolower(static_cast<unsigned char>(s[r]))) return false;
        ++l; --r;
    }
    return true;
}

std::string MathAPI::reverse_string(const std::string& s) {
    return std::string(s.rbegin(), s.rend());
}

std::vector<int> MathAPI::two_sum(const std::vector<int>& nums, int target) {
    std::unordered_map<int,int> m;
    for (int i = 0; i < (int)nums.size(); ++i) {
        int need = target - nums[i];
        auto it = m.find(need);
        if (it != m.end()) return {it->second, i};
        m[nums[i]] = i;
    }
    return {};
}

std::vector<int> MathAPI::merge_sorted_arrays(const std::vector<int>& a, const std::vector<int>& b) {
    std::vector<int> res;
    res.reserve(a.size() + b.size());
    size_t i = 0, j = 0;
    while (i < a.size() && j < b.size()) {
        if (a[i] <= b[j]) res.push_back(a[i++]);
        else res.push_back(b[j++]);
    }
    while (i < a.size()) res.push_back(a[i++]);
    while (j < b.size()) res.push_back(b[j++]);
    return res;
}

可知copilot并没有按要求输出json格式内容。必须作出脚本调整。并且有报错:Permission denied and could not request permission from user。

4.3 增强版脚本(解决Copilot输出不规范问题)

针对Copilot输出不规范、代码污染、JSON解析失败等问题,优化后的增强版脚本(copilot_mcp_agent_v2.py),重点增加了响应清洗和代码过滤:

import os
import json
import subprocess
import re
from pathlib import Path
import datetime
import sys

# ========================= 配置项 =========================
MAX_STEPS = 12
INTERFACE_FILE = "./interfaces/math_api.h"
CPP_FILE = "./generated/impl.cpp"
TEST_FILE = "./tests/test.cpp"
EXECUTABLE = "./app"
LOG_FILE = "agent.log"
# ========================= 日志函数(不变) =========================
def log(*args, sep=' ', end='\n', file=None, flush=False, level="INFO"):
    message = sep.join(str(arg) for arg in args)
    ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    final_msg = f"[{ts}][{level}] {message}"
    print(final_msg, end=end, file=file or sys.stdout, flush=flush)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(final_msg + end)
    except Exception as e:
        print(f"[LOG ERROR] {e}", file=sys.stderr)
# ========================= Prompt强化(关键优化) =========================
SYSTEM_PROMPT = """
你是一个C++自动化工程Agent。
你必须严格遵守以下规则,否则系统将失败:
1. 只能输出JSON,不允许输出任何解释、描述或多余文字;
2. 不允许输出代码块标记(```),包括```json、```cpp;
3. write_file的content字段必须是完整C++代码,包含#include和所有函数实现;
4. 每一步只调用一个工具,不允许多步操作;
5. 禁止生成shell命令、g++命令或任何系统命令;
6. 禁止模拟文件操作(如“Reading file...”),只返回JSON。

JSON格式如下(必须严格遵循):
{
  "action": "tool_name",
  "args": {
    "path": "xxx",  // 仅read_file、write_file需要
    "content": "xxx" // 仅write_file需要
  }
}

可用工具:
1. read_file(path):读取文件
2. write_file(path, content):写入文件(content必须是完整C++代码)
3. compile():编译代码(由系统自动执行,无需AI调用)
4. run_tests():运行测试(由系统自动执行,无需AI调用)

策略:
1. 第一步必须调用read_file读取接口文件;
2. 第二步调用write_file生成完整.cpp实现;
3. 后续根据编译、测试结果修复错误;
4. 直到测试通过。
"""

USER_PROMPT = f"""
任务:根据接口文件生成完整C++实现,并通过编译和测试。
接口文件路径:{INTERFACE_FILE}
要求:
1. write_file的content必须是完整.cpp文件,包含#include、类实现和所有成员函数;
2. 不依赖第三方库,确保能通过g++编译;
3. 输出必须是仅包含action和args的JSON,无任何多余内容。
"""
# ========================= Copilot调用(不变) =========================
def call_copilot(prompt):
    log("开始调用Copilot")
    cmd = [
        "gh", "copilot", "suggest-command",
        prompt
    ]
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore"
    )
    output = result.stdout.strip()
    log("Copilot输入:", prompt[:2000])
    log("Copilot输出:", output[:2000])
    if result.returncode != 0:
        log("Copilot调用失败:", result.stderr, level="ERROR")
        return ""
    return output
# ========================= 响应清洗与代码过滤(核心增强) =========================
def clean_response(text):
    """清洗Copilot输出,提取纯JSON"""
    if not text:
        return ""
    # 去除代码块、ANSI颜色码、多余空格
    text = re.sub(r"```[a-zA-Z]*", "", text)
    text = text.replace("```", "")
    text = re.sub(r'\x1B[@-_][0-?]*[ -/]*[@-~]', '', text)
    text = text.strip()
    # 提取包含action的JSON
    match = re.search(r"\{.*\"action\".*\}", text, re.DOTALL)
    return match.group(0) if match else ""

def clean_code(code):
    """过滤代码中的多余标记,确保是纯C++代码"""
    if not code:
        return ""
    code = re.sub(r"```[a-zA-Z]*", "", code)
    code = code.replace("```", "")
    code = re.sub(r'\x1B[@-_][0-?]*[ -/]*[@-~]', '', text)
    return code.strip()
# ========================= 工具实现(优化write_file) =========================
def read_file(path):
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        log(f"读取文件失败:{str(e)}", level="ERROR")
        return str(e)

def write_file(path, content):
    try:
        # 写入前清洗代码,避免污染
        content = clean_code(content)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        return "write ok"
    except Exception as e:
        log(f"写入文件失败:{str(e)}", level="ERROR")
        return str(e)

def compile_code():
    cmd = [
        "g++", "-std=c++17", "-O2", CPP_FILE, TEST_FILE, "-o", EXECUTABLE
    ]
    log("编译命令:", " ".join(cmd))
    result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="ignore")
    if result.returncode != 0:
        log("编译失败:", result.stderr, level="ERROR")
        return result.stderr
    return "compile success"

def run_tests():
    try:
        result = subprocess.run([EXECUTABLE], capture_output=True, text=True, encoding="utf-8", errors="ignore", timeout=5)
        if result.returncode != 0:
            return result.stdout + result.stderr
        return "测试通过\n" + result.stdout
    except subprocess.TimeoutExpired:
        return "timeout"
# ========================= JSON解析与执行(不变) =========================
def safe_json_load(json_str):
    try:
        return json.loads(json_str)
    except Exception as e:
        log(f"JSON解析失败:{str(e)}", level="ERROR")
        return None

ALLOWED_ACTIONS = ["read_file", "write_file", "compile", "run_tests"]
def validate_action(action_json):
    if not action_json or action_json.get("action") not in ALLOWED_ACTIONS:
        return False
    return True

def execute_action(action_json):
    action = action_json.get("action")
    args = action_json.get("args", {})
    if action in ["compile", "run_tests"]:
        return "系统自动执行,不允许AI直接调用"
    if action == "read_file":
        return read_file(args.get("path", ""))
    elif action == "write_file":
        return write_file(args.get("path", ""), args.get("content", ""))
    else:
        return f"unknown action: {action}"
# ========================= Agent主循环(优化) =========================
def agent_loop():
    context = SYSTEM_PROMPT + "\n" + USER_PROMPT
    log("Agent启动,最大迭代步数:", MAX_STEPS)
    
    for step in range(MAX_STEPS):
        log(f"\n====== 步骤 {step+1}/{MAX_STEPS} ======")
        raw = call_copilot(context)
        cleaned_json = clean_response(raw)
        
        if not cleaned_json:
            log("未提取到有效JSON,重试", level="WARNING")
            context += "\n你未输出有效JSON,请严格按照要求,仅输出包含action和args的JSON,无任何多余内容。"
            continue
        
        action_json = safe_json_load(cleaned_json)
        if not validate_action(action_json):
            log("指令非法,重试", level="WARNING")
            continue
        
        tool_result = execute_action(action_json)
        compile_result = compile_code()
        test_result = run_tests()
        
        # 控制上下文长度,防止爆炸
        context += f"""

上一步指令:
{json.dumps(action_json, ensure_ascii=False, indent=2)}

工具结果:
{tool_result[:2000]}

编译结果:
{compile_result[:2000]}

测试结果:
{test_result[:2000]}
"""
        
        if "测试通过" in test_result:
            log("任务完成!", level="INFO")
            return True
    
    log("达到最大步数,任务未完成", level="ERROR")
    return False

def main():
    if not Path(INTERFACE_FILE).exists():
        log(f"接口文件不存在:{INTERFACE_FILE}", level="ERROR")
        return
    success = agent_loop()
    log("系统运行成功" if success else "系统运行失败", level="INFO")

if __name__ == "__main__":
    main()

五、测试方案(确保系统可验证)

为了测试AI Agent的能力,推荐使用专门设计的C++接口文件,覆盖常见算法场景,易编译、易测试,不依赖第三方库,具体如下:

5.1 测试接口文件(interfaces/math_api.h)

#pragma once

#include <vector>
#include <string>

class MathAPI {
public:
    // 数组类
    std::vector<int> prefix_sum(const std::vector<int>& input);  // 前缀和
    int max_subarray(const std::vector<int>& nums);             // 最大子数组和(Kadane)
    int binary_search(const std::vector<int>& nums, int target); // 二分查找

    // 动态规划
    int climb_stairs(int n);                                     // 爬楼梯
    int length_of_lis(const std::vector<int>& nums);             // 最长递增子序列(LIS)

    // 字符串
    bool is_palindrome(const std::string& s);                    // 回文串判断
    std::string reverse_string(const std::string& s);            // 字符串反转

    // 综合能力
    std::vector<int> two_sum(const std::vector<int>& nums, int target); // 两数之和
    std::vector<int> merge_sorted_arrays(                         // 合并两个有序数组
        const std::vector<int>& a,
        const std::vector<int>& b
    );
};

// 可选:测试AI修复能力的复杂接口(LRU缓存)
class LRUCache {
public:
    LRUCache(int capacity);
    int get(int key);
    void put(int key, int value);
};

5.2 测试步骤(循序渐进)

  1. 第一轮测试:仅保留简单函数(如prefix_sum),验证Agent基本的“读文件-写代码-编译-测试”流程;

  2. 第二轮测试:加入动态规划函数(如climb_stairs),验证Agent的代码生成能力;

  3. 第三轮测试:加入复杂结构(如LRUCache),验证Agent的bug修复能力;

  4. 最终测试:使用完整接口文件,验证Agent的多轮迭代修复和全流程闭环能力。

六、进阶升级:基于Clang AST的精准修复

基础版本的Agent采用“全文重写”的方式修复代码,容易引入新bug,进阶升级可通过Clang AST实现“精准修改”,即只修改出错的函数或语句,接近工业级工具的能力。

6.1 技术选型与依赖安装

使用libclang(Python绑定)实现AST解析,安装方式:

pip install clang
# 验证安装
clang --version

6.2 核心AST操作代码

from clang import cindex

def find_functions(cpp_path):
    """解析CPP文件,找到所有类成员函数,返回函数名、起止行号"""
    index = cindex.Index.create()
    tu = index.parse(cpp_path)
    funcs = []
    
    def visit(node):
        if node.kind == cindex.CursorKind.CXX_METHOD:
            funcs.append({
                "name": node.spelling,
                "start": node.extent.start.line,
                "end": node.extent.end.line
            })
        for child in node.get_children():
            visit(child)
    
    visit(tu.cursor)
    return funcs

def extract_function_code(cpp_path, start_line, end_line):
    """根据起止行号,提取函数源码"""
    with open(cpp_path, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    return "\n".join(lines[start_line-1:end_line])

def replace_function(cpp_path, start_line, end_line, new_code):
    """替换指定行的函数代码,实现精准修复"""
    with open(cpp_path, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    # 拼接新代码
    new_lines = lines[:start_line-1] + new_code.splitlines() + lines[end_line:]
    with open(cpp_path, "w", encoding="utf-8") as f:
        f.write("\n".join(new_lines))

def fix_with_ast(cpp_path, error_msg):
    """结合AST,精准修复出错函数"""
    # 1. 解析AST,获取所有函数
    funcs = find_functions(cpp_path)
    if not funcs:
        log("未找到任何函数,无法精准修复", level="ERROR")
        return
    
    # 2. 根据错误信息,猜测出错函数(简单策略,可优化)
    import re
    match = re.search(r'in member function .*::(\w+)\(', error_msg)
    target_func_name = match.group(1) if match else funcs[0]["name"]
    target_func = next((f for f in funcs if f["name"] == target_func_name), funcs[0])
    
    # 3. 提取出错函数源码
    func_code = extract_function_code(cpp_path, target_func["start"], target_func["end"])
    
    # 4. 调用Copilot,仅修复该函数
    prompt = f"""
修复下面的C++函数,仅返回修复后的函数代码,不要任何解释、不要代码块标记。
错误信息:{error_msg[:500]}
函数代码:{func_code}
要求:保持函数签名不变,修复所有错误,确保可编译。
"""
    fixed_func = call_copilot(prompt)
    fixed_func = clean_code(fixed_func)
    
    # 5. 精准替换函数
    replace_function(cpp_path, target_func["start"], target_func["end"], fixed_func)
    log(f"已精准修复函数:{target_func_name}")

6.3 与Agent结合改造

在Agent主循环中,当编译失败时,调用fix_with_ast进行精准修复,替代原来的“全文重写”,修改后的核心逻辑如下:

# 在agent_loop中,编译失败后添加精准修复
compile_result = compile_code()
if "error" in compile_result.lower():
    log("编译失败,启动AST精准修复", level="WARNING")
    fix_with_ast(CPP_FILE, compile_result)
    # 重新编译
    compile_result = compile_code()

test_result = run_tests()

七、常见问题与解决方案

问题现象 本质原因 解决方案
Permission denied(权限拒绝) Copilot CLI用了-t shell模式,被系统拦截危险命令 改用gh copilot suggest-command,Prompt中禁止生成shell命令
JSON解析失败 Copilot输出多余解释、代码块,或格式不规范 使用extract_json_strict清洗响应,强化Prompt约束
生成代码不完整 Prompt未明确要求完整代码,Copilot输出不规范 Prompt中强制要求“包含#include和所有函数实现”,添加clean_code过滤
Agent进入死循环 未设置最大迭代步数,或修复策略不合理 设置MAX_STEPS,添加 fallback 机制(多次修复失败则全文重写)
Copilot输出模拟操作(如“Reading file”) Copilot进入IDE辅助模式,未严格遵循JSON输出 Prompt中强制禁止描述行为,仅允许输出JSON

再次优化-Copilot-Agent 实验

这一步在做完整的 AI 工程自动化平台(接近 AI 编译器 + MCP Server)测试后,发现了很多问题,那么我们需要换个思路,直接让我们的脚本作为调度器,不要试图让copilot承担类似任务(只让它作为文字生成器)。下面给出一套能落地、能跑、结构清晰的完整方案:

一、最终架构(你要实现的系统)

                ┌────────────────────┐
                │   Copilot (LLM)    │
                └─────────┬──────────┘
                          │
                ┌─────────▼──────────┐
                │   MCP Server       │  ← FastMCP
                │ (Tool Provider)    │
                └─────────┬──────────┘
                          │
        ┌─────────────────┼─────────────────┐
        │                 │                 │
   read/write        compile/run         AST分析
        │                 │                 │
        └───────────────Agent───────────────┘
                          │
                      自动修复循环

二、安装依赖

# MCP / 数据结构
pip install pydantic

# AST
pip install clang

# 可选(日志增强)
pip install rich

# 系统要求
g++ --version
gh auth login

三、代码结构

project/
├── mcp_server.py        # MCP Server
├── agent.py             # Agent 主控
├── tools.py             # 工具实现
├── copilot.py           # Copilot 调用
├── ast_utils.py         # AST分析
├── interfaces/
│   └── math_api.h
├── generated/
├── tests/
├── build/

四、MCP Server(fastmcp风格)

✅ mcp_server.py

from pydantic import BaseModel
from typing import Dict, Callable

class Tool(BaseModel):
    name: str
    description: str
    func: Callable


class FastMCPServer:
    def __init__(self):
        self.tools: Dict[str, Tool] = {}

    def register(self, tool: Tool):
        self.tools[tool.name] = tool

    def call(self, name, **kwargs):
        if name not in self.tools:
            return {"error": f"tool {name} not found"}
        try:
            result = self.tools[name].func(**kwargs)
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}

五、工具实现

✅ tools.py

import os
import subprocess

def read_file(path):
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

def write_file(path, content):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    return "ok"

def compile_cpp(cpp, test, out):
    cmd = ["g++", "-std=c++17", "-O2", cpp, test, "-o", out]

    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        return False, result.stderr
    return True, "ok"

def run_exe(exe):
    result = subprocess.run([exe], capture_output=True, text=True)
    return result.returncode == 0, result.stdout

六、Copilot 封装

✅ copilot.py

import subprocess
import re

def call_copilot(prompt):
    cmd = ["gh", "copilot", "-p", prompt]

    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore"
    )
    return result.stdout

def extract_cpp(text):
    m = re.search(r"```(?:cpp|c\+\+)?([\s\S]*?)```", text)
    return m.group(1) if m else text

七、AST 工具

✅ ast_utils.py

from clang import cindex

def extract_functions(file):
    index = cindex.Index.create()
    tu = index.parse(file)

    funcs = {}
    with open(file, "r", encoding="utf-8") as f:
        code = f.read()

    for node in tu.cursor.walk_preorder():
        if node.kind.name in ["FUNCTION_DECL", "CXX_METHOD"]:
            funcs[node.spelling] = (
                node.extent.start.offset,
                node.extent.end.offset
            )
    return funcs

def locate_function(error, funcs, code):
    for name in funcs:
        if name in error:
            return name
    return None

八、Agent(核心)

✅ agent.py

import datetime
from mcp_server import FastMCPServer, Tool
import tools
import copilot
import ast_utils

INTERFACE = "./interfaces/math_api.h"
CPP = "./generated/math_api.cpp"
TEST = "./tests/test.cpp"
EXE = "./build/app.exe"

def log(*args):
    print("[", datetime.datetime.now(), "]", *args)

# =========================
# Prompt 构造(关键)
# =========================

def gen_cpp(header, impl=None, test=None, err=None):
    prompt = f"""
你是C++专家。

接口:
{header}
"""

    if impl:
        prompt += f"\n当前实现:\n{impl}"
    if test:
        prompt += f"\n测试代码:\n{test}"
    if err:
        prompt += f"\n错误:\n{err}"

    prompt += "\n输出完整cpp实现,不要解释"

    return copilot.extract_cpp(copilot.call_copilot(prompt))


def gen_test(header, impl, err=None):
    prompt = f"""
生成 test.cpp

接口:
{header}

实现:
{impl}
"""

    if err:
        prompt += f"\n修复错误:{err}"

    return copilot.extract_cpp(copilot.call_copilot(prompt))


def fix_func(name, func_code, full, header, test, err):
    prompt = f"""
修复函数 {name}

接口:
{header}

完整代码:
{full}

函数代码:
{func_code}

测试:
{test}

错误:
{err}

只输出该函数
"""
    return copilot.extract_cpp(copilot.call_copilot(prompt))

# =========================
# Agent Loop
# =========================

def main():
    mcp = FastMCPServer()

    mcp.register(Tool(name="read", description="", func=tools.read_file))
    mcp.register(Tool(name="write", description="", func=tools.write_file))
    mcp.register(Tool(name="compile", description="", func=tools.compile_cpp))
    mcp.register(Tool(name="run", description="", func=tools.run_exe))

    header = mcp.call("read", path=INTERFACE)["result"]

    impl = gen_cpp(header)

    for i in range(6):
        log("Round", i+1)

        mcp.call("write", path=CPP, content=impl)

        test = gen_test(header, impl)
        mcp.call("write", path=TEST, content=test)

        ok, err = mcp.call("compile", cpp=CPP, test=TEST, out=EXE)["result"]

        if not ok:
            log("compile error")

            funcs = ast_utils.extract_functions(CPP)
            func = ast_utils.locate_function(err, funcs, impl)

            if func:
                start, end = funcs[func]
                func_code = impl[start:end]

                newf = fix_func(func, func_code, impl, header, test, err)
                impl = impl[:start] + newf + impl[end:]
            else:
                impl = gen_cpp(header, impl, test, err)

            continue

        ok, out = mcp.call("run", exe=EXE)["result"]

        if ok:
            log("SUCCESS")
            return

        log("runtime error")

        impl = gen_cpp(header, impl, test, out)

    log("FAILED")

if __name__ == "__main__":
    main()

九、现在系统的级别

这套系统已经是:

✅ MCP Server(工具层) 
✅ Agent(控制层) 
✅ Copilot(生成层) 
✅ AST(结构理解) 
✅ 编译器(验证层) 

👉 本质:

🔥 AI 自动开发 + 自动修复 + 自动测试系统 

⚠️ 十、现实提醒(非常重要)

GitHub Copilot ❌ 不支持真正 MCP tool calling

所以当前是:

MCP架构 ✔ LLM能力模拟 ✔

下一步进化

继续升级:

🔥 1. 真正 MCP(支持 tool_call)

接入:

OpenAI function calling

Claude MCP

🔥 2. AST 精细 patch

只改一行代码

🔥 3. 覆盖率驱动修复

gcov + AI修复

🔥 4. 性能优化闭环

benchmark → AI优化

扩展优化

把当前脚本再次升级成真正工程化版本(多文件 + AST 精准 patch),把要求的能力全部整合进去:

:rocket: 能力覆盖(最终版)

:heavy_check_mark: 真·MCP(tool schema + tool call loop)
:heavy_check_mark: AST 行级定位 + 函数级 patch
:heavy_check_mark: g++ 编译 + 测试执行
:heavy_check_mark: 覆盖率(gcov)
:heavy_check_mark: 性能基准(简单计时)
:heavy_check_mark: 错误反馈 → 再生成/修复
:heavy_check_mark: 日志系统(带时间)
:heavy_check_mark: 可直接运行(无需拆文件)

:package: 一、安装依赖及环境变量

pip install openai clang
# 还需要:
g++ --version
gcov --version

:warning: 环境变量

export OPENAI_API_KEY=你的key
#(Windows 用 setx OPENAI_API_KEY xxx)

:file_folder: 二、最终工程结构(建议直接照这个建)

ai_cpp_agent/
├── agent.py              # 主控制循环
├── config.py             # 配置
├── logger.py             # 日志
├── llm.py                # LLM调用
├── mcp_tools.py          # 工具层(编译/运行/IO)
├── ast_engine.py         # AST解析 + patch
├── coverage.py           # gcov解析
├── optimizer.py          # 性能优化
├── prompt.py             # prompt模板
├── interfaces/
│   └── math_api.h
├── generated/
├── tests/
├── build/

三、核心脚本

:wrench: 1. config.py

INTERFACE_FILE = "./interfaces/math_api.h"
CPP_FILE = "./generated/math_api.cpp"
TEST_FILE = "./tests/test.cpp"
BUILD_DIR = "./build"
EXE_FILE = "./build/app.exe"

MAX_ROUND = 8
TIME_LIMIT = 1.0

:receipt: 2. logger.py

import datetime

def log(*args):
    ts = datetime.datetime.now().strftime("%H:%M:%S")
    print(f"[{ts}]", *args)

:robot_face: 3. llm.py(统一入口)

import re
from openai import OpenAI

client = OpenAI()

def call_llm(prompt):
    resp = client.chat.completions.create(
        model="gpt-4.1-mini",
        messages=[{"role":"user","content":prompt}]
    )
    return resp.choices[0].message.content



def extract_cpp(text):
    m = re.search(r"```(?:cpp|c\+\+)?([\s\S]*?)```", text)
    return m.group(1) if m else text

:wrench: 4. mcp_tools.py(工具层)

import os
import subprocess
from config import *

def read_file(path):
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

def write_file(path, content):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)

def compile_cpp():
    os.makedirs(BUILD_DIR, exist_ok=True)

    cmd = [
        "g++", "-std=c++17", "-O2",
        "-fprofile-arcs", "-ftest-coverage",
        CPP_FILE, TEST_FILE, "-o", EXE_FILE
    ]

    r = subprocess.run(cmd, capture_output=True, text=True)
    return r.returncode == 0, r.stderr

def run_test():
    try:
        r = subprocess.run([EXE_FILE], capture_output=True, text=True, timeout=5)
        return r.returncode == 0, r.stdout
    except Exception as e:
        return False, str(e)

:brain: 5. ast_engine.py(重点:精准 patch)

import re
from clang import cindex

def extract_functions(file):
    index = cindex.Index.create()
    tu = index.parse(file)

    funcs = {}
    code = open(file, encoding="utf-8").read()

    for node in tu.cursor.walk_preorder():
        if node.kind.name in ["FUNCTION_DECL", "CXX_METHOD"]:
            funcs[node.spelling] = (
                node.extent.start.offset,
                node.extent.end.offset
            )
    return funcs



def locate_error_line(err):
    m = re.search(r":(\d+):", err)
    return int(m.group(1)) if m else None



def locate_func(funcs, line, code):
    if not line:
        return None

    lines = code.splitlines()
    pos = sum(len(l)+1 for l in lines[:line-1])

    for name, (s, e) in funcs.items():
        if s <= pos <= e:
            return name
    return None



def patch_function(code, funcs, func_name, new_func):
    if func_name not in funcs:
        return code

    s, e = funcs[func_name]

    # 🔥 核心:只替换函数体
    return code[:s] + new_func + code[e:]

📊 6. coverage.py

import os
import subprocess

from config import CPP_FILE

def run_gcov():
    subprocess.run(["gcov", CPP_FILE], stdout=subprocess.DEVNULL)

    gcov_file = CPP_FILE + ".gcov"
    if not os.path.exists(gcov_file):
        return []

    uncovered = []
    with open(gcov_file) as f:
        for line in f:
            if "#####" in line:
                uncovered.append(line.strip())

    return uncovered

⚡ 7. optimizer.py

from llm import call_llm, extract_cpp

def optimize(code, time_used):
    prompt = f"""
优化性能:

当前耗时: {time_used}

代码:
{code}

要求:
- 降低复杂度
- 提高性能
"""
    return extract_cpp(call_llm(prompt))

🧾 8. prompt.py

BASE = """
你是资深C++工程师:

要求:
1. 必须可编译
2. 必须完整实现
3. 不要解释
"""

🚀 9. agent.py(主流程)

import time
from config import *
from logger import log
from llm import call_llm, extract_cpp
from mcp_tools import *
from ast_engine import *
from coverage import run_gcov
from optimizer import optimize
from prompt import BASE



def gen_cpp(header, impl=None, test=None, err=None):
    prompt = BASE + f"\n接口:\n{header}"

    if impl:
        prompt += f"\n当前实现:\n{impl}"
    if test:
        prompt += f"\n测试:\n{test}"
    if err:
        prompt += f"\n错误:\n{err}"

    return extract_cpp(call_llm(prompt))



def gen_test(header, impl, uncovered=None):
    prompt = BASE + f"\n接口:\n{header}\n实现:\n{impl}"

    if uncovered:
        prompt += f"\n未覆盖:\n{uncovered}"

    prompt += "\n生成test.cpp"
    return extract_cpp(call_llm(prompt))



def fix_func(name, func_code, full, err):
    prompt = BASE + f"""
修复函数 {name}

完整代码:
{full}

函数:
{func_code}

错误:
{err}

只输出函数
"""
    return extract_cpp(call_llm(prompt))



def main():
    header = read_file(INTERFACE_FILE)
    impl = gen_cpp(header)

    for i in range(MAX_ROUND):
        log("Round", i+1)

        write_file(CPP_FILE, impl)

        funcs = extract_functions(CPP_FILE)

        test = gen_test(header, impl)
        write_file(TEST_FILE, test)

        ok, err = compile_cpp()

        if not ok:
            log("编译错误")

            line = locate_error_line(err)
            func = locate_func(funcs, line, impl)

            if func:
                s, e = funcs[func]
                newf = fix_func(func, impl[s:e], impl, err)
                impl = patch_function(impl, funcs, func, newf)
            else:
                impl = gen_cpp(header, impl, test, err)

            continue

        ok, out = run_test()

        if not ok:
            log("运行错误")
            impl = gen_cpp(header, impl, test, out)
            continue

        uncovered = run_gcov()

        if uncovered:
            log("覆盖不足")
            test = gen_test(header, impl, uncovered)
            write_file(TEST_FILE, test)
            continue

        t0 = time.time()
        run_test()
        t1 = time.time()

        if t1 - t0 > TIME_LIMIT:
            log("性能优化")
            impl = optimize(impl, t1 - t0)
            continue

        log("成功")
        return

    log("失败")



if __name__ == "__main__":
    main()
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐