Deepseek本地部署全攻略:Ollama到知识库的完整实践
·
Deepseek本地部署全攻略:Ollama到知识库的完整实践
简介
本文详解Deepseek本地化部署全流程,从Ollama框架搭建到个人知识库应用开发,提供分步技术指导与代码示例,助力开发者构建私有化AI知识管理系统。
一、完整环境部署指南
1.1 环境准备与系统要求
硬件要求
- 推荐配置:NVIDIA RTX 3060以上显卡(8GB+显存)
- 最低配置:16GB RAM + 支持AVX2的CPU
- 存储空间:至少50GB可用空间
系统要求
- Ubuntu 20.04/22.04 LTS(推荐)
- Windows 11 + WSL2
- macOS 13+(仅CPU模式)
1.2 基础环境安装
Ubuntu系统
# 1. 系统更新
sudo apt update && sudo apt upgrade -y
# 2. 安装Python和必要工具
sudo apt install -y python3.10 python3.10-venv python3.10-dev python3-pip
sudo apt install -y build-essential curl wget git vim
# 3. 安装CUDA工具包(NVIDIA显卡)
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb
sudo dpkg -i cuda-keyring_1.1-1_all.deb
sudo apt update
sudo apt install -y cuda-toolkit-12-4
# 4. 验证CUDA安装
nvcc --version
nvidia-smi
Windows系统(WSL2)
# 1. 启用WSL2
wsl --install -d Ubuntu-22.04
# 2. 在WSL中安装CUDA
wget https://developer.download.nvidia.com/compute/cuda/repos/wsl-ubuntu/x86_64/cuda-keyring_1.1-1_all.deb
sudo dpkg -i cuda-keyring_1.1-1_all.deb
sudo apt update
sudo apt install -y cuda-toolkit-12-4
1.3 Docker环境配置
# 安装Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker $USER
# 安装NVIDIA Container Toolkit
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list
sudo apt update
sudo apt install -y nvidia-container-toolkit
sudo systemctl restart docker
# 验证Docker GPU支持
docker run --rm --gpus all nvidia/cuda:12.4.0-base nvidia-smi
二、Ollama框架安装与配置
2.1 Ollama安装
# Linux安装
curl -fsSL https://ollama.ai/install.sh | sh
# 或使用Docker安装
docker run -d -v ollama:/root/.ollama -p 11434:11434 --name ollama --gpus all ollama/ollama
# 验证安装
ollama --version
systemctl status ollama
# 设置环境变量
echo 'export OLLAMA_HOST="0.0.0.0:11434"' >> ~/.bashrc
echo 'export OLLAMA_KEEP_ALIVE="24h"' >> ~/.bashrc
source ~/.bashrc
2.2 Deepseek模型下载与配置
创建模型配置文件
mkdir -p ~/.ollama/models/deepseek
cat > ~/.ollama/models/deepseek/Modelfile << 'EOF'
FROM deepseek-coder:6.7b
PARAMETER temperature 0.7
PARAMETER top_p 0.9
PARAMETER num_predict 4096
PARAMETER num_ctx 8192
PARAMETER stop "</s>"
PARAMETER seed 42
TEMPLATE """
{{ if .System }}<|system|>
{{ .System }}</s>{{ end }}
<|user|>
{{ .Prompt }}</s>
<|assistant|>
"""
SYSTEM """
你是一个AI助手,请用中文回答用户的问题。
请确保回答专业、准确、有用。
"""
EOF
下载和运行模型
# 下载模型(多个版本可选)
ollama pull deepseek-coder:6.7b
ollama pull deepseek-coder:6.7b-instruct-q4_K_M
ollama pull deepseek-coder:6.7b-instruct-fp16
# 创建自定义模型
ollama create deepseek-custom -f ~/.ollama/models/deepseek/Modelfile
# 运行模型
ollama run deepseek-coder:6.7b
# 后台运行API服务
ollama serve &
2.3 模型验证脚本
创建测试脚本 test_model.py:
#!/usr/bin/env python3
import requests
import json
import time
class OllamaClient:
def __init__(self, base_url="http://localhost:11434"):
self.base_url = base_url
self.headers = {"Content-Type": "application/json"}
def generate(self, prompt, model="deepseek-coder:6.7b", stream=False):
"""生成文本"""
url = f"{self.base_url}/api/generate"
data = {
"model": model,
"prompt": prompt,
"stream": stream,
"options": {
"temperature": 0.7,
"top_p": 0.9,
"num_predict": 1024
}
}
response = requests.post(url, json=data, headers=self.headers, stream=stream)
if stream:
for line in response.iter_lines():
if line:
chunk = json.loads(line.decode('utf-8'))
if "response" in chunk:
yield chunk["response"]
else:
return response.json().get("response", "")
def list_models(self):
"""列出可用模型"""
response = requests.get(f"{self.base_url}/api/tags")
return response.json()
def model_info(self, model_name):
"""获取模型信息"""
data = {"name": model_name}
response = requests.post(f"{self.base_url}/api/show", json=data)
return response.json()
def health_check(self):
"""健康检查"""
try:
response = requests.get(f"{self.base_url}/")
return response.status_code == 200
except:
return False
def test_ollama_connection():
"""测试连接"""
client = OllamaClient()
print("=== Ollama连接测试 ===")
print(f"服务健康: {client.health_check()}")
print("\n=== 可用模型 ===")
models = client.list_models()
for model in models.get("models", []):
print(f"- {model['name']} (大小: {model.get('size', '未知')})")
print("\n=== 模型测试 ===")
test_prompt = "用Python写一个快速排序算法,并添加详细注释"
print(f"问题: {test_prompt}")
start_time = time.time()
response = client.generate(test_prompt, stream=False)
elapsed_time = time.time() - start_time
print(f"\n回答 (耗时: {elapsed_time:.2f}秒):")
print("-" * 50)
print(response)
print("-" * 50)
# 流式输出测试
print("\n=== 流式输出测试 ===")
stream_response = ""
for chunk in client.generate("解释什么是RAG", stream=True):
stream_response += chunk
print(chunk, end="", flush=True)
return True
if __name__ == "__main__":
try:
test_ollama_connection()
except Exception as e:
print(f"测试失败: {e}")
print("请确保Ollama服务正在运行: ollama serve")
三、知识库应用完整开发代码
3.1 项目结构
deepseek-knowledge-base/
├── backend/
│ ├── app.py # FastAPI主应用
│ ├── requirements.txt # 后端依赖
│ ├── models/
│ │ ├── document.py # 文档模型
│ │ └── vector_db.py # 向量数据库
│ ├── services/
│ │ ├── embedding.py # 嵌入服务
│ │ └── rag.py # RAG服务
│ └── api/
│ ├── endpoints.py # API端点
│ └── schemas.py # Pydantic模型
├── frontend/
│ ├── app.py # Streamlit前端
│ └── requirements.txt # 前端依赖
├── knowledge_base/
│ └── documents/ # 存储上传的文档
├── docker-compose.yml # Docker编排
├── .env.example # 环境变量示例
└── README.md # 项目说明
3.2 后端FastAPI应用
后端依赖文件 backend/requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
python-multipart==0.0.6
pydantic==2.5.0
pydantic-settings==2.1.0
langchain==0.0.350
langchain-community==0.0.10
chromadb==0.4.22
sentence-transformers==2.2.2
pypdf==3.17.0
python-docx==1.1.0
openpyxl==3.1.2
nltk==3.8.1
pandas==2.1.3
numpy==1.24.3
redis==5.0.1
celery==5.3.4
prometheus-client==0.19.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
httpx==0.25.1
主应用文件 backend/app.py
#!/usr/bin/env python3
"""
Deepseek知识库后端服务
支持文档上传、向量化、检索和问答
"""
import os
import logging
from typing import List, Optional
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from backend.api.endpoints import router as api_router
from backend.services.vector_db import VectorDB
from backend.services.embedding import EmbeddingService
from backend.services.rag import RAGService
from backend.models.document import Document, DocumentMetadata
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 安全验证
security = HTTPBearer()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 启动时初始化
logger.info("初始化向量数据库...")
app.state.vector_db = VectorDB()
app.state.embedding_service = EmbeddingService()
app.state.rag_service = RAGService(
vector_db=app.state.vector_db,
embedding_service=app.state.embedding_service
)
yield
# 关闭时清理
logger.info("关闭向量数据库连接...")
if hasattr(app.state, 'vector_db'):
app.state.vector_db.close()
# 创建FastAPI应用
app = FastAPI(
title="Deepseek Knowledge Base API",
description="私有知识库检索与问答系统",
version="1.0.0",
lifespan=lifespan
)
# CORS配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应限制来源
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 挂载API路由
app.include_router(api_router, prefix="/api/v1")
# 数据模型
class UploadResponse(BaseModel):
"""上传响应"""
document_id: str
filename: str
status: str
chunks: int
message: str
class QueryRequest(BaseModel):
"""查询请求"""
question: str
collection_name: Optional[str] = "default"
top_k: Optional[int] = 5
score_threshold: Optional[float] = 0.7
class QueryResponse(BaseModel):
"""查询响应"""
answer: str
sources: List[dict]
processing_time: float
model: str
class HealthResponse(BaseModel):
"""健康检查响应"""
status: str
ollama_status: str
vector_db_status: str
model_count: int
uptime: float
# 依赖项
def get_vector_db():
return app.state.vector_db
def get_rag_service():
return app.state.rag_service
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证访问令牌"""
# 生产环境应使用更复杂的验证逻辑
token = credentials.credentials
if token != os.getenv("API_TOKEN", "dev-token-2024"):
raise HTTPException(status_code=403, detail="无效的访问令牌")
return token
# API端点
@app.get("/")
async def root():
"""根端点"""
return {
"service": "Deepseek Knowledge Base",
"version": "1.0.0",
"docs": "/docs",
"health": "/health"
}
@app.get("/health", response_model=HealthResponse)
async def health_check(
vector_db: VectorDB = Depends(get_vector_db)
):
"""健康检查"""
try:
# 检查Ollama服务
import requests
ollama_resp = requests.get("http://localhost:11434/api/tags")
ollama_status = "healthy" if ollama_resp.status_code == 200 else "unhealthy"
# 检查向量数据库
vector_db_status = "healthy" if vector_db.check_health() else "unhealthy"
return HealthResponse(
status="healthy",
ollama_status=ollama_status,
vector_db_status=vector_db_status,
model_count=len(ollama_resp.json().get("models", [])),
uptime=0.0 # 实际应用中应计算运行时间
)
except Exception as e:
logger.error(f"健康检查失败: {e}")
raise HTTPException(status_code=503, detail="服务异常")
@app.post("/upload", response_model=UploadResponse)
async def upload_document(
file: UploadFile = File(...),
collection_name: str = "default",
background_tasks: BackgroundTasks = None,
token: str = Depends(verify_token)
):
"""上传文档并处理"""
try:
# 检查文件类型
allowed_types = ['.pdf', '.txt', '.md', '.docx', '.pptx', '.xlsx']
file_ext = os.path.splitext(file.filename)[1].lower()
if file_ext not in allowed_types:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型。支持的类型: {', '.join(allowed_types)}"
)
# 保存文件
upload_dir = "knowledge_base/documents"
os.makedirs(upload_dir, exist_ok=True)
file_path = os.path.join(upload_dir, file.filename)
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# 异步处理文档
if background_tasks:
background_tasks.add_task(
process_document_task,
file_path,
collection_name,
app.state.vector_db,
app.state.embedding_service
)
return UploadResponse(
document_id=f"doc_{int(datetime.now().timestamp())}",
filename=file.filename,
status="processing",
chunks=0,
message="文档已上传,正在后台处理"
)
else:
# 同步处理(仅用于测试)
chunks = process_document_task(
file_path, collection_name,
app.state.vector_db, app.state.embedding_service
)
return UploadResponse(
document_id=f"doc_{int(datetime.now().timestamp())}",
filename=file.filename,
status="completed",
chunks=len(chunks),
message="文档处理完成"
)
except Exception as e:
logger.error(f"上传失败: {e}")
raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}")
@app.post("/query", response_model=QueryResponse)
async def query_knowledge_base(
request: QueryRequest,
rag_service: RAGService = Depends(get_rag_service)
):
"""查询知识库"""
try:
import time
start_time = time.time()
# 执行查询
result = rag_service.query(
question=request.question,
collection_name=request.collection_name,
top_k=request.top_k,
score_threshold=request.score_threshold
)
processing_time = time.time() - start_time
return QueryResponse(
answer=result["answer"],
sources=result["sources"],
processing_time=processing_time,
model="deepseek-coder:6.7b"
)
except Exception as e:
logger.error(f"查询失败: {e}")
raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}")
# 辅助函数
def process_document_task(
file_path: str,
collection_name: str,
vector_db: VectorDB,
embedding_service: EmbeddingService
):
"""处理文档的后台任务"""
from backend.services.document_processor import DocumentProcessor
try:
processor = DocumentProcessor(
vector_db=vector_db,
embedding_service=embedding_service
)
chunks = processor.process_document(
file_path=file_path,
collection_name=collection_name
)
logger.info(f"文档处理完成: {file_path}, 生成 {len(chunks)} 个chunks")
return chunks
except Exception as e:
logger.error(f"文档处理失败: {e}")
raise
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
3.3 向量数据库服务
backend/services/vector_db.py
"""
向量数据库服务
使用ChromaDB存储和检索向量
"""
import os
import logging
from typing import List, Dict, Any, Optional
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
logger = logging.getLogger(__name__)
class VectorDB:
"""向量数据库管理器"""
def __init__(self, persist_directory: str = "./chroma_db"):
"""
初始化向量数据库
Args:
persist_directory: 数据持久化目录
"""
self.persist_directory = persist_directory
os.makedirs(persist_directory, exist_ok=True)
# 创建Chroma客户端
self.client = chromadb.PersistentClient(
path=persist_directory,
settings=Settings(
anonymized_telemetry=False,
allow_reset=True
)
)
# 默认嵌入函数(使用sentence-transformers)
self.default_embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2"
)
logger.info(f"向量数据库初始化完成,存储路径: {persist_directory}")
def create_collection(
self,
collection_name: str,
embedding_function=None,
metadata: Optional[Dict] = None
):
"""
创建集合
Args:
collection_name: 集合名称
embedding_function: 嵌入函数
metadata: 集合元数据
"""
try:
embedding_fn = embedding_function or self.default_embedding_fn
collection = self.client.create_collection(
name=collection_name,
embedding_function=embedding_fn,
metadata=metadata or {}
)
logger.info(f"创建集合: {collection_name}")
return collection
except Exception as e:
logger.error(f"创建集合失败: {e}")
# 如果集合已存在,则获取现有集合
return self.get_collection(collection_name)
def get_collection(self, collection_name: str):
"""
获取集合
Args:
collection_name: 集合名称
"""
try:
return self.client.get_collection(
name=collection_name,
embedding_function=self.default_embedding_fn
)
except Exception as e:
logger.error(f"获取集合失败: {e}")
raise
def add_documents(
self,
collection_name: str,
documents: List[str],
metadatas: List[Dict[str, Any]],
ids: List[str]
):
"""
添加文档到集合
Args:
collection_name: 集合名称
documents: 文档内容列表
metadatas: 元数据列表
ids: 文档ID列表
"""
try:
collection = self.get_collection(collection_name)
# 分批添加以避免内存问题
batch_size = 100
for i in range(0, len(documents), batch_size):
batch_docs = documents[i:i+batch_size]
batch_metadatas = metadatas[i:i+batch_size]
batch_ids = ids[i:i+batch_size]
collection.add(
documents=batch_docs,
metadatas=batch_metadatas,
ids=batch_ids
)
logger.info(f"添加文档批次 {i//batch_size + 1}, 数量: {len(batch_docs)}")
return len(documents)
except Exception as e:
logger.error(f"添加文档失败: {e}")
raise
def search(
self,
collection_name: str,
query_texts: List[str],
n_results: int = 5,
where: Optional[Dict] = None,
score_threshold: float = 0.7
):
"""
搜索相似文档
Args:
collection_name: 集合名称
query_texts: 查询文本列表
n_results: 返回结果数量
where: 过滤条件
score_threshold: 相似度阈值
"""
try:
collection = self.get_collection(collection_name)
results = collection.query(
query_texts=query_texts,
n_results=n_results,
where=where
)
# 过滤结果
filtered_results = []
if results and results.get('documents'):
for i, distances in enumerate(results.get('distances', [])):
docs = results['documents'][i]
metas = results['metadatas'][i]
ids = results['ids'][i]
for j, distance in enumerate(distances):
# 转换距离为相似度分数(Chroma使用余弦距离)
similarity_score = 1 - distance
if similarity_score >= score_threshold:
filtered_results.append({
'document': docs[j],
'metadata': metas[j],
'id': ids[j],
'score': similarity_score,
'distance': distance
})
return filtered_results
except Exception as e:
logger.error(f"搜索失败: {e}")
raise
def delete_collection(self, collection_name: str):
"""删除集合"""
try:
self.client.delete_collection(collection_name)
logger.info(f"删除集合: {collection_name}")
except Exception as e:
logger.error(f"删除集合失败: {e}")
raise
def list_collections(self):
"""列出所有集合"""
try:
return self.client.list_collections()
except Exception as e:
logger.error(f"列出集合失败: {e}")
return []
def get_collection_stats(self, collection_name: str):
"""获取集合统计信息"""
try:
collection = self.get_collection(collection_name)
count = collection.count()
# 获取样本元数据
sample = collection.peek(limit=1)
return {
'name': collection_name,
'document_count': count,
'sample_metadata': sample.get('metadatas', [])[0] if sample.get('metadatas') else None
}
except Exception as e:
logger.error(f"获取集合统计失败: {e}")
return None
def check_health(self):
"""检查数据库健康状态"""
try:
# 简单的健康检查:尝试列出集合
self.list_collections()
return True
except:
return False
def close(self):
"""关闭连接"""
# Chroma持久化客户端无需显式关闭
pass
3.4 嵌入服务
backend/services/embedding.py
"""
嵌入服务
将文本转换为向量表示
"""
import logging
from typing import List, Optional
import numpy as np
from sentence_transformers import SentenceTransformer
import torch
logger = logging.getLogger(__name__)
class EmbeddingService:
"""嵌入服务管理器"""
def __init__(
self,
model_name: str = "all-MiniLM-L6-v2",
device: Optional[str] = None
):
"""
初始化嵌入服务
Args:
model_name: 嵌入模型名称
device: 计算设备(cuda/cpu)
"""
self.model_name = model_name
# 自动选择设备
if device is None:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
else:
self.device = device
logger.info(f"加载嵌入模型: {model_name},设备: {self.device}")
# 加载模型
try:
self.model = SentenceTransformer(model_name, device=self.device)
# 测试模型
test_embeddings = self.model.encode(["test"])
self.embedding_dim = len(test_embeddings[0])
logger.info(f"嵌入模型加载成功,维度: {self.embedding_dim}")
except Exception as e:
logger.error(f"加载嵌入模型失败: {e}")
# 降级到更小的模型
self.model = SentenceTransformer("all-MiniLM-L6-v2", device=self.device)
test_embeddings = self.model.encode(["test"])
self.embedding_dim = len(test_embeddings[0])
logger.info(f"使用备用模型,维度: {self.embedding_dim}")
def encode(
self,
texts: List[str],
batch_size: int = 32,
normalize_embeddings: bool = True,
show_progress_bar: bool = False
) -> List[List[float]]:
"""
将文本编码为向量
Args:
texts: 文本列表
batch_size: 批处理大小
normalize_embeddings: 是否归一化向量
show_progress_bar: 是否显示进度条
Returns:
向量列表
"""
if not texts:
return []
try:
# 编码文本
embeddings = self.model.encode(
texts,
batch_size=batch_size,
show_progress_bar=show_progress_bar,
normalize_embeddings=normalize_embeddings,
convert_to_numpy=True
)
# 转换为列表格式
if isinstance(embeddings, np.ndarray):
embeddings = embeddings.tolist()
return embeddings
except Exception as e:
logger.error(f"文本编码失败: {e}")
raise
def encode_single(self, text: str) -> List[float]:
"""编码单个文本"""
return self.encode([text])[0]
def similarity(
self,
embedding1: List[float],
embedding2: List[float]
) -> float:
"""
计算两个向量的相似度(余弦相似度)
Args:
embedding1: 向量1
embedding2: 向量2
Returns:
相似度分数(0-1)
"""
try:
# 转换为numpy数组
vec1 = np.array(embedding1)
vec2 = np.array(embedding2)
# 计算余弦相似度
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
similarity = dot_product / (norm1 * norm2)
return float(similarity)
except Exception as e:
logger.error(f"相似度计算失败: {e}")
return 0.0
def batch_similarity(
self,
query_embedding: List[float],
document_embeddings: List[List[float]]
) -> List[float]:
"""
批量计算相似度
Args:
query_embedding: 查询向量
document_embeddings: 文档向量列表
Returns:
相似度分数列表
"""
if not document_embeddings:
return []
try:
query_vec = np.array(query_embedding)
doc_matrix = np.array(document_embeddings)
# 批量计算余弦相似度
dot_products = np.dot(doc_matrix, query_vec)
query_norm = np.linalg.norm(query_vec)
doc_norms = np.linalg.norm(doc_matrix, axis=1)
# 避免除以零
nonzero_mask = (query_norm > 0) & (doc_norms > 0)
similarities = np.zeros(len(document_embeddings))
if np.any(nonzero_mask):
similarities[nonzero_mask] = dot_products[nonzero_mask] / (
query_norm * doc_norms[nonzero_mask]
)
return similarities.tolist()
except Exception as e:
logger.error(f"批量相似度计算失败: {e}")
return [0.0] * len(document_embeddings)
def get_info(self) -> dict:
"""获取模型信息"""
return {
"model_name": self.model_name,
"device": self.device,
"embedding_dimension": self.embedding_dim,
"max_seq_length": self.model.max_seq_length,
"vocab_size": getattr(self.model, "vocab_size", "unknown")
}
3.5 RAG服务
backend/services/rag.py
"""
RAG(检索增强生成)服务
结合向量检索和LLM生成
"""
import logging
import json
from typing import List, Dict, Any, Optional
import requests
from datetime import datetime
from .vector_db import VectorDB
from .embedding import EmbeddingService
logger = logging.getLogger(__name__)
class RAGService:
"""RAG服务管理器"""
def __init__(
self,
vector_db: VectorDB,
embedding_service: EmbeddingService,
ollama_url: str = "http://localhost:11434",
default_model: str = "deepseek-coder:6.7b"
):
"""
初始化RAG服务
Args:
vector_db: 向量数据库实例
embedding_service: 嵌入服务实例
ollama_url: Ollama API地址
default_model: 默认LLM模型
"""
self.vector_db = vector_db
self.embedding_service = embedding_service
self.ollama_url = ollama_url
self.default_model = default_model
logger.info(f"RAG服务初始化完成,Ollama地址: {ollama_url}")
def query(
self,
question: str,
collection_name: str = "default",
top_k: int = 5,
score_threshold: float = 0.7,
model: Optional[str] = None
) -> Dict[str, Any]:
"""
执行RAG查询
Args:
question: 用户问题
collection_name: 集合名称
top_k: 检索数量
score_threshold: 相似度阈值
model: 使用的LLM模型
Returns:
包含答案和来源的字典
"""
start_time = datetime.now()
try:
# 步骤1: 检索相关文档
retrieved_docs = self.retrieve(
question=question,
collection_name=collection_name,
top_k=top_k,
score_threshold=score_threshold
)
if not retrieved_docs:
logger.warning("未找到相关文档,将使用通用知识回答")
context = "无相关文档。请基于你的通用知识回答。"
sources = []
else:
# 构建上下文
context = self._build_context(retrieved_docs)
sources = [
{
"content": doc["document"][:200] + "...",
"score": doc["score"],
"metadata": doc["metadata"]
}
for doc in retrieved_docs
]
# 步骤2: 生成答案
answer = self.generate_answer(
question=question,
context=context,
model=model or self.default_model
)
# 步骤3: 记录查询
self._log_query(question, answer, sources)
elapsed_time = (datetime.now() - start_time).total_seconds()
return {
"answer": answer,
"sources": sources,
"retrieved_count": len(retrieved_docs),
"processing_time": elapsed_time,
"model": model or self.default_model,
"timestamp": start_time.isoformat()
}
except Exception as e:
logger.error(f"RAG查询失败: {e}")
raise
def retrieve(
self,
question: str,
collection_name: str,
top_k: int = 5,
score_threshold: float = 0.7
) -> List[Dict[str, Any]]:
"""
检索相关文档
Args:
question: 查询问题
collection_name: 集合名称
top_k: 返回结果数量
score_threshold: 相似度阈值
Returns:
检索到的文档列表
"""
try:
# 使用向量数据库搜索
results = self.vector_db.search(
collection_name=collection_name,
query_texts=[question],
n_results=top_k * 2, # 获取更多结果用于过滤
score_threshold=score_threshold
)
# 按分数排序并限制数量
sorted_results = sorted(
results,
key=lambda x: x.get('score', 0),
reverse=True
)[:top_k]
logger.info(f"检索到 {len(sorted_results)} 个相关文档")
return sorted_results
except Exception as e:
logger.error(f"文档检索失败: {e}")
return []
def generate_answer(
self,
question: str,
context: str,
model: Optional[str] = None
) -> str:
"""
生成答案
Args:
question: 用户问题
context: 检索到的上下文
model: 使用的模型
Returns:
生成的答案
"""
try:
# 构建提示词
prompt = self._build_prompt(question, context)
# 调用Ollama API
response = self._call_ollama(prompt, model or self.default_model)
return response
except Exception as e:
logger.error(f"答案生成失败: {e}")
return f"抱歉,生成答案时出现错误: {str(e)}"
def _build_context(self, documents: List[Dict[str, Any]]) -> str:
"""构建上下文文本"""
context_parts = []
for i, doc in enumerate(documents, 1):
content = doc.get('document', '')
metadata = doc.get('metadata', {})
source = metadata.get('source', '未知来源')
page = metadata.get('page', '')
context_parts.append(
f"[文档 {i}] 来源: {source} {page}\n"
f"内容: {content}\n"
f"相关度: {doc.get('score', 0):.2%}\n"
)
return "\n---\n".join(context_parts)
def _build_prompt(self, question: str, context: str) -> str:
"""构建提示词"""
return f"""你是一个专业的AI助手,请根据提供的上下文信息回答用户的问题。
上下文信息:
{context}
用户问题:{question}
请根据上下文信息回答问题,如果上下文信息不足以回答问题,请说明这一点。
回答时请:
1. 尽可能详细和准确
2. 引用相关的上下文信息
3. 使用清晰的结构(如需要)
4. 如果上下文没有相关信息,请说明
回答:"""
def _call_ollama(self, prompt: str, model: str) -> str:
"""调用Ollama API"""
try:
url = f"{self.ollama_url}/api/generate"
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.7,
"top_p": 0.9,
"num_predict": 1024
}
}
response = requests.post(url, json=payload, timeout=60)
if response.status_code == 200:
result = response.json()
return result.get("response", "").strip()
else:
error_msg = f"Ollama API错误: {response.status_code} - {response.text}"
logger.error(error_msg)
raise Exception(error_msg)
except requests.exceptions.Timeout:
error_msg = "Ollama API请求超时"
logger.error(error_msg)
raise Exception(error_msg)
except Exception as e:
logger.error(f"调用Ollama失败: {e}")
raise
def _log_query(self, question: str, answer: str, sources: List[Dict]):
"""记录查询日志"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"question": question,
"answer_length": len(answer),
"sources_count": len(sources),
"source_scores": [s.get("score", 0) for s in sources]
}
# 这里可以添加更复杂的日志记录逻辑
# 如保存到数据库、文件或发送到监控系统
logger.info(f"查询记录: {json.dumps(log_entry)}")
def test_connection(self) -> bool:
"""测试Ollama连接"""
try:
response = requests.get(f"{self.ollama_url}/api/tags", timeout=5)
return response.status_code == 200
except:
return False
3.6 文档处理器
backend/services/document_processor.py
"""
文档处理器
处理各种格式的文档,提取文本并分块
"""
import os
import logging
from typing import List, Dict, Any, Optional
import tempfile
# 文档加载器
import PyPDF2
from docx import Document
import pandas as pd
import markdown
from bs4 import BeautifulSoup
from .vector_db import VectorDB
from .embedding import EmbeddingService
logger = logging.getLogger(__name__)
class DocumentProcessor:
"""文档处理器"""
def __init__(
self,
vector_db: VectorDB,
embedding_service: EmbeddingService,
chunk_size: int = 1000,
chunk_overlap: int = 200
):
"""
初始化文档处理器
Args:
vector_db: 向量数据库实例
embedding_service: 嵌入服务实例
chunk_size: 块大小(字符数)
chunk_overlap: 块重叠大小
"""
self.vector_db = vector_db
self.embedding_service = embedding_service
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def process_document(
self,
file_path: str,
collection_name: str = "default",
metadata: Optional[Dict] = None
) -> List[Dict[str, Any]]:
"""
处理文档
Args:
file_path: 文档路径
collection_name: 集合名称
metadata: 额外元数据
Returns:
处理后的文本块列表
"""
try:
# 解析文件扩展名
file_ext = os.path.splitext(file_path)[1].lower()
# 读取文档内容
if file_ext == '.pdf':
text = self._read_pdf(file_path)
elif file_ext in ['.docx', '.doc']:
text = self._read_docx(file_path)
elif file_ext == '.txt':
text = self._read_txt(file_path)
elif file_ext == '.md':
text = self._read_markdown(file_path)
elif file_ext in ['.xlsx', '.xls', '.csv']:
text = self._read_excel(file_path)
else:
raise ValueError(f"不支持的文件格式: {file_ext}")
# 清理文本
text = self._clean_text(text)
# 分块
chunks = self._chunk_text(text)
# 准备元数据
base_metadata = {
"source": os.path.basename(file_path),
"file_type": file_ext[1:], # 去掉点号
"file_size": os.path.getsize(file_path),
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap
}
if metadata:
base_metadata.update(metadata)
# 创建集合(如果不存在)
self.vector_db.create_collection(collection_name)
# 准备添加到向量数据库的数据
documents = []
metadatas = []
ids = []
for i, chunk in enumerate(chunks):
chunk_id = f"{os.path.basename(file_path)}_{i}"
chunk_metadata = base_metadata.copy()
chunk_metadata.update({
"chunk_id": i,
"chunk_count": len(chunks),
"text_length": len(chunk)
})
documents.append(chunk)
metadatas.append(chunk_metadata)
ids.append(chunk_id)
# 添加到向量数据库
if documents:
self.vector_db.add_documents(
collection_name=collection_name,
documents=documents,
metadatas=metadatas,
ids=ids
)
logger.info(f"文档处理完成: {file_path} -> {len(chunks)} chunks")
return documents
except Exception as e:
logger.error(f"处理文档失败 {file_path}: {e}")
raise
def _read_pdf(self, file_path: str) -> str:
"""读取PDF文件"""
text = ""
try:
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page_num, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
text += f"\n--- 第 {page_num + 1} 页 ---\n{page_text}\n"
return text
except Exception as e:
logger.error(f"读取PDF失败 {file_path}: {e}")
# 尝试使用其他方法
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
for page_num, page in enumerate(pdf.pages):
page_text = page.extract_text()
text += f"\n--- 第 {page_num + 1} 页 ---\n{page_text}\n"
return text
except:
raise Exception(f"无法读取PDF文件: {file_path}")
def _read_docx(self, file_path: str) -> str:
"""读取Word文档"""
try:
doc = Document(file_path)
text = []
for para in doc.paragraphs:
if para.text.strip():
text.append(para.text)
return "\n".join(text)
except Exception as e:
logger.error(f"读取DOCX失败 {file_path}: {e}")
raise
def _read_txt(self, file_path: str) -> str:
"""读取文本文件"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
except UnicodeDecodeError:
# 尝试其他编码
with open(file_path, 'r', encoding='gbk') as file:
return file.read()
def _read_markdown(self, file_path: str) -> str:
"""读取Markdown文件"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
md_content = file.read()
# 将Markdown转换为纯文本
html = markdown.markdown(md_content)
soup = BeautifulSoup(html, 'html.parser')
return soup.get_text()
except Exception as e:
logger.error(f"读取Markdown失败 {file_path}: {e}")
return self._read_txt(file_path) # 降级为普通文本
def _read_excel(self, file_path: str) -> str:
"""读取Excel/CSV文件"""
try:
if file_path.endswith('.csv'):
df = pd.read_csv(file_path)
else:
df = pd.read_excel(file_path)
# 将DataFrame转换为文本
text_lines = []
# 添加列名
text_lines.append("列名: " + ", ".join(df.columns.tolist()))
text_lines.append("\n数据:")
# 添加数据行(限制行数避免过大)
for i, row in df.head(100).iterrows(): # 只读取前100行
row_text = ", ".join([str(val) for val in row.values])
text_lines.append(f"行 {i}: {row_text}")
return "\n".join(text_lines)
except Exception as e:
logger.error(f"读取Excel失败 {file_path}: {e}")
raise
def _clean_text(self, text: str) -> str:
"""清理文本"""
import re
# 移除多余的空格和换行
text = re.sub(r'\s+', ' ', text)
# 移除特殊字符(保留基本标点)
text = re.sub(r'[^\w\s.,!?;:()\-/\u4e00-\u9fff]', '', text)
# 标准化换行符
text = text.replace('\r\n', '\n').replace('\r', '\n')
return text.strip()
def _chunk_text(self, text: str) -> List[str]:
"""将文本分块"""
if not text:
return []
chunks = []
start = 0
text_length = len(text)
while start < text_length:
# 计算块结束位置
end = start + self.chunk_size
# 如果还有剩余文本,尝试在句子边界处分割
if end < text_length:
# 查找最近的句子结束符
sentence_enders = ['.', '!', '?', '。', '!', '?', '\n\n']
for i in range(end, max(start, end - 100), -1):
if i < text_length and text[i] in sentence_enders:
end = i + 1
break
# 提取块
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# 移动起始位置(考虑重叠)
start = end - self.chunk_overlap
return chunks
3.7 前端Streamlit应用
frontend/app.py
#!/usr/bin/env python3
"""
Deepseek知识库前端应用
基于Streamlit构建
"""
import streamlit as st
import requests
import json
import time
from typing import Optional
import os
# 页面配置
st.set_page_config(
page_title="Deepseek 知识库系统",
page_icon="📚",
layout="wide",
initial_sidebar_state="expanded"
)
# API配置
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
API_TOKEN = os.getenv("API_TOKEN", "dev-token-2024")
# CSS样式
st.markdown("""
<style>
.main-header {
text-align: center;
padding: 2rem 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 10px;
margin-bottom: 2rem;
}
.response-box {
background-color: #f8f9fa;
border-radius: 10px;
padding: 1.5rem;
margin: 1rem 0;
border-left: 5px solid #4e73df;
}
.source-box {
background-color: #e3f2fd;
border-radius: 8px;
padding: 1rem;
margin: 0.5rem 0;
border-left: 3px solid #2196f3;
}
.metric-box {
background-color: #fff;
border-radius: 8px;
padding: 1rem;
margin: 0.5rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
text-align: center;
}
.stButton button {
width: 100%;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
padding: 0.75rem 1.5rem;
border-radius: 8px;
font-weight: bold;
}
.stButton button:hover {
background: linear-gradient(135deg, #764ba2 0%, #667eea 100%);
}
</style>
""", unsafe_allow_html=True)
class KnowledgeBaseClient:
"""知识库客户端"""
def __init__(self, base_url: str, token: str):
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
def upload_document(self, file, collection_name: str = "default"):
"""上传文档"""
files = {"file": file}
data = {"collection_name": collection_name}
response = requests.post(
f"{self.base_url}/upload",
files=files,
data=data,
headers={"Authorization": f"Bearer {self.headers['Authorization'].split(' ')[1]}"}
)
return response.json()
def query(self, question: str, collection_name: str = "default", top_k: int = 5):
"""查询知识库"""
data = {
"question": question,
"collection_name": collection_name,
"top_k": top_k
}
response = requests.post(
f"{self.base_url}/query",
json=data,
headers=self.headers
)
return response.json()
def health_check(self):
"""健康检查"""
try:
response = requests.get(f"{self.base_url}/health")
return response.json()
except:
return None
def init_session_state():
"""初始化会话状态"""
if "client" not in st.session_state:
st.session_state.client = KnowledgeBaseClient(API_BASE_URL, API_TOKEN)
if "messages" not in st.session_state:
st.session_state.messages = []
if "upload_history" not in st.session_state:
st.session_state.upload_history = []
if "current_collection" not in st.session_state:
st.session_state.current_collection = "default"
def display_header():
"""显示页头"""
st.markdown("""
<div class="main-header">
<h1>🤖 Deepseek 知识库系统</h1>
<p>私有化AI知识管理与智能问答平台</p>
</div>
""", unsafe_allow_html=True)
def sidebar_controls():
"""侧边栏控件"""
with st.sidebar:
st.header("⚙️ 系统控制")
# 健康状态
health = st.session_state.client.health_check()
if health:
status_color = "🟢" if health.get("status") == "healthy" else "🔴"
st.metric("系统状态", f"{status_color} {health.get('status', 'unknown').upper()}")
col1, col2 = st.columns(2)
with col1:
st.metric("Ollama", health.get("ollama_status", "unknown"))
with col2:
st.metric("向量DB", health.get("vector_db_status", "unknown"))
else:
st.error("无法连接到后端服务")
# 集合管理
st.subheader("📂 知识库管理")
collection_name = st.text_input(
"当前集合",
value=st.session_state.current_collection,
help="选择或输入集合名称"
)
st.session_state.current_collection = collection_name
# 查询参数
st.subheader("🔍 查询设置")
top_k = st.slider("返回结果数量", 1, 10, 5)
score_threshold = st.slider("相似度阈值", 0.0, 1.0, 0.7, 0.05)
# 模型选择
st.subheader("🤖 模型设置")
model_option = st.selectbox(
"选择模型",
["deepseek-coder:6.7b", "deepseek-coder:6.7b-instruct", "deepseek-coder:6.7b-fp16"],
index=0
)
# 清除对话
if st.button("🗑️ 清除对话历史"):
st.session_state.messages = []
st.rerun()
# 系统信息
st.divider()
st.caption(f"API: {API_BASE_URL}")
st.caption("版本: 1.0.0")
def document_upload_section():
"""文档上传部分"""
st.header("📤 文档上传")
col1, col2 = st.columns([2, 1])
with col1:
uploaded_file = st.file_uploader(
"选择文档文件",
type=['pdf', 'txt', 'md', 'docx', 'pptx', 'xlsx'],
help="支持PDF、TXT、MD、DOCX、PPTX、XLSX格式"
)
with col2:
collection = st.text_input(
"目标集合",
value=st.session_state.current_collection
)
if uploaded_file and st.button("🚀 上传并处理"):
with st.spinner("正在上传和处理文档..."):
try:
result = st.session_state.client.upload_document(
uploaded_file,
collection_name=collection
)
if result.get("status") == "processing":
st.success("✅ 文档已上传,正在后台处理")
st.session_state.upload_history.append({
"filename": uploaded_file.name,
"collection": collection,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "processing"
})
# 显示处理进度
progress_bar = st.progress(0)
for i in range(100):
time.sleep(0.05)
progress_bar.progress(i + 1)
st.info("💡 文档处理完成后,即可在问答中使用")
else:
st.success(f"✅ 文档处理完成,生成 {result.get('chunks', 0)} 个文本块")
except Exception as e:
st.error(f"❌ 上传失败: {str(e)}")
def chat_interface():
"""聊天界面"""
st.header("💬 智能问答")
# 显示对话历史
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
if message.get("sources"):
with st.expander("📚 查看来源"):
for source in message["sources"]:
st.markdown(f"""
<div class="source-box">
<strong>相关度: {source.get('score', 0):.2%}</strong><br>
<small>{source.get('content', '')[:200]}...</small>
</div>
""", unsafe_allow_html=True)
# 用户输入
if prompt := st.chat_input("请输入您的问题..."):
# 添加用户消息
st.session_state.messages.append({
"role": "user",
"content": prompt,
"timestamp": time.time()
})
# 显示用户消息
with st.chat_message("user"):
st.markdown(prompt)
# 生成回答
with st.chat_message("assistant"):
with st.spinner("正在思考..."):
try:
response = st.session_state.client.query(
question=prompt,
collection_name=st.session_state.current_collection
)
# 显示回答
st.markdown(response.get("answer", "抱歉,未能生成回答"))
# 显示指标
col1, col2, col3 = st.columns(3)
with col1:
st.metric("处理时间", f"{response.get('processing_time', 0):.2f}秒")
with col2:
st.metric("来源数量", response.get("retrieved_count", 0))
with col3:
st.metric("使用模型", response.get("model", "unknown"))
# 显示来源
sources = response.get("sources", [])
if sources:
with st.expander(f"📚 参考来源 ({len(sources)}个)"):
for i, source in enumerate(sources, 1):
score = source.get("score", 0)
score_color = "🟢" if score > 0.8 else "🟡" if score > 0.6 else "🔴"
st.markdown(f"""
<div class="source-box">
<strong>来源 {i} {score_color} 相关度: {score:.2%}</strong><br>
<small>文件: {source.get('metadata', {}).get('source', '未知')}</small><br>
<small>{source.get('content', '')}</small>
</div>
""", unsafe_allow_html=True)
# 添加助手消息到历史
st.session_state.messages.append({
"role": "assistant",
"content": response.get("answer", ""),
"sources": sources,
"processing_time": response.get("processing_time", 0),
"timestamp": time.time()
})
except Exception as e:
st.error(f"❌ 查询失败: {str(e)}")
def analytics_dashboard():
"""分析面板"""
st.header("📊 系统分析")
if st.session_state.messages:
# 计算统计数据
total_queries = len([m for m in st.session_state.messages if m["role"] == "user"])
avg_response_time = sum([
m.get("processing_time", 0) for m in st.session_state.messages
if m["role"] == "assistant"
]) / max(1, len([m for m in st.session_state.messages if m["role"] == "assistant"]))
col1, col2, col3 = st.columns(3)
with col1:
st.metric("总查询数", total_queries)
with col2:
st.metric("平均响应时间", f"{avg_response_time:.2f}秒")
with col3:
st.metric("对话轮数", len(st.session_state.messages))
# 查询历史
st.subheader("📝 最近查询")
for msg in st.session_state.messages[-5:]:
if msg["role"] == "user":
st.text(f"❓ {msg['content'][:100]}...")
elif msg["role"] == "assistant":
st.text(f"🤖 {msg['content'][:100]}...")
st.caption(f"处理时间: {msg.get('processing_time', 0):.2f}秒")
st.divider()
# 上传历史
if st.session_state.upload_history:
st.subheader("📤 上传历史")
for upload in st.session_state.upload_history[-3:]:
status_icon = "⏳" if upload.get("status") == "processing" else "✅"
st.text(f"{status_icon} {upload.get('filename')} -> {upload.get('collection')}")
st.caption(f"时间: {upload.get('timestamp')}")
def main():
"""主函数"""
init_session_state()
display_header()
# 创建标签页
tab1, tab2, tab3 = st.tabs(["💬 智能问答", "📤 文档上传", "📊 系统分析"])
with tab1:
chat_interface()
with tab2:
document_upload_section()
with tab3:
analytics_dashboard()
# 侧边栏
sidebar_controls()
if __name__ == "__main__":
# 检查后端连接
try:
response = requests.get(f"{API_BASE_URL}/health", timeout=5)
if response.status_code != 200:
st.warning("⚠️ 后端服务可能未启动,请确保已运行后端服务")
except:
st.error("❌ 无法连接到后端服务,请检查:")
st.code("cd backend && python app.py", language="bash")
main()
3.8 部署和运行脚本
docker-compose.yml
version: '3.8'
services:
# Ollama服务
ollama:
image: ollama/ollama:latest
container_name: deepseek-ollama
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
command: serve
restart: unless-stopped
# 向量数据库(Chroma)
chromadb:
image: chromadb/chroma:latest
container_name: deepseek-chromadb
ports:
- "8001:8000"
volumes:
- chroma_data:/chroma/chroma
environment:
- IS_PERSISTENT=TRUE
- PERSIST_DIRECTORY=/chroma/chroma
- ANONYMIZED_TELEMETRY=FALSE
restart: unless-stopped
# 后端API服务
backend:
build:
context: ./backend
dockerfile: Dockerfile
container_name: deepseek-backend
ports:
- "8000:8000"
volumes:
- ./knowledge_base:/app/knowledge_base
- ./backend:/app
environment:
- OLLAMA_HOST=http://ollama:11434
- CHROMA_HOST=http://chromadb:8000
- API_TOKEN=${API_TOKEN:-dev-token-2024}
- PYTHONPATH=/app
depends_on:
- ollama
- chromadb
restart: unless-stopped
# 前端Streamlit应用
frontend:
build:
context: ./frontend
dockerfile: Dockerfile
container_name: deepseek-frontend
ports:
- "8501:8501"
volumes:
- ./frontend:/app
environment:
- API_BASE_URL=http://backend:8000
- API_TOKEN=${API_TOKEN:-dev-token-2024}
depends_on:
- backend
restart: unless-stopped
# 监控面板(可选)
monitoring:
image: grafana/grafana:latest
container_name: deepseek-monitoring
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana/provisioning:/etc/grafana/provisioning
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
restart: unless-stopped
volumes:
ollama_data:
chroma_data:
grafana_data:
后端Dockerfile backend/Dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
curl \
wget \
git \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "app.py"]
前端Dockerfile frontend/Dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
curl \
wget \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE 8501
# 启动命令
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8501/_stcore/health || exit 1
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]
启动脚本 start.sh
#!/bin/bash
# Deepseek知识库系统启动脚本
set -e
echo "🚀 启动 Deepseek 知识库系统..."
# 检查Docker是否安装
if ! command -v docker &> /dev/null; then
echo "❌ Docker未安装,请先安装Docker"
exit 1
fi
# 检查Docker Compose是否安装
if ! command -v docker-compose &> /dev/null; then
echo "❌ Docker Compose未安装,请先安装Docker Compose"
exit 1
fi
# 创建必要的目录
mkdir -p knowledge_base/documents
mkdir -p monitoring/grafana/provisioning
# 创建环境文件
if [ ! -f .env ]; then
echo "📝 创建环境配置文件..."
cat > .env << EOF
# Deepseek知识库系统环境配置
API_TOKEN=your-secure-token-$(date +%s)
GRAFANA_PASSWORD=admin123
# Ollama配置
OLLAMA_MODEL=deepseek-coder:6.7b
OLLAMA_NUM_GPU=1
# 后端配置
EMBEDDING_MODEL=all-MiniLM-L6-v2
CHUNK_SIZE=1000
CHUNK_OVERLAP=200
EOF
echo "✅ 环境文件已创建,请根据需要修改 .env"
fi
# 启动服务
echo "🔧 启动Docker服务..."
docker-compose up -d
# 等待服务启动
echo "⏳ 等待服务启动..."
sleep 10
# 检查服务状态
echo "📊 服务状态检查:"
for service in ollama chromadb backend frontend; do
if docker ps | grep -q $service; then
echo "✅ $service 运行正常"
else
echo "❌ $service 启动失败"
fi
done
# 显示访问信息
echo ""
echo "=========================================="
echo "🌐 系统访问信息:"
echo "🔗 前端界面: http://localhost:8501"
echo "🔗 后端API: http://localhost:8000"
echo "🔗 API文档: http://localhost:8000/docs"
echo "🔗 Ollama API: http://localhost:11434"
echo "🔗 监控面板: http://localhost:3000 (admin/admin123)"
echo ""
echo "🔑 API令牌: 查看 .env 文件中的 API_TOKEN"
echo "=========================================="
echo ""
echo "📋 常用命令:"
echo " • 查看日志: docker-compose logs -f"
echo " • 停止服务: docker-compose down"
echo " • 重启服务: docker-compose restart"
echo " • 更新镜像: docker-compose pull && docker-compose up -d"
echo ""
echo "🎉 系统启动完成!请访问上述链接开始使用。"
停止脚本 stop.sh
#!/bin/bash
echo "🛑 停止 Deepseek 知识库系统..."
docker-compose down
echo "✅ 系统已停止"
更新脚本 update.sh
#!/bin/bash
echo "🔄 更新 Deepseek 知识库系统..."
# 拉取最新镜像
docker-compose pull
# 停止并重新启动服务
docker-compose down
docker-compose up -d
echo "✅ 系统更新完成"
四、完整的使用指南
4.1 快速开始
- 克隆项目
git clone https://github.com/yourusername/deepseek-knowledge-base.git
cd deepseek-knowledge-base
- 安装依赖
# 安装Python依赖
pip install -r backend/requirements.txt
pip install -r frontend/requirements.txt
# 安装Ollama
curl -fsSL https://ollama.ai/install.sh | sh
# 下载Deepseek模型
ollama pull deepseek-coder:6.7b
- 启动服务
# 方法1: 使用Docker Compose
chmod +x start.sh
./start.sh
# 方法2: 手动启动
# 终端1: 启动Ollama
ollama serve
# 终端2: 启动后端
cd backend && python app.py
# 终端3: 启动前端
cd frontend && streamlit run app.py
- 访问系统
- 前端界面: http://localhost:8501
- API文档: http://localhost:8000/docs
4.2 配置说明
环境变量配置
# .env 文件示例
API_TOKEN=your-secret-token-here
OLLAMA_HOST=http://localhost:11434
CHROMA_HOST=http://localhost:8000
EMBEDDING_MODEL=all-MiniLM-L6-v2
CHUNK_SIZE=1000
CHUNK_OVERLAP=200
模型配置
# 可选的Deepseek模型
ollama pull deepseek-coder:6.7b # 7B参数基础版
ollama pull deepseek-coder:6.7b-instruct # 指令调优版
ollama pull deepseek-coder:6.7b-q4_K_M # 4位量化版(显存占用小)
ollama pull deepseek-coder:6.7b-fp16 # 16位浮点版(精度高)
4.3 API使用示例
import requests
import json
# 配置
API_URL = "http://localhost:8000"
API_TOKEN = "your-token"
headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
# 1. 健康检查
response = requests.get(f"{API_URL}/health")
print("健康状态:", response.json())
# 2. 上传文档
files = {"file": open("document.pdf", "rb")}
data = {"collection_name": "research"}
upload_response = requests.post(
f"{API_URL}/upload",
files=files,
data=data,
headers={"Authorization": f"Bearer {API_TOKEN}"}
)
print("上传结果:", upload_response.json())
# 3. 查询知识库
query_data = {
"question": "什么是机器学习?",
"collection_name": "research",
"top_k": 5
}
query_response = requests.post(
f"{API_URL}/query",
json=query_data,
headers=headers
)
result = query_response.json()
print("答案:", result["answer"])
print("来源:", len(result["sources"]), "个文档")
4.4 故障排除
常见问题
- Ollama服务无法启动
# 检查端口占用
netstat -tlnp | grep 11434
# 重启Ollama
pkill ollama
ollama serve
# 检查日志
journalctl -u ollama -f
- GPU内存不足
# 使用量化模型
ollama pull deepseek-coder:6.7b-q4_K_M
# 限制显存使用
export CUDA_VISIBLE_DEVICES=0
export OLLAMA_LOAD_IN_8BIT=true
- 向量数据库连接失败
# 重置ChromaDB
rm -rf chroma_db
# 重新启动服务
4.5 性能优化
硬件优化
# 启用GPU加速
export CUDA_VISIBLE_DEVICES=0
export TF_FORCE_GPU_ALLOW_GROWTH=true
# 设置合理的批处理大小
export OLLAMA_BATCH_SIZE=8
export EMBEDDING_BATCH_SIZE=32
软件优化
# 调整RAG参数
config = {
"chunk_size": 800, # 较小的块适用于精准检索
"chunk_overlap": 150, # 适当重叠保持上下文
"top_k": 3, # 减少检索数量提高速度
"score_threshold": 0.6, # 降低阈值获取更多结果
}
# 启用缓存
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_embedding(text: str):
return embedding_service.encode_single(text)
五、扩展功能
5.1 多语言支持
# 添加多语言嵌入模型
class MultiLanguageEmbeddingService:
def __init__(self):
self.models = {
"en": SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2"),
"zh": SentenceTransformer("BAAI/bge-small-zh-v1.5"),
"ja": SentenceTransformer("sonoisa/sentence-bert-base-ja-mean-tokens-v2")
}
def detect_language(self, text: str) -> str:
# 使用langdetect库
from langdetect import detect
try:
return detect(text)
except:
return "en"
def encode(self, texts: List[str]) -> List[List[float]]:
# 根据语言选择模型
results = []
for text in texts:
lang = self.detect_language(text)
model = self.models.get(lang, self.models["en"])
embedding = model.encode([text])[0]
results.append(embedding.tolist())
return results
5.2 实时监控
# 集成Prometheus监控
from prometheus_client import start_http_server, Counter, Histogram
# 定义指标
QUERY_COUNT = Counter('rag_query_total', 'Total RAG queries')
QUERY_DURATION = Histogram('rag_query_duration_seconds', 'RAG query duration')
EMBEDDING_TIME = Histogram('embedding_duration_seconds', 'Embedding generation time')
class MonitoredRAGService(RAGService):
@QUERY_DURATION.time()
def query(self, *args, **kwargs):
QUERY_COUNT.inc()
return super().query(*args, **kwargs)
# 启动监控服务器
start_http_server(9090)
5.3 权限管理
# 基于角色的访问控制
from enum import Enum
from fastapi import Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
class Role(Enum):
ADMIN = "admin"
EDITOR = "editor"
VIEWER = "viewer"
def check_permission(
credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()),
required_role: Role = Role.VIEWER
):
# 验证令牌并检查角色
token = credentials.credentials
# 从数据库或JWT中获取用户角色
user_role = get_user_role(token)
if user_role.value < required_role.value:
raise HTTPException(
status_code=403,
detail=f"需要 {required_role.value} 权限"
)
return token
六、生产部署建议
6.1 安全配置
# 1. 使用HTTPS
# 配置Nginx反向代理
sudo apt install nginx
sudo certbot --nginx -d yourdomain.com
# 2. 设置防火墙
sudo ufw allow 443/tcp
sudo ufw allow 80/tcp
sudo ufw enable
# 3. 定期备份
# 备份脚本
#!/bin/bash
BACKUP_DIR="/backup/deepseek"
DATE=$(date +%Y%m%d_%H%M%S)
# 备份向量数据库
tar -czf $BACKUP_DIR/chroma_$DATE.tar.gz chroma_db/
# 备份文档
tar -czf $BACKUP_DIR/documents_$DATE.tar.gz knowledge_base/documents/
# 备份配置
cp .env $BACKUP_DIR/env_$DATE.bak
# 保留最近7天备份
find $BACKUP_DIR -type f -mtime +7 -delete
6.2 性能监控
# monitoring/prometheus/prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'deepseek-backend'
static_configs:
- targets: ['backend:8000']
- job_name: 'ollama'
static_configs:
- targets: ['ollama:11434']
6.3 高可用部署
# docker-compose-ha.yml
version: '3.8'
services:
ollama:
image: ollama/ollama:latest
deploy:
mode: replicated
replicas: 2
resources:
limits:
memory: 8G
placement:
constraints:
- node.labels.gpu==true
volumes:
- ollama_data:/root/.ollama
backend:
image: your-registry/deepseek-backend:latest
deploy:
mode: replicated
replicas: 3
resources:
limits:
memory: 2G
environment:
- OLLAMA_HOST=http://ollama:11434
chromadb:
image: chromadb/chroma:latest
deploy:
mode: replicated
replicas: 2
volumes:
- chroma_data:/chroma/chroma
总结
本指南提供了完整的Deepseek本地部署方案,从环境搭建到应用开发的全流程。通过Ollama框架部署Deepseek模型,结合向量数据库构建私有知识库,实现了基于RAG的智能问答系统。
主要特点:
- 完整的部署方案:支持Docker和本地部署
- 模块化设计:各组件解耦,易于扩展
- 高性能检索:基于ChromaDB的向量检索
- 现代化前端:Streamlit构建的交互界面
- 生产就绪:包含监控、安全、备份等生产功能
- 易于扩展:支持多语言、多模型、多数据源
后续改进方向:
- 添加更多文档格式支持(PPT、HTML等)
- 实现增量学习和模型微调
- 添加多用户协作功能
- 集成更多LLM模型(Llama、Qwen等)
- 开发移动端应用
这个系统可以广泛应用于企业知识管理、学术研究、个人学习笔记等场景,为用户提供安全、高效、智能的知识检索和问答服务。
更多推荐


所有评论(0)