双卡 A100 + Ollama 的 Python 生产调用轮询分发、失败重试、健康检查与吞吐压测
一、先把问题想明白:为什么调用层决定了双卡是否真的“跑起来”
很多人以为双实例已经拆好了:
11434→ GPU011435→ GPU1
那业务程序随便调一个端口,反正总会跑。
这其实不对。
因为你如果把所有请求都固定打到 11434,那从业务视角看,第二个实例等于不存在。双实例只是“有了两条车道”,但如果你的 Python 程序永远只走左边那条,右边那条再宽都没意义。
所以,双卡要吃起来,调用层至少要解决四件事:
- 请求要会分流
- 失败要会切换
- 实例要能探活
- 性能要能量化
而 Ollama 官方 API 正好把这四件事需要的关键接口都给了出来:
/api/generate:生成文本/api/chat:聊天式交互/api/ps:查看当前已加载到内存中的模型/api/version:轻量探活- OpenAI 兼容
/v1/:方便直接复用现有 SDK 代码
这意味着我们并不需要为了本地推理服务重新造一套复杂协议,直接围绕这些接口做 Python 侧封装就够了。(Ollama 文档)
二、Python 调 Ollama,其实有两条路线
路线一:直接调用原生 HTTP API
这是最直接、也最可控的方式。
例如,官方给出的最基础调用就是:
curl http://localhost:11434/api/generate -d '{
"model": "gemma3",
"prompt": "Why is the sky blue?"
}'
也就是说,Python 里你完全可以直接用 requests 或 httpx 去调用。这样做的好处是:
- 端口切换很方便
- 自己控制重试逻辑
- 自己控制健康检查
- 自己收集接口返回指标
对于双实例本地推理服务,这通常是最适合的路线。(Ollama 文档)
路线二:走 OpenAI 兼容接口
如果你现有业务代码已经大量使用 OpenAI SDK,那 Ollama 的 OpenAI 兼容能力会非常方便。官方文档给出的 Python 示例里,base_url 指向 http://localhost:11434/v1/,api_key='ollama' 虽然必填,但本地场景下会被忽略。(Ollama 文档)
示例长这样:
from openai import OpenAI
client = OpenAI(
base_url='http://localhost:11434/v1/',
api_key='ollama',
)
这条路线最大的优点是:你几乎不用改现有 OpenAI 调用代码,只需要把 base_url 指到本地 Ollama。 (Ollama 文档)
但如果你的重点是“双实例轮询和故障切换”,我更建议优先采用第一条,也就是:直接调原生 HTTP API。因为控制权更完整,分流逻辑更自然。
三、先写一个最小可用客户端
先不讲轮询,不讲压测,不讲熔断,先把最基础的调用类写出来。
下面这版只做三件事:
- 调
/api/generate - 调
/api/chat - 调
/api/version和/api/ps
import requests
from typing import Any, Dict, List, Optional
class OllamaClient:
def __init__(self, base_url: str, timeout: int = 300):
self.base_url = base_url.rstrip("/")
self.timeout = timeout
def version(self) -> Dict[str, Any]:
resp = requests.get(f"{self.base_url}/api/version", timeout=10)
resp.raise_for_status()
return resp.json()
def ps(self) -> Dict[str, Any]:
resp = requests.get(f"{self.base_url}/api/ps", timeout=10)
resp.raise_for_status()
return resp.json()
def generate(
self,
model: str,
prompt: str = "",
stream: bool = False,
keep_alive: Optional[Any] = None,
options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
payload = {
"model": model,
"prompt": prompt,
"stream": stream,
}
if keep_alive is not None:
payload["keep_alive"] = keep_alive
if options:
payload["options"] = options
resp = requests.post(
f"{self.base_url}/api/generate",
json=payload,
timeout=self.timeout,
)
resp.raise_for_status()
return resp.json()
def chat(
self,
model: str,
messages: List[Dict[str, Any]],
stream: bool = False,
keep_alive: Optional[Any] = None,
options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
payload = {
"model": model,
"messages": messages,
"stream": stream,
}
if keep_alive is not None:
payload["keep_alive"] = keep_alive
if options:
payload["options"] = options
resp = requests.post(
f"{self.base_url}/api/chat",
json=payload,
timeout=self.timeout,
)
resp.raise_for_status()
return resp.json()
这段代码背后依赖的接口能力,全部来自 Ollama 官方 API:
- 默认 API 基地址是
http://localhost:11434/api /api/generate可用于文本生成/api/chat可用于聊天/api/ps可列出当前内存中已加载模型- 本地访问默认不需要认证
这些都是我们后续做轮询、探活、预热和观测的基石。(Ollama 文档)
四、双实例最核心的一步:不要写死单端口
真正让双卡跑起来的,不是你写了两个 service,而是你在 Python 里承认“后端不是一个实例,而是一个实例池”。
最简单的实例池可以这样设计:
import itertools
import threading
from typing import List
class OllamaPool:
def __init__(self, endpoints: List[str], timeout: int = 300):
self.clients = [OllamaClient(ep, timeout=timeout) for ep in endpoints]
self._cycle = itertools.cycle(range(len(self.clients)))
self._lock = threading.Lock()
def next_client(self) -> OllamaClient:
with self._lock:
idx = next(self._cycle)
return self.clients[idx]
你可以把它理解成一个最朴素的轮询器:
- 第 1 个请求给
11434 - 第 2 个请求给
11435 - 第 3 个请求再回到
11434 - 不断往复
如果你后端的两个实例都已经预热好,而且运行模型一致,这种最简单的轮询方式,往往就已经能把双卡吞吐明显拉起来。
为什么这个思路靠谱?
因为 Ollama 的 API 本质就是本地 HTTP 服务,而不是某种只能单点调用的特殊协议。只要你的 Python 客户端把两个地址都纳入调度范围,双实例就会自然变成“一个可轮询的后端池”。(Ollama 文档)
五、生产里不能只有轮询,还得有失败重试
轮询解决的是“均衡”,失败重试解决的是“韧性”。
现实里,一个实例可能会因为很多原因短暂不可用:
- 正在重启
- 正在重新加载模型
- 一次性请求太多
- 队列满了
- 进程临时异常
而 Ollama 官方 FAQ 明确提到:当请求太多时,服务可能会返回 503 overloaded;同时,OLLAMA_MAX_QUEUE 控制忙碌时最多能排多少请求。(Ollama 文档)
也就是说,503 不是“程序写坏了”,很多时候只是服务告诉你:我现在太忙了。 (Ollama 文档)
所以更合理的客户端不是“某个端口失败就直接报错”,而是:
- 先试一个实例
- 如果失败,再切到另一个实例
- 如果都失败,再向上抛错
示例代码如下:
import time
import requests
class ResilientOllamaPool(OllamaPool):
def generate(self, model: str, prompt: str, retries: int = 2):
last_error = None
for _ in range(retries):
client = self.next_client()
try:
return client.generate(
model=model,
prompt=prompt,
stream=False,
keep_alive=-1,
)
except requests.RequestException as e:
last_error = e
time.sleep(0.2)
raise last_error
def chat(self, model: str, messages, retries: int = 2):
last_error = None
for _ in range(retries):
client = self.next_client()
try:
return client.chat(
model=model,
messages=messages,
stream=False,
keep_alive=-1,
)
except requests.RequestException as e:
last_error = e
time.sleep(0.2)
raise last_error
这里有个很重要的参数:keep_alive=-1。
官方 FAQ 明确说明:
- 默认模型会在内存中保留 5 分钟
- API 支持
keep_alive - 任意负数都表示“常驻内存”
0则表示响应后立即卸载
这意味着,如果你希望两个实例都长期保持热模型状态,那么在关键请求里主动传 keep_alive=-1 是非常有价值的。(Ollama 文档)
六、只会重试还不够,还得学会“先探活再分流”
有些团队喜欢把所有失败都交给“请求失败后重试”去解决,这种方式能用,但不够优雅。
更好的做法是:把健康检查前置。
为什么?
因为你明知道某个实例不健康,就没必要继续把请求打过去,再浪费一次超时和重试成本。
Ollama 官方 API 里,/api/version 是个非常合适的轻量探活接口,而 /api/ps 则适合用来确认模型是否真的已经装载到内存中。(Ollama 文档)
所以一个更像生产代码的健康检查器可以这样写:
import time
class HealthAwareOllamaPool(OllamaPool):
def __init__(self, endpoints, timeout=300, unhealthy_ttl=10):
super().__init__(endpoints, timeout=timeout)
self.unhealthy = {}
self.unhealthy_ttl = unhealthy_ttl
def _is_available(self, idx: int) -> bool:
ts = self.unhealthy.get(idx)
if ts is None:
return True
return (time.time() - ts) > self.unhealthy_ttl
def _mark_unhealthy(self, idx: int):
self.unhealthy[idx] = time.time()
def healthy_clients(self):
result = []
for idx, client in enumerate(self.clients):
if self._is_available(idx):
result.append((idx, client))
return result
def probe(self) -> None:
for idx, client in enumerate(self.clients):
try:
client.version()
self.unhealthy.pop(idx, None)
except Exception:
self._mark_unhealthy(idx)
def generate(self, model: str, prompt: str):
self.probe()
candidates = self.healthy_clients()
if not candidates:
raise RuntimeError("没有可用的 Ollama 实例")
last_error = None
for idx, client in candidates:
try:
return client.generate(
model=model,
prompt=prompt,
stream=False,
keep_alive=-1,
)
except Exception as e:
self._mark_unhealthy(idx)
last_error = e
raise last_error
这版代码的思想比前一版更完整:
- 先探活
- 再分流
- 失败立即标记为不健康
- 暂时摘除一段时间
- 之后再尝试恢复
这和你前面文章里的 Nginx / HAProxy 健康检查思路是同一套哲学,只不过这次是在 Python 调用侧实现的。
七、预热感知:别让第一批用户帮你做冷启动
这是 Ollama 生产调用里非常容易被忽略、但又非常影响体验的一点。
官方 FAQ 明确说了,你可以给 /api/generate 或 /api/chat 发送一个空请求来预加载模型。例如:
curl http://localhost:11434/api/generate -d '{"model": "mistral"}'
如果再配合 keep_alive=-1,模型就可以一直留在内存中。(Ollama 文档)
所以,Python 侧不要等第一批用户请求到来才临时触发装载,而应该在服务启动后主动预热:
def prewarm(pool: OllamaPool, model: str):
for client in pool.clients:
client.generate(
model=model,
prompt="",
stream=False,
keep_alive=-1,
)
然后再用 /api/ps 检查两个实例是否都已经把模型装入内存:
def show_running_models(pool: OllamaPool):
for client in pool.clients:
print(client.base_url, client.ps())
官方 /api/ps 接口会返回当前正在运行的模型,并带上如 size_vram、context_length、expires_at 等信息,这对于判断“预热是否真的生效”非常有帮助。(Ollama 文档)
一句话概括就是:
预热不是锦上添花,而是让首包性能稳定的关键。
八、别只看“有没有返回”,要学会读 Ollama 的性能指标
很多人压测本地模型服务时,只会看两件事:
- 是否成功
- 总耗时多少秒
但 Ollama 官方 API 实际上会返回更细的性能指标,包括:
total_durationload_durationprompt_eval_countprompt_eval_durationeval_counteval_duration
而且这些时间字段的单位是纳秒。(Ollama 文档)
这意味着你可以非常清楚地拆解一次调用到底慢在什么地方。
1)load_duration
这表示模型加载耗时。
如果这个值很高,通常意味着:
- 模型刚被装载
- 模型没预热
- 模型被卸掉后又重新加载
- 内存状态不稳定
2)prompt_eval_duration
这表示处理输入提示词的耗时。
如果这个值很高,说明你可能:
- 输入上下文太长
- Prompt 太复杂
- 上下文长度配置太激进
3)eval_duration
这表示生成输出 token 的耗时。
如果这个值很高,通常更偏向模型生成阶段慢,而不一定是装载问题。
所以,在生产压测里,最有价值的不是“总耗时”,而是把这几项都记录下来。这样你才能真正判断:
- 是不是预热没做好
- 是不是 prompt 太长
- 是不是生成长度太长
- 是不是实例已经过载
这些指标字段都来自官方 API Usage 文档。(Ollama 文档)
九、写一个真正能用的小型压测脚本
下面我给一份偏实战的 Python 压测脚本。它不追求像专业压测工具那样花哨,但非常适合本地验证:
- 双实例是否都吃到流量
- 平均耗时是多少
- 模型装载是否稳定
- 失败率高不高
import time
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
def benchmark(pool: ResilientOllamaPool, model: str, prompt: str, total_requests: int = 20, workers: int = 4):
results = []
failures = 0
def one_call():
start = time.time()
resp = pool.generate(model=model, prompt=prompt)
end = time.time()
return {
"wall_time": end - start,
"total_duration": resp.get("total_duration", 0) / 1e9,
"load_duration": resp.get("load_duration", 0) / 1e9,
"prompt_eval_duration": resp.get("prompt_eval_duration", 0) / 1e9,
"eval_duration": resp.get("eval_duration", 0) / 1e9,
"eval_count": resp.get("eval_count", 0),
}
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(one_call) for _ in range(total_requests)]
for future in as_completed(futures):
try:
results.append(future.result())
except Exception:
failures += 1
if results:
print("成功数:", len(results))
print("失败数:", failures)
print("平均 wall_time:", round(statistics.mean(x["wall_time"] for x in results), 3), "秒")
print("平均 total_duration:", round(statistics.mean(x["total_duration"] for x in results), 3), "秒")
print("平均 load_duration:", round(statistics.mean(x["load_duration"] for x in results), 3), "秒")
print("平均 prompt_eval_duration:", round(statistics.mean(x["prompt_eval_duration"] for x in results), 3), "秒")
print("平均 eval_duration:", round(statistics.mean(x["eval_duration"] for x in results), 3), "秒")
这类压测最适合回答下面这些问题:
- 两个实例是否都已预热
- 双实例轮询后吞吐是否比单实例更高
- 并发上来后
load_duration是否开始变差 - 是否出现明显的失败或超载
而且因为它直接读取 Ollama 自己返回的指标,所以你不仅知道“快不快”,还知道“慢在哪里”。(Ollama 文档)
十、如何判断压测结果是不是“健康”
这是很多人压测后最迷茫的地方。
跑完一轮,拿到一堆数字,然后不知道该怎么看。
我给你一个非常实用的判断框架。
情况一:load_duration 高,但后续下降明显
这通常是好事。
说明模型在预热或首次加载,后面开始进入稳定状态。这个时候你要做的不是恐慌,而是确认:
- 是否所有实例都已经预热
- 是否
keep_alive配置正确 - 是否
/api/ps里能看到模型常驻
官方 FAQ 对预热和 keep_alive 的行为解释得很清楚。(Ollama 文档)
情况二:prompt_eval_duration 高得离谱
这往往说明问题不在服务本身,而在输入。
例如:
- RAG 拼的上下文太长
- 一次性把大量对话历史塞进去
OLLAMA_CONTEXT_LENGTH和并发一起拉得太猛
因为官方 FAQ 已经明确指出,并发会按 OLLAMA_NUM_PARALLEL * OLLAMA_CONTEXT_LENGTH 放大内存需求。你如果一边高并发,一边长上下文,服务就很容易进入低效率区间。(Ollama 文档)
情况三:返回大量 503
这说明服务已经过载了。
官方 FAQ 直接写了:当请求过多时,Ollama 会返回 503 overloaded,而 OLLAMA_MAX_QUEUE 控制最大排队数。遇到这种情况,应该优先做三件事:
- 降低单实例并发冲击
- 增加分流能力
- 调整调用侧节流或重试策略
而不是只会盲目把队列继续拉大。(Ollama 文档)
十一、如果你已有 OpenAI SDK 代码,怎么接进双实例
有些团队已经有成熟的 OpenAI SDK 调用代码,不太想全部重写成原生 HTTP API。
那也可以,只不过你要把“实例池”逻辑做在 OpenAI Client 的外层。
官方 OpenAI 兼容文档给的关键点是:
base_url='http://localhost:11434/v1/'api_key='ollama'api_key本地必填但会被忽略
也就是说,双实例下你完全可以准备两个客户端:
from openai import OpenAI
client0 = OpenAI(
base_url="http://127.0.0.1:11434/v1/",
api_key="ollama",
)
client1 = OpenAI(
base_url="http://127.0.0.1:11435/v1/",
api_key="ollama",
)
然后对它们做轮询、健康检查和失败切换。底层思想和前面那套原生 HTTP API 没区别,只是把目标对象从 requests.post() 换成了 client.chat.completions.create()。(Ollama 文档)
如果你的现有系统已经深度绑定 OpenAI SDK,这条路线非常顺手。
但如果你问我哪条路线更适合“本地双实例高性能服务”,我还是会说:
原生 HTTP API 更适合做底层调度,OpenAI 兼容更适合做业务层接入。
十二、一套更像生产代码的完整示例
下面我把前面的核心逻辑合成一份更完整的示例,方便你直接落地。
import itertools
import threading
import time
from typing import Any, Dict, List, Optional
import requests
class OllamaClient:
def __init__(self, base_url: str, timeout: int = 300):
self.base_url = base_url.rstrip("/")
self.timeout = timeout
def version(self) -> Dict[str, Any]:
r = requests.get(f"{self.base_url}/api/version", timeout=10)
r.raise_for_status()
return r.json()
def ps(self) -> Dict[str, Any]:
r = requests.get(f"{self.base_url}/api/ps", timeout=10)
r.raise_for_status()
return r.json()
def generate(
self,
model: str,
prompt: str = "",
stream: bool = False,
keep_alive: Optional[Any] = -1,
) -> Dict[str, Any]:
payload = {
"model": model,
"prompt": prompt,
"stream": stream,
"keep_alive": keep_alive,
}
r = requests.post(
f"{self.base_url}/api/generate",
json=payload,
timeout=self.timeout,
)
r.raise_for_status()
return r.json()
class ProductionOllamaPool:
def __init__(self, endpoints: List[str], timeout: int = 300, unhealthy_ttl: int = 10):
self.clients = [OllamaClient(ep, timeout=timeout) for ep in endpoints]
self._cycle = itertools.cycle(range(len(self.clients)))
self._lock = threading.Lock()
self._unhealthy_since: Dict[int, float] = {}
self.unhealthy_ttl = unhealthy_ttl
def _next_index(self) -> int:
with self._lock:
return next(self._cycle)
def _available(self, idx: int) -> bool:
ts = self._unhealthy_since.get(idx)
if ts is None:
return True
return time.time() - ts > self.unhealthy_ttl
def _mark_unhealthy(self, idx: int):
self._unhealthy_since[idx] = time.time()
def probe_all(self):
for idx, client in enumerate(self.clients):
try:
client.version()
self._unhealthy_since.pop(idx, None)
except Exception:
self._mark_unhealthy(idx)
def prewarm(self, model: str):
for idx, client in enumerate(self.clients):
try:
client.generate(model=model, prompt="", stream=False, keep_alive=-1)
except Exception:
self._mark_unhealthy(idx)
def generate(self, model: str, prompt: str, max_attempts: int = 2) -> Dict[str, Any]:
self.probe_all()
last_error = None
tried = set()
for _ in range(len(self.clients) * max_attempts):
idx = self._next_index()
if idx in tried and len(tried) >= len(self.clients):
break
if not self._available(idx):
continue
tried.add(idx)
client = self.clients[idx]
try:
return client.generate(
model=model,
prompt=prompt,
stream=False,
keep_alive=-1,
)
except Exception as e:
self._mark_unhealthy(idx)
last_error = e
raise last_error or RuntimeError("没有可用实例")
这份代码已经具备一个生产调用器最核心的四项能力:
- 实例轮询
- 探活
- 失败摘除
- 预热
它未必是终极版本,但对于双卡 A100 上的双实例 Ollama,本质已经够用了。
十三、这套方案最容易踩的坑
1)看似双实例,实际上业务只调了一个端口
这是最常见、也最隐蔽的问题。
服务层已经拆成两份了,但 Python 配置里只写了:
base_url = "http://127.0.0.1:11434"
那第二张卡就只是摆设。
2)有轮询,没有探活
这种情况表面上看是“双实例调度”,实际上只要有一个实例挂掉,请求就会周期性撞墙,表现出来就是:
- 一会儿成功
- 一会儿失败
- 整体时延抖动很明显
3)做了重试,却没做预热
这样程序看似健壮,实际上第一批流量仍然会很慢。
官方已经明确支持空请求预热模型,所以生产里不做预热,其实是在放弃一个非常低成本的性能收益点。(Ollama 文档)
4)只看 wall time,不看 API 指标
这样你永远只能知道“慢了”,却不知道“为什么慢”。
而 Ollama 官方已经把 load_duration、prompt_eval_duration、eval_duration 这些拆解指标直接给你了。既然接口已经返回,就应该接住并利用起来。(Ollama 文档)
十四、结尾
到这里,双卡 A100 + 双实例 Ollama 的调用层,才算真正像一套“能上线的东西”了。
前面的文章解决的是:
- 服务怎么装
- 进程怎么起
- 双实例怎么拆
- 目录权限怎么修
- 预热怎么做
而这一篇解决的是:
- Python 怎么调
- 双实例怎么分流
- 失败怎么切换
- 健康怎么判断
- 压测怎么做
- 性能怎么读
真正的生产能力,很多时候不是体现在“你会不会启动服务”,而是体现在“服务出问题时,你的调用层会不会替你兜住”。
而在 Ollama 这里,官方其实已经把这套兜底能力需要的接口都准备好了:
- 本地无认证访问
- 原生 API
- OpenAI 兼容
- 运行模型查询
- 可量化的调用指标
- 预热与常驻机制
更多推荐



所有评论(0)