上一章我们把各种文档洗干净切好了,存成了一个标准的 JSON 文件,但 JSON 文件还是文本,计算机看不懂没法直接拿来回答问题,这一章的任务就是:把文本变成向量,存进数据库,然后提供检索功能,我将这些功能写进了vector_store.py里

简单说,这个文件负责三件事:

读取上一章生成的json文件processing_results.json

把里面的文本切分成小段,用BGE模型转成向量,存到Chroma里

通过search_documents、search_with_score这些函数增强检索,让后面的问答模块能根据问题找到最相关的文档片段

4.1 加载预处理数据

第一步当然是读数据,通过load_processed_json函数检查文件是否存在,然后加载 JSON

def load_processed_json() -> List[Dict[str, Any]]:
    if not PROCESSED_JSON_PATH.exists():
        raise FileNotFoundError(f"处理后的数据文件未找到:{PROCESSED_JSON_PATH},请先运行data_preprocess.py")
    with open(PROCESSED_JSON_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)
    print(f"已加载{len(data)}个文档的JSON数据")
    return data

4.2 文档对象转换

LangChain有一套自己的文档类Document,后面很多操作都要用它,所以我把JSON里的每个文档转成Document对象

def json_to_documents(json_data: List[Dict[str, Any]]) -> List[Document]:
    documents = []
    for item in json_data:
        file_name = item['file_name']
        data = item['data']
        raw_text = data['raw_text']
        metadata = data.get('metadata', {})

        # 生成文档ID
        text_hash = hashlib.md5(raw_text.encode()).hexdigest()[:8]
        doc_id = f"{file_name}_{text_hash}"

        doc = Document(
            page_content=raw_text,
            metadata={
                'source': file_name,
                'doc_id': doc_id,
                'file_type': metadata.get('type', 'unknown'),
                'file_size': metadata.get('file_size', 0),
                **metadata
            }
        )
        documents.append(doc)

        # 如果原文档里有表格,把每个表格也单独作为一个文档
        if data.get('tables'):
            for table_idx, table in enumerate(data['tables']):
                table_text = json.dumps(table, ensure_ascii=False, indent=2)
                table_doc = Document(
                    page_content=f"表格数据:\n{table_text}",
                    metadata={
                        'source': file_name,
                        'doc_id': f"{doc_id}_table_{table_idx}",
                        'file_type': metadata.get('type', 'unknown'),
                        'content_type': 'table',
                        'table_index': table_idx,
                        **metadata
                    }
                )
                documents.append(table_doc)
    return documents

如果没给文档生成ID,重复运行程序时,向量库里会有大量重复文档,所以加了doc_id,基于文件名和内容哈希生成,保证同样的内容只会存一次(不过这里只是生成 ID,并没有做去重,实际上去重可以在存储时利用 ID 实现,但 Chroma 的 from_documents 会直接添加,不会去重。如果想真正去重,需要额外逻辑,这里先不管)。
另外,我把表格单独抽出来作为一个文档,因为很多时候用户想查的是表格里的数据,如果只存正文可能查不到,单独存一份检索效果更好。

4.3 文档拆分

原始文档可能很长,直接整个存成向量,检索时容易把不相关的内容也带进来,而且大模型的上下文窗口有限,太长也塞不下,所以需要把文档切成小段

def split_documents(documents: List[Document]) -> List[Document]:
    text_splitter=RecursiveCharacterTextSplitter(
        chunk_size=500,      # 每个块500字符
        chunk_overlap=50,    # 块间重叠50字符
        separators=["\n\n", "\n", "。", ";", ",", " ", ""],
        length_function=len,
    )
    splits=text_splitter.split_documents(documents)
    print(f"文档已拆分为{len(splits)}个片段")
    return splits

RecursiveCharacterTextSplitter是LangChain提供的一个智能分块器,它会优先按段落 (\n\n) 切,不行再按句子 (\n、。) 切,最后按词切,这样能最大程度保证语义完整。
重叠 50 字符是为了防止正好把一句话从中间切断了,丢失上下文,比如前一段末尾是张三的月薪是,后一段开头是15000元,如果不重叠这两部分就分开了,检索时可能找不到完整的答案。

4.4 向量存储管理

这是整个文件的核心函数:get_vector_store,它负责获取一个Chroma向量库实例,并且有缓存、有加载、有重建的逻辑

_vector_store: Optional[Chroma] = None

def get_vector_store(force_recreate: bool = False) -> Chroma:
    global _vector_store
    if _vector_store is not None and not force_recreate:
        return _vector_store

    print("初始化向量存储...")
    CHROMA_PERSIST_DIR.mkdir(parents=True, exist_ok=True)

    embedding_model = get_embeddings()  
    should_recreate = force_recreate

    # 尝试加载现有的向量库
    if not should_recreate and CHROMA_PERSIST_DIR.exists() and any(CHROMA_PERSIST_DIR.iterdir()):
        try:
            _vector_store = Chroma(
                persist_directory=str(CHROMA_PERSIST_DIR),
                embedding_function=embedding_model,
                collection_name=CHROMA_COLLECTION_NAME,
            )
            if _vector_store._collection.count() > 0:
                print(f"从持久化目录加载向量存储)
                return _vector_store
            else:
                print("加载的向量存储为空,将重新创建")
                should_recreate = True
        except Exception as e:
            print(f"加载失败:{e},将重新创建")
            should_recreate = True

    # 创建新的向量库
    print("正在创建新的向量存储...")
    json_data = load_processed_json()
    documents = json_to_documents(json_data)
    splits = split_documents(documents)

    _vector_store = Chroma.from_documents(
        documents=splits,
        embedding=embedding_model,
        persist_directory=str(CHROMA_PERSIST_DIR),
        collection_name=CHROMA_COLLECTION_NAME,
    )
    return _vector_store

全局变量 _vector_store:单例模式,避免重复初始化。第一次调用后会缓存,后面直接返回
持久化目录:CHROMA_PERSIST_DIR是向量库存盘的地方。第一次运行时会创建,后面直接从磁盘加载,省去重新计算向量的时间
优先加载现有库:如果目录里有数据,就尝试加载。加载后检查是否为空,防止坏数据
强制重建:force_recreate=True可以强制重新生成向量库,比如文档更新了,需要重建

4.5 检索功能

有了向量库,接下来就是检索,我设计了两个检索函数:
search_documents:只返回文档列表,不带分数

def search_documents(query: str, top_k: int = 5, filter_dict: Optional[Dict] = None) -> List[Document]:
    vector_store = get_vector_store()
    if filter_dict:
        docs = vector_store.similarity_search(query, k=top_k, filter=filter_dict)
    else:
        docs = vector_store.similarity_search(query, k=top_k)
    print(f"检索到{len(docs)}条相关文档")
    return docs

search_with_score:返回含文档和相似度分数的列表,分数是L2距离,越小越相似,有时候需要知道相关程度,比如排序或阈值过滤

def search_with_score(query: str, top_k: int = 5, filter_dict: Optional[Dict] = None) -> List[tuple]:
    vector_store = get_vector_store()
    if filter_dict:
        docs_with_scores = vector_store.similarity_search_with_score(query, k=top_k, filter=filter_dict)
    else:
        docs_with_scores = vector_store.similarity_search_with_score(query, k=top_k)
    return docs_with_scores

两个函数都支持filter_dict过滤,比如只查PDF文件:{"file_type": "pdf"},比如我只想查法律条文,可以指定file_type 为txt文件

4.6 格式化与统计

format_docs:把检索到的文档格式化成易读的字符串,带上来源信息,方便后面展示给用户

def format_docs(docs: List[Document], include_metadata: bool = True) -> str:
    formatted = []
    for i, doc in enumerate(docs):
        content = doc.page_content.strip()
        if include_metadata:
            source = doc.metadata.get('source','未知来源')
            file_type = doc.metadata.get('file_type','未知类型')
            header = f"[{i+1}]来源: {source} ({file_type})"
            formatted.append(f"{header}\n{content}")
        else:
            formatted.append(f"{i+1}.{content}")
    return "\n\n"+"\n\n".join(formatted) +"\n"

get_collection_stats:获取向量库的统计信息,比如总文档数、各类型文件分布,可以用来监控或调试

def get_collection_stats() -> Dict[str, Any]:
    try:
        vector_store = get_vector_store()
        count = vector_store._collection.count()
        # 收集文件类型分布
        file_types = {}
        all_docs = vector_store._collection.get(include=['metadatas'])
        if all_docs and all_docs['metadatas']:
            for meta in all_docs['metadatas']:
                if meta and 'file_type' in meta:
                    ft = meta['file_type']
                    file_types[ft] = file_types.get(ft, 0) + 1
        return {
            'total_documents':count,
            'collection_name':CHROMA_COLLECTION_NAME,
            'persist_directory':str(CHROMA_PERSIST_DIR),
            'file_types':file_types
        }
    except Exception as e:
        print(f"获取统计信息失败:{e}")
        return {'error':str(e)}

4.7 重建向量库

当数据更新后,需要重建向量库,rebuild_vector_store 函数强制重新创建,并且输出统计信息

def rebuild_vector_store():
    try:
        print("开始强制重建向量数据库...")
        vector_store = get_vector_store(force_recreate=True)
        stats = get_collection_stats()
        print(f"向量数据库重建完成: {stats}")
        return True
    except Exception as e:
        print(f"重建失败: {e}")
        return False

这样,每次数据预处理跑完后,可以调用这个函数重建,保证向量库最新

4.8 自测代码

if __name__ == "__main__":
    print("=" * 60)
    print("向量数据库模块测试")
    print("=" * 60)

    # 测试初始化
    start = time.time()
    vector_store = get_vector_store()
    stats = get_collection_stats()
    print(f"初始化成功,耗时: {time.time()-start:.2f}秒")
    print(f"统计信息: {stats}")

    # 测试几个查询
    test_queries = ["周一科技公司的注册资本是多少?", "周三酒店的客房出租率是多少?"]
    for query in test_queries:
        docs_with_scores = search_with_score(query, top_k=2)
        if docs_with_scores:
            for doc, score in docs_with_scores:
                source = doc.metadata.get('source', '未知')
                print(f"[{source}] 相似度: {1-score:.4f}")
        else:
            print("未找到")

    # 测试过滤查询
    docs = search_documents("酒店数据", filter_dict={"file_type": "pdf"})
    print(f"过滤查询到{len(docs)}个PDF文档")

运行结果:

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐