Go 泛型封装 Claude API SDK:写一个优雅的客户端

环境说明: 本文代码基于 Go 1.21+(泛型自 1.18 引入,1.21 起 slices/maps 标准库可用)和官方 SDK github.com/anthropics/anthropic-sdk-go。阅读前建议先确认本地 Go 版本:go version


0. 先看一段让人头疼的代码

不加任何封装,用官方 SDK 发一条最简单的消息,代码大概长这样:

client := anthropic.NewClient(option.WithAPIKey(os.Getenv("ANTHROPIC_API_KEY")))

msg, err := client.Messages.New(context.Background(), anthropic.MessageNewParams{
    Model:     anthropic.F(anthropic.ModelClaude3_5SonnetLatest),
    MaxTokens: anthropic.F(int64(1024)),
    Messages: anthropic.F([]anthropic.MessageParam{
        anthropic.NewUserMessage(anthropic.NewTextBlock("你好")),
    }),
})
if err != nil {
    log.Fatal(err)
}

// 还要类型断言才能拿到文本
if block, ok := msg.Content[0].(anthropic.TextBlock); ok {
    fmt.Println(block.Text)
}

anthropic.F()anthropic.NewUserMessage()anthropic.NewTextBlock()——每一步都在和 SDK 的类型体操较劲。这些代码一旦散落在业务逻辑里,可读性会迅速崩塌。这就是我们需要封装层的根本原因。


1. 官方 SDK 的现状与局限

anthropic/anthropic-sdk-go 是 Anthropic 的官方 Go SDK,接口设计相当完整,消息、流式响应、工具调用这些核心功能都有覆盖。但在生产环境里长期维护,直接用原始 SDK 会碰到几个比较具体的麻烦:

① 参数结构冗长 所有字段都要通过泛型函数 anthropic.F[T]() 包成 Field[T] 类型——这是 SDK 内部区分「零值」和「未设置」的机制,对 SDK 开发者来说合理,但对业务调用方来说纯粹是噪音。

② 响应解析要手动断言 msg.Content[]ContentBlock 接口切片,每次想取文本都得断言 .(anthropic.TextBlock),稍不留神就会在非文本响应时 panic,或者悄悄把错误吞掉。

③ 错误处理没有分层 官方 SDK 的错误基本只分「有 err」和「没 err」,但业务层往往需要自己判断 429 限流、401 鉴权失败、400 上下文超长……这些逻辑如果不统一收拢,会在每个调用点反复出现。

④ 可测试性弱 SDK 暴露的是具体结构体而非接口,业务代码直接依赖 *anthropic.Client,单元测试就没办法注入 mock,只能打真实 API——这在 CI/CD 环境里几乎是不可接受的。

封装层要解决的,正是这四个问题。


2. 封装层的设计目标与选型

动手写代码之前,先把封装层的边界说清楚:

层次 职责 不做什么
封装层 类型安全转换、重试、限速、日志 不替换模型选择、不侵入业务提示词
官方 SDK 实际 HTTP 通信、签名、流式解析 保留,不绕过
业务层 调用封装层接口,处理领域逻辑 不感知 HTTP 细节

为什么选 Functional Options 而不是 Builder?

Builder 模式要求调用方记住最后调 .Build(),容易漏掉;Functional Options 每个选项独立,可以自由组合,对已有代码也没有侵入:

client, err := claude.NewClient(
    claude.WithAPIKey(os.Getenv("ANTHROPIC_API_KEY")),
    claude.WithModel(anthropic.ModelClaude3_5SonnetLatest),
    claude.WithMaxRetries(3),
    claude.WithRateLimit(10), // 每秒最多 10 次请求
)

这就是我们希望业务代码看到的样子——初始化一次,到处复用。


3. 用 Go 泛型定义类型安全的响应包装器

这是全文最核心的部分,也是大多数同类文章没有深入展开的地方。

3.1 没有泛型时的真实痛点

假设要写一个通用的「解析响应文本」函数,Go 1.18 之前只能这么写:

func extractText(resp interface{}) (string, error) {
    msg, ok := resp.(*anthropic.Message)
    if !ok {
        return "", errors.New("unexpected response type")
    }
    block, ok := msg.Content[0].(anthropic.TextBlock)
    if !ok {
        return "", errors.New("unexpected content block type")
    }
    return block.Text, nil
}

两层类型断言,编译器对类型错误毫无感知,所有问题都留到运行时才爆发。

3.2 泛型响应包装器

用泛型定义一个统一的响应容器,让类型信息在编译期就可见:

// Response 是封装层统一的响应结构
type Response[T any] struct {
    Data       T
    TokensUsed TokenUsage
    Model      string
    StopReason string
}

type TokenUsage struct {
    InputTokens  int64
    OutputTokens int64
}

// TextResponse 是最常用的文本响应具体化
type TextResponse = Response[string]

有了 Response[T],调用方拿到的永远是类型确定的 Data,不再需要任何断言:

resp, err := client.Chat(ctx, "你好")
if err != nil {
    return err
}
fmt.Println(resp.Data)          // string,直接用
fmt.Println(resp.TokensUsed)    // 结构化 token 统计,直接用

3.3 泛型约束在请求参数中的应用

定义请求选项时,同样可以用类型约束来限制合法输入范围:

// MessageOption 是函数式选项的类型
type MessageOption func(*messageConfig)

type messageConfig struct {
    model      string
    maxTokens  int64
    system     string
    historyMsg []anthropic.MessageParam
    temperature *float64
}

// WithTemperature 带边界守卫的温度设置,避免传入明显非法值
func WithTemperature(t float64) MessageOption {
    return func(c *messageConfig) {
        if t < 0 || t > 1 {
            return // 或 panic,视项目策略而定
        }
        c.temperature = &t
    }
}

对于更复杂的场景,比如同时支持 TextRequestVisionRequest 这两种请求类型,可以定义约束接口:

type RequestPayload interface {
    TextRequest | VisionRequest
}

func Send[T RequestPayload](ctx context.Context, payload T, opts ...MessageOption) (*Response[string], error) {
    // 根据 T 的实际类型分支处理,编译期保证 T 只能是已知类型
    // ...
}

这个模式在扩展多模态请求时特别有用——新增请求类型只需在约束里加一个,已有代码完全不受影响。


4. 核心客户端实现

4.1 客户端结构定义

package claude

import (
    "net/http"
    "golang.org/x/time/rate"
    "github.com/anthropics/anthropic-sdk-go"
    "github.com/anthropics/anthropic-sdk-go/option"
)

// Client 是封装后对外暴露的接口,便于 mock
type Client interface {
    Chat(ctx context.Context, prompt string, opts ...MessageOption) (*TextResponse, error)
    ChatWithHistory(ctx context.Context, history []Message, opts ...MessageOption) (*TextResponse, error)
    Stream(ctx context.Context, prompt string, dst io.Writer, opts ...MessageOption) error
}

type client struct {
    sdk     *anthropic.Client
    cfg     *clientConfig
    limiter *rate.Limiter
}

func NewClient(opts ...ClientOption) (Client, error) {
    cfg := defaultConfig()
    for _, o := range opts {
        o(cfg)
    }
    if cfg.apiKey == "" {
        return nil, errors.New("claude: API key is required")
    }

    httpClient := &http.Client{
        Transport: &http.Transport{
            MaxIdleConns:        100,
            MaxIdleConnsPerHost: 10,
            IdleConnTimeout:     90 * time.Second,
        },
        Timeout: cfg.requestTimeout,
    }

    sdk := anthropic.NewClient(
        option.WithAPIKey(cfg.apiKey),
        option.WithHTTPClient(httpClient),
    )

    return &client{
        sdk:     sdk,
        cfg:     cfg,
        limiter: rate.NewLimiter(rate.Limit(cfg.rateLimit), cfg.rateLimit),
    }, nil
}

4.2 单轮对话

func (c *client) Chat(ctx context.Context, prompt string, opts ...MessageOption) (*TextResponse, error) {
    cfg := c.applyOptions(opts)

    // 客户端侧主动限速,在触发 429 之前就等待
    if err := c.limiter.Wait(ctx); err != nil {
        return nil, fmt.Errorf("claude: rate limiter: %w", err)
    }

    params := anthropic.MessageNewParams{
        Model:     anthropic.F(cfg.model),
        MaxTokens: anthropic.F(cfg.maxTokens),
        Messages: anthropic.F([]anthropic.MessageParam{
            anthropic.NewUserMessage(anthropic.NewTextBlock(prompt)),
        }),
    }
    if cfg.system != "" {
        params.System = anthropic.F([]anthropic.TextBlockParam{
            {Text: anthropic.F(cfg.system)},
        })
    }

    return c.doWithRetry(ctx, params)
}

4.3 流式响应处理

func (c *client) Stream(ctx context.Context, prompt string, dst io.Writer, opts ...MessageOption) error {
    cfg := c.applyOptions(opts)

    stream := c.sdk.Messages.NewStreaming(ctx, anthropic.MessageNewParams{
        Model:     anthropic.F(cfg.model),
        MaxTokens: anthropic.F(cfg.maxTokens),
        Messages: anthropic.F([]anthropic.MessageParam{
            anthropic.NewUserMessage(anthropic.NewTextBlock(prompt)),
        }),
    })
    defer stream.Close()

    for stream.Next() {
        event := stream.Current()
        // 只处理文本 delta 事件
        if delta, ok := event.Delta.(anthropic.ContentBlockDeltaEventDelta); ok {
            if textDelta, ok := delta.AsUnion().(anthropic.TextDelta); ok {
                if _, err := io.WriteString(dst, textDelta.Text); err != nil {
                    return fmt.Errorf("claude: write stream: %w", err)
                }
            }
        }
    }

    if err := stream.Err(); err != nil {
        return classifyError(err) // 见第 5 节
    }
    return nil
}

这里用 dst io.Writer 接收输出,好处是同一套代码可以无缝适配 HTTP SSE(传入 http.ResponseWriter)和 CLI 实时打印(传入 os.Stdout),调用方完全不用关心底层差异。ctx 取消信号由 stream.Next() 内部感知,配合 context.WithCancel 可以随时从外部优雅中止流。


5. 错误处理分层设计

5.1 自定义错误类型

type ErrorKind int

const (
    ErrUnknown        ErrorKind = iota
    ErrAuth                     // 401
    ErrRateLimit                // 429
    ErrContextLength            // 400 context too long
    ErrNetwork                  // 超时、连接拒绝
)

type APIError struct {
    Kind    ErrorKind
    Status  int
    Message string
    Wrapped error
}

func (e *APIError) Error() string {
    return fmt.Sprintf("claude[%d]: %s", e.Status, e.Message)
}
func (e *APIError) Unwrap() error { return e.Wrapped }

5.2 错误分类函数

func classifyError(err error) error {
    var apiErr *anthropic.APIStatusError
    if !errors.As(err, &apiErr) {
        return &APIError{Kind: ErrNetwork, Message: err.Error(), Wrapped: err}
    }
    switch apiErr.StatusCode {
    case 401:
        return &APIError{Kind: ErrAuth, Status: 401, Message: "authentication failed", Wrapped: err}
    case 429:
        return &APIError{Kind: ErrRateLimit, Status: 429, Message: "rate limit exceeded", Wrapped: err}
    case 400:
        if strings.Contains(apiErr.Message, "context") {
            return &APIError{Kind: ErrContextLength, Status: 400, Message: apiErr.Message, Wrapped: err}
        }
    }
    return &APIError{Kind: ErrUnknown, Status: apiErr.StatusCode, Message: apiErr.Message, Wrapped: err}
}

5.3 指数退避重试

func (c *client) doWithRetry(ctx context.Context, params anthropic.MessageNewParams) (*TextResponse, error) {
    var lastErr error
    for attempt := 0; attempt < c.cfg.maxRetries; attempt++ {
        if attempt > 0 {
            // 指数退避 + 随机 jitter,避免惊群效应
            base := time.Duration(1<<attempt) * 500 * time.Millisecond
            jitter := time.Duration(rand.Int63n(int64(base / 2)))
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            case <-time.After(base + jitter):
            }
        }

        msg, err := c.sdk.Messages.New(ctx, params)
        if err != nil {
            lastErr = classifyError(err)
            var apiErr *APIError
            // 鉴权失败没有重试意义,直接返回
            if errors.As(lastErr, &apiErr) && apiErr.Kind == ErrAuth {
                return nil, lastErr
            }
            continue
        }

        return buildTextResponse(msg), nil
    }
    return nil, fmt.Errorf("claude: exceeded max retries: %w", lastErr)
}

6. 测试策略:Mock 与集成测试

6.1 接口注入让单元测试成为可能

因为 Client 是接口,业务层可以直接注入 mock,不需要打真实 API:

type MockClient struct {
    ChatFn func(ctx context.Context, prompt string, opts ...MessageOption) (*TextResponse, error)
}

func (m *MockClient) Chat(ctx context.Context, prompt string, opts ...MessageOption) (*TextResponse, error) {
    return m.ChatFn(ctx, prompt, opts...)
}
// 实现其他接口方法...

6.2 用 httptest 验证请求格式

func TestClientSendsCorrectModel(t *testing.T) {
    var capturedBody []byte
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        capturedBody, _ = io.ReadAll(r.Body)
        // 返回最小合法响应
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(minimalMsgResponse("hello"))
    }))
    defer srv.Close()

    client, _ := NewClient(
        WithAPIKey("test-key"),
        WithBaseURL(srv.URL), // 指向本地测试服务器
    )
    _, err := client.Chat(context.Background(), "hi")
    require.NoError(t, err)

    var body map[string]any
    require.NoError(t, json.Unmarshal(capturedBody, &body))
    assert.Equal(t, "claude-3-5-sonnet-latest", body["model"])
}

6.3 表驱动测试覆盖四种典型场景

var chatTests = []struct {
    name       string
    serverCode int
    serverBody string
    wantErrKind ErrorKind
    wantText    string
}{
    {"正常响应", 200, validResponse("你好"), 0, "你好"},
    {"限流 429", 429, `{"error":{"type":"rate_limit_error"}}`, ErrRateLimit, ""},
    {"鉴权失败 401", 401, `{"error":{"type":"authentication_error"}}`, ErrAuth, ""},
    {"畸形 JSON", 200, `{invalid`, ErrUnknown, ""},
}

7. 并发与性能调优

7.1 http.Client 单例复用

Go 的 http.Client 是并发安全的,连接池由 Transport 统一管理。封装层在 NewClient 时创建一次,之后所有 goroutine 共享同一实例——这是大多数教程不会特别强调、但生产环境里必须保证的配置。

上文 NewClient 里已经配置了 MaxIdleConns: 100,QPS 较高的场景可以根据实际并发量调整 MaxIdleConnsPerHost

7.2 主动限速比被动重试更划算

// 每秒允许 10 次请求,峰值令牌桶容量也是 10
limiter: rate.NewLimiter(rate.Limit(cfg.rateLimit), cfg.rateLimit),

limiter.Wait(ctx) 在令牌不足时会阻塞,直到可用或 ctx 超时。相比被服务端 429 之后再退避重试,主动限速的 P99 延迟通常低一个数量级——毕竟一次主动等待,远比一次完整的请求-失败-等待-重试周期便宜。

7.3 基准测试参考

# 用 httptest 模拟,不打真实 API
go test -bench=BenchmarkChat -benchmem -count=3 ./...

BenchmarkChat-8   2847   421334 ns/op   4821 B/op   62 allocs/op

主要的内存分配来自 JSON 序列化/反序列化,这是网络层的固有成本。封装层本身引入的额外分配可以控制在个位数 alloc 以内。


8. 封装前后对比与使用决策

8.1 调用代码量对比

场景 原始 SDK 行数 封装后行数
单轮对话(含错误处理) ~20 行 ~5 行
流式响应 ~35 行 ~8 行
多轮对话(含历史管理) ~40 行 ~10 行

8.2 三个维度的横向比较

维度 直接使用官方 SDK 本文封装层
类型安全 需手动断言,运行时才暴露问题 泛型 Response[T],编译期就能发现
可测试性 依赖具体 struct,难以 mock Client 接口,mock 注入开箱即用
可扩展性 重试、限速逻辑散落各处 封装层统一策略,业务层完全无感知

8.3 什么时候用,什么时候不用

直接用官方 SDK 更合适的情况:

  • 一次性脚本或内部小工具,生命周期短;
  • 团队 Go 经验有限,多一层抽象反而增加认知负担;
  • 项目只需要调用一个固定端点,没有重试、限速、多租户这类需求。

引入封装层更合适的情况:

  • 需要对 API 调用做集中的可观测性(metrics、tracing);
  • 多个业务模块共用同一个 Claude 客户端;
  • 需要在 CI/CD 中对 AI 调用逻辑做单元测试;
  • 未来有可能切换模型供应商,或者接入兼容 API 平台(比如 ClaudeAPI 这类第三方兼容接入服务,只需在封装层统一改一下 BaseURL,业务代码完全不用动)。

说到底,封装层不是负担,而是一道边界——把「跟 SDK 打交道的脏活」集中在一处,让业务代码只关心真正该关心的事。


附录

参考资料

关于国内接入

如果在国内访问 Anthropic API 存在网络问题,可以考虑 ClaudeAPI 等第三方兼容接入平台。这类平台通常兼容 Anthropic API 协议,改一下封装层的 BaseURL 就能接入,一般也支持企业充值、开票等国内结算方式。具体能力和限制建议以各平台官网的最新说明为准,选型前最好实际测一测。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐