基于GPT搭建私有知识库聊天机器人(三)向量数据训练
阅读原文时间:2023年07月16日阅读:1

在前面的文章中,我们介绍了实现原理和基本环境安装。本文将重点介绍数据训练的流程,以及如何加载、切割、训练数据,并使用向量数据库Milvus进行数据存储。

在本文中,我们使用了Milvus作为向量数据库。读者可以参考之前的文章《基于GPT搭建私有知识库聊天机器人(二)环境安装》来准备其他基础环境。

数据训练的流程包括准备PDF文档、上传至系统文件目录、开始训练、加载文件内容、内容切割和存储至向量数据库。下面是整个流程的流程图:

3.1 上传文件至系统文件目录

@app.route('/upload', methods=['GET', 'POST'])
def index():
    if request.method == 'POST':
        # 获取文本内容
        text = request.form.get('name')
        # 获取文件内容
        file = request.files.get('file')
        if file:
            # 保存文件到服务器
            filename = file.filename
            file.save(os.path.join(KNOWLEDGE_FOLDER, text, filename))
            file_path = os.path.join(KNOWLEDGE_FOLDER, text, filename)
        else:
            file_path = None

        return jsonify({'message': '上传成功', 'fileServicePath': file_path})

    return render_template('index.html')

3.2 加载文件内容

# 映射文件加载
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PDFMinerLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
}

def load_single_document(file_path: str) -> List[Document]:
    ext = "." + file_path.rsplit(".", 1)[-1]
    if ext in LOADER_MAPPING:
        loader_class, loader_args = LOADER_MAPPING[ext]
        loader = loader_class(file_path, **loader_args)
        return loader.load()

    raise ValueError(f"文件不存在 '{ext}'")

# 加载文件
def load_documents_knowledge(source_dir: str, secondary_directories: str) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(

            glob.glob(os.path.join(source_dir, secondary_directories, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path]

    with Pool(processes=os.cpu_count()) as pool:
        results = []
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for i, docs in enumerate(pool.imap_unordered(load_single_document, filtered_files)):
                results.extend(docs)
                pbar.update()

    return results

3.3 内容切割

text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
texts = text_splitter.split_documents(documents)

3.4 存储至向量数据库

Milvus.from_documents(
        texts,
        collection_name=collection_name,
        embedding=embeddings,
        connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT}
    )

3.5 全部代码

#!/usr/bin/env python3
import glob
import os
import shutil
from multiprocessing import Pool
from typing import List

from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PDFMinerLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader, )
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Milvus
from tqdm import tqdm

load_dotenv(".env")

MILVUS_HOST = os.environ.get('MILVUS_HOST')
MILVUS_PORT = os.environ.get('MILVUS_PORT')
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
KNOWLEDGE_FOLDER = os.environ.get('KNOWLEDGE_FOLDER')
KNOWLEDGE_FOLDER_BK = os.environ.get('KNOWLEDGE_FOLDER_BK')
chunk_size = 500
chunk_overlap = 50

# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """在默认值不起作用时回退到文本纯"""

    def load(self) -> List[Document]:
        """EMl没有 html 使用text/plain"""
        try:
            try:
                doc = UnstructuredEmailLoader.load(self)
            except ValueError as e:
                if 'text/html content not found in email' in str(e):
                    # Try plain text
                    self.unstructured_kwargs["content_source"] = "text/plain"
                    doc = UnstructuredEmailLoader.load(self)
                else:
                    raise
        except Exception as e:
            # Add file_path to exception message
            raise type(e)(f"{self.file_path}: {e}") from e

        return doc

# 映射文件加载
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PDFMinerLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
}

def load_single_document(file_path: str) -> List[Document]:
    ext = "." + file_path.rsplit(".", 1)[-1]
    if ext in LOADER_MAPPING:
        loader_class, loader_args = LOADER_MAPPING[ext]
        loader = loader_class(file_path, **loader_args)
        return loader.load()

    raise ValueError(f"文件不存在 '{ext}'")

def load_documents_knowledge(source_dir: str, secondary_directories: str) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(

            glob.glob(os.path.join(source_dir, secondary_directories, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path]

    with Pool(processes=os.cpu_count()) as pool:
        results = []
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for i, docs in enumerate(pool.imap_unordered(load_single_document, filtered_files)):
                results.extend(docs)
                pbar.update()

    return results

def process_documents_knowledge(secondary_directories: str) -> List[Document]:
    """
    加载文档并拆分为块
    """
    print(f"加载文件目录: {KNOWLEDGE_FOLDER}")
    documents = load_documents_knowledge(KNOWLEDGE_FOLDER, secondary_directories)
    if not documents:
        print("没有文件需要加载")
        exit(0)
    print(f"加载 {len(documents)} 文件从 {KNOWLEDGE_FOLDER}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    print(f"切割 {len(texts)} 文本块 (最大. {chunk_size} tokens 令牌)")
    return texts

def main_knowledge(collection_name: str):
    # Create embeddings
    embeddings = OpenAIEmbeddings()

    texts = process_documents_knowledge(collection_name)

    Milvus.from_documents(
        texts,
        collection_name=collection_name,
        embedding=embeddings,
        connection_args={"host": MILVUS_HOST, "port": MILVUS_PORT}
    )

在本文中,我们详细介绍了基于GPT搭建私有知识库聊天机器人的数据训练过程,包括数据训练的依赖、流程和代码展示。数据训练是搭建聊天机器人的重要步骤,希望本文能对读者有所帮助。在下一篇文章中,我们将介绍如何使用训练好的模型进行聊天机器人的测试和使用。