用 MinIO 赢得 RAG 权利

Earn your RAG-ing rights with MinIO

人们常说,在人工智能时代,数据就是你的护城河。为此,构建一个生产级别的 RAG 应用需要一个合适的 数据基础设施,用于存储、版本控制、处理、评估和查询构成你的专有语料库的数据块。由于 MinIO 采用数据优先的方法来构建人工智能,对于此类项目,我们默认的初始基础设施建议是建立一个现代数据湖 (MinIO) 和一个向量数据库。虽然可能需要在此过程中插入其他辅助工具,但这两个基础设施单元是基础。它们将成为将你的 RAG 应用投入生产过程中所遇到的几乎所有任务的重心。

但你陷入了困境。你以前听说过 LLM 和 RAG 这些术语,但由于未知,你并没有深入研究。但如果有一个“Hello World”或样板应用可以帮助你入门,难道不妙吗?

别担心,我之前也和你一样。所以在这篇博文中,我们将演示如何使用 MinIO 使用商品硬件构建一个基于检索增强生成 (RAG) 的聊天应用。

  • 使用 MinIO 存储所有文档、处理后的数据块以及使用向量数据库的嵌入。
  • 使用 MinIO 的存储桶通知功能,在向存储桶添加或删除文档时触发事件。
  • Webhook 消费事件并使用 Langchain 处理文档,并将元数据和分块文档保存到元数据存储桶。
  • 为新添加或删除的分块文档触发 MinIO 存储桶通知事件。
  • 一个 Webhook 消费事件并生成嵌入,并将其保存到持久化在 MinIO 中的向量数据库 (LanceDB) 中。

使用的主要工具

  • MinIO - 对象存储,用于持久化所有数据
  • LanceDB - 无服务器开源向量数据库,将数据持久化到对象存储中
  • Ollama - 在本地运行 LLM 和嵌入模型 (与 OpenAI API 兼容)
  • Gradio - 用于与 RAG 应用交互的界面
  • FastAPI - 用于接收来自 MinIO 的存储桶通知并公开 Gradio 应用的服务器
  • LangChain & Unstructured - 用于从我们的文档中提取有用的文本并将其分块以进行嵌入

使用的模型

  • LLM - Phi-3-128K (3.8B 参数)
  • 嵌入 - Nomic Embed Text v1.5 (Matryoshka 嵌入/ 768 维度,8K 上下文)

启动 MinIO 服务器

如果你还没有 MinIO 二进制文件,可以从 这里 下载。

# 运行分离的 MinIO

!minio server ~/dev/data --console-address :9090 &

启动 Ollama 服务器 + 下载 LLM 和嵌入模型

这里 下载 Ollama。

# 启动服务器

!ollama serve

# 下载 Phi-3 LLM

!ollama pull phi3:3.8b-mini-128k-instruct-q8_0

# 下载 Nomic Embed Text v1.5

!ollama pull nomic-embed-text:v1.5

# 列出所有模型

!ollama ls

使用 FastAPI 创建一个基本的 Gradio 应用来测试模型

LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0"

EMBEDDING_MODEL = "nomic-embed-text:v1.5"

LLM_ENDPOINT = "http://localhost:11434/api/chat"

CHAT_API_PATH = "/chat"



def llm_chat(user_question, history)

     history = history or []

     user_message = f"**你**: {user_question}"

     llm_resp = requests.post(LLM_ENDPOINT,

                              json={"model": LLM_MODEL,

                                   "keep_alive": "48h", # 将模型保留在内存中 48 小时

                                   "messages": [

                                       {"role": "user",

                                        "content": user_question

                                        }

                                   ]},

                              stream=True)

     bot_response = "**AI:** "

     for resp in llm_resp.iter_lines()

         json_data = json.loads(resp)

         bot_response += json_data["message"]["content"]

         yield bot_response

import json

import gradio as gr

import requests

from fastapi import FastAPI, Request, BackgroundTasks

from pydantic import BaseModel

import uvicorn

import nest_asyncio


app = FastAPI()


with gr.Blocks(gr.themes.Soft()) as demo

     gr.Markdown("## 使用 MinIO 的 RAG")

     ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="清除")

     ch_interface.chatbot.show_label = False

     ch_interface.chatbot.height = 600


demo.queue()

    


if __name__ == "__main__"

     nest_asyncio.apply()

     app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)

     uvicorn.run(app, host="0.0.0.0", port=8808)

测试嵌入模型

import numpy as np


EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings"

EMBEDDINGS_DIM = 768


def get_embedding(text)

     resp = requests.post(EMBEDDING_ENDPOINT,

                          json={"model": EMBEDDING_MODEL,

                               "prompt": text})

     return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)

## 使用示例文本进行测试

get_embedding("什么是 MinIO?")

摄取管道概述

创建 MinIO 存储桶

使用 mc 命令或从 UI 中进行

  • custom-corpus - 用于存储所有文档
  • warehouse - 用于存储所有元数据、数据块和向量嵌入

!mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'

!mc mb myminio/custom-corpus

!mc mb myminio/warehouse

创建消费来自 custom-corpus 存储桶的存储桶通知的 Webhook

import json

import gradio as gr

import requests

from fastapi import FastAPI, Request

from pydantic import BaseModel

import uvicorn

import nest_asyncio


app = FastAPI()


@app.post("/api/v1/document/notification")

async def receive_webhook(request: Request)

     json_data = await request.json()

     print(json.dumps(json_data, indent=2))


with gr.Blocks(gr.themes.Soft()) as demo

     gr.Markdown("## 使用 MinIO 的 RAG")

     ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="清除")

     ch_interface.chatbot.show_label = False


demo.queue()

    


if __name__ == "__main__"

     nest_asyncio.apply()

     app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)

     uvicorn.run(app, host="0.0.0.0", port=8808)

## 使用示例文本进行测试

get_embedding("什么是 MinIO?")

创建 Webhook 事件

在控制台中,转到事件 -> 添加事件目标 -> Webhook

使用以下值填写字段并点击保存

标识符 - doc-webhook

端点 - http://localhost:8808/api/v1/document/notification

在提示时,点击顶部的重新启动 MinIO。

(注意:你也可以使用 mc 来完成此操作)

在控制台中,转到存储桶 (管理员) -> custom-corpus -> 事件

使用以下值填写字段并点击保存

ARN - 从下拉列表中选择 doc-webhook

选择事件 - 选中 PUT 和 DELETE

(注意:你也可以使用 mc 来完成此操作)

我们已经设置了第一个 Webhook。

现在通过添加和删除对象进行测试。

从文档中提取数据并进行分块

我们将使用 Langchain 和 Unstructured 从 MinIO 中读取对象并将文档拆分为多个数据块。

from langchain_text_splitters import RecursiveCharacterTextSplitter

from langchain_community.document_loaders import S3FileLoader


MINIO_ENDPOINT = "http://localhost:9000"

MINIO_ACCESS_KEY = "minioadmin"

MINIO_SECRET_KEY = "minioadmin"



# 使用 chunk_size 个字符数从给定文档中拆分文本

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024,

                                                chunk_overlap=64,

                                                length_function=len)



def split_doc_by_chunks(bucket_name, object_key)

     loader = S3FileLoader(bucket_name,

                           object_key,

                           endpoint_url=MINIO_ENDPOINT,

                           aws_access_key_id=MINIO_ACCESS_KEY,

                           aws_secret_access_key=MINIO_SECRET_KEY)

     docs = loader.load()

     doc_splits = text_splitter.split_documents(docs)

     return doc_splits

# 测试分块

split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")

将分块逻辑添加到 Webhook

将分块逻辑添加到 Webhook,并将元数据和数据块保存到 warehouse 存储桶

导入 urllib.parse

导入 s3fs


METADATA_PREFIX = "metadata"


# 使用 s3fs 从 MinIO 保存和删除对象

s3 = s3fs.S3FileSystem()



# 拆分文档并将元数据保存到仓库桶

def create_object_task(json_data)

    for record in json_data["Records"]

        bucket_name = record["s3"]["bucket"]["name"]

        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])

        print(record["s3"]["bucket"]["name"],

              record["s3"]["object"]["key"])


        doc_splits = split_doc_by_chunks(bucket_name, object_key)


        for i, chunk in enumerate(doc_splits)

            source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json"

            with s3.open(source, "w") as f

                f.write(chunk.json())

    return "任务完成!"



def delete_object_task(json_data)

    for record in json_data["Records"]

        bucket_name = record["s3"]["bucket"]["name"]

        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])

        s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True)

    return "任务完成!"

使用新逻辑更新 FastAPI 服务器

import json

import gradio as gr

import requests

from fastapi import FastAPI, Request, BackgroundTasks

from pydantic import BaseModel

import uvicorn

import nest_asyncio


app = FastAPI()


@app.post("/api/v1/document/notification")

async def receive_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

    if json_data["EventName"] == "s3:ObjectCreated:Put"

        print("新对象已创建!")

        background_tasks.add_task(create_object_task, json_data)

    if json_data["EventName"] == "s3:ObjectRemoved:Delete"

        print("对象已删除!")

        background_tasks.add_task(delete_object_task, json_data)

    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo

     gr.Markdown("## 使用 MinIO 的 RAG")

     ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="清除")

     ch_interface.chatbot.show_label = False


demo.queue()

    


if __name__ == "__main__"

     nest_asyncio.apply()

     app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)

     uvicorn.run(app, host="0.0.0.0", port=8808)

添加新的 webhook 来处理文档元数据/分块

现在我们有了第一个 webhook,下一步是获取所有带有元数据的分块,生成嵌入并将它们存储在向量数据库中。

import json

import gradio as gr

import requests

from fastapi import FastAPI, Request, BackgroundTasks

from pydantic import BaseModel

import uvicorn

import nest_asyncio


app = FastAPI()




@app.post("/api/v1/metadata/notification")

async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

     print(json.dumps(json_data, indent=2))


@app.post("/api/v1/document/notification")

async def receive_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

    if json_data["EventName"] == "s3:ObjectCreated:Put"

        print("新对象已创建!")

        background_tasks.add_task(create_object_task, json_data)

    if json_data["EventName"] == "s3:ObjectRemoved:Delete"

        print("对象已删除!")

        background_tasks.add_task(delete_object_task, json_data)

    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo

     gr.Markdown("## 使用 MinIO 的 RAG")

     ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="清除")

     ch_interface.chatbot.show_label = False


demo.queue()

    


if __name__ == "__main__"

     nest_asyncio.apply()

     app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)

     uvicorn.run(app, host="0.0.0.0", port=8808)

创建 Webhook 事件

在控制台中,转到事件 -> 添加事件目标 -> Webhook

使用以下值填写字段并点击保存

标识符 - metadata-webhook

端点 - http://localhost:8808/api/v1/metadata/notification

提示时点击顶部的重新启动 MinIO

(注意:你也可以使用 mc 来完成此操作)

在控制台中转到桶(管理员)-> 仓库 -> 事件

使用以下值填写字段并点击保存

ARN - 从下拉菜单中选择 metadata-webhook

前缀 - metadata/

后缀 - .json

选择事件 - 选中 PUT 和 DELETE

(注意:你也可以使用 mc 来完成此操作)

我们已经设置了第一个 Webhook。

现在通过在 custom-corpus 中添加和删除对象来测试此 webhook 是否被触发

在 MinIO 中创建 LanceDB 向量数据库

现在我们已经有了基本的 webhook,让我们在 MinIO 仓库桶中设置 lanceDB 向量数据库,我们将在其中保存所有嵌入和额外的元数据字段

导入 os

导入 lancedb



# 为 lanceDB 设置这些环境变量以连接到 MinIO

os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY

os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY

os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT

os.environ["ALLOW_HTTP"] = "True"



db = lancedb.connect("s3://warehouse/v-db/")

# 列出现有表格

db.table_names()

# 使用 pydantic 架构创建一个新表格

from lancedb.pydantic import LanceModel, Vector

导入 pyarrow as pa


DOCS_TABLE = "docs"

EMBEDDINGS_DIM = 768


table = None



class DocsModel(LanceModel)

    parent_source: str # 实际对象/文档来源

    source: str # 分块/元数据来源

    text: str # 分块文本

    vector: Vector(EMBEDDINGS_DIM, pa.float16()) # 要存储的向量



def get_or_create_table()

    global table

    if table is None and DOCS_TABLE not in list(db.table_names())

        return db.create_table(DOCS_TABLE, schema=DocsModel)

    if table is None

        table = db.open_table(DOCS_TABLE)

    return table

# 检查是否成功

get_or_create_table()

# 列出现有表格

db.table_names()

将存储/删除数据从 lanceDB 添加到 metadata-webhook

导入 multiprocessing


EMBEDDING_DOCUMENT_PREFIX = "search_document"


# 添加队列,将已处理的元数据保存在内存中

add_data_queue = multiprocessing.Queue()

delete_data_queue = multiprocessing.Queue()


def create_metadata_task(json_data)

    for record in json_data["Records"]

        bucket_name = record["s3"]["bucket"]["name"]

        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])

        print(bucket_name,

              object_key)

        with s3.open(f"{bucket_name}/{object_key}", "r") as f

            data = f.read()

            chunk_json = json.loads(data)

            embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}")

            add_data_queue.put({

                "text": chunk_json["page_content"],

                "parent_source": chunk_json.get("metadata", "").get("source", ""),

                "source": f"{bucket_name}/{object_key}",

                "vector": embeddings

            })

    return "元数据创建任务完成!"



def delete_metadata_task(json_data)

    for record in json_data["Records"]

        bucket_name = record["s3"]["bucket"]["name"]

        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])

        delete_data_queue.put(f"{bucket_name}/{object_key}")

    return "元数据删除任务完成!"

添加一个调度程序来处理队列中的数据

from apscheduler.schedulers.background import BackgroundScheduler

导入 pandas as pd


def add_vector_job()

    data = []

    table = get_or_create_table()


    while not add_data_queue.empty()

        item = add_data_queue.get()

        data.append(item)


    if len(data) > 0

        df = pd.DataFrame(data)

        table.add(df)

        table.compact_files()

        print(len(table.to_pandas()))



def delete_vector_job()

    table = get_or_create_table()

    source_data = []

    while not delete_data_queue.empty()

        item = delete_data_queue.get()

        source_data.append(item)

    if len(source_data) > 0

        filter_data = ", ".join([f'"{d}"' for d in source_data])

        table.delete(f'source IN ({filter_data})')

        table.compact_files()

        table.cleanup_old_versions()

        print(len(table.to_pandas()))



scheduler = BackgroundScheduler()


scheduler.add_job(add_vector_job, 'interval', seconds=10)

scheduler.add_job(delete_vector_job, 'interval', seconds=10)

使用向量嵌入更改更新 FastAPI

import json

import gradio as gr

import requests

from fastapi import FastAPI, Request, BackgroundTasks

from pydantic import BaseModel

import uvicorn

import nest_asyncio


app = FastAPI()



@app.on_event("startup")

async def startup_event()

    get_or_create_table()

    if not scheduler.running

        scheduler.start()



@app.on_event("shutdown")

async def shutdown_event()

    scheduler.shutdown()


@app.post("/api/v1/metadata/notification")

async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

    if json_data["EventName"] == "s3:ObjectCreated:Put"

        print("新元数据已创建!")

        background_tasks.add_task(create_metadata_task, json_data)

    if json_data["EventName"] == "s3:ObjectRemoved:Delete"

        print("元数据已删除!")

        background_tasks.add_task(delete_metadata_task, json_data)

    return {"status": "success"}


@app.post("/api/v1/document/notification")

async def receive_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

    if json_data["EventName"] == "s3:ObjectCreated:Put"

        print("新对象已创建!")

        background_tasks.add_task(create_object_task, json_data)

    if json_data["EventName"] == "s3:ObjectRemoved:Delete"

        print("对象已删除!")

        background_tasks.add_task(delete_object_task, json_data)

    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo

     gr.Markdown("## 使用 MinIO 的 RAG")

     ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="清除")

     ch_interface.chatbot.show_label = False

     ch_interface.chatbot.height = 600


demo.queue()

    


if __name__ == "__main__"

     nest_asyncio.apply()

     app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)

     uvicorn.run(app, host="0.0.0.0", port=8808)

现在我们已经有了摄取管道,让我们集成最终的 RAG 管道。

添加向量搜索功能

现在我们已经将文档摄取到 lanceDB 中,让我们添加搜索功能

EMBEDDING_QUERY_PREFIX = "search_query"


def search(query, limit=5)

    query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}")

    res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit)

    return res

# 让我们测试一下是否有效


res = search("什么是 MinIO 企业对象存储精简版?")

res.to_list()

提示 LLM 使用相关文档

RAG_PROMPT = """

DOCUMENT

{documents}


QUESTION

{user_question}


INSTRUCTIONS

使用上面的 DOCUMENT 文本详细回答用户的 QUESTION。

保持您的回答以 DOCUMENT 中的事实为基础。不要使用诸如“文档指出”之类的句子来引用文档。

如果 DOCUMENT 不包含回答 QUESTION 的事实,则只回复“对不起!我不知道”。

"""

context_df = []


def llm_chat(user_question, history)

     history = history or []

    global context_df

    # 搜索相关文档分块

    res = search(user_question)

    documents = " ".join([d["text"].strip() for d in res.to_list()]) 

    # 将分块传递给 LLM 以获得有事实依据的响应

     llm_resp = requests.post(LLM_ENDPOINT,

                              json={"model": LLM_MODEL,

                                   "messages": [

                                       {"role": "user",

                                        "content": RAG_PROMPT.format(user_question=user_question, documents=documents)

                                        }

                                   ],

                                   "options": {

                                       # "temperature": 0,

                                       "top_p": 0.90,

                                   }},

                              stream=True)

     bot_response = "**AI:** "

     for resp in llm_resp.iter_lines()

         json_data = json.loads(resp)

         bot_response += json_data["message"]["content"]

         yield bot_response

    context_df = res.to_pandas()

    context_df = context_df.drop(columns=['source', 'vector'])



def clear_events()

    global context_df

    context_df = []

    return context_df

更新 FastAPI 聊天端点以使用 RAG

import json

import gradio as gr

import requests

from fastapi import FastAPI, Request, BackgroundTasks

from pydantic import BaseModel

import uvicorn

import nest_asyncio


app = FastAPI()



@app.on_event("startup")

async def startup_event()

    get_or_create_table()

    if not scheduler.running

        scheduler.start()



@app.on_event("shutdown")

async def shutdown_event()

    scheduler.shutdown()


@app.post("/api/v1/metadata/notification")

async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

    if json_data["EventName"] == "s3:ObjectCreated:Put"

        print("新元数据已创建!")

        background_tasks.add_task(create_metadata_task, json_data)

    if json_data["EventName"] == "s3:ObjectRemoved:Delete"

        print("元数据已删除!")

        background_tasks.add_task(delete_metadata_task, json_data)

    return {"status": "success"}


@app.post("/api/v1/document/notification")

async def receive_webhook(request: Request, background_tasks: BackgroundTasks)

     json_data = await request.json()

    if json_data["EventName"] == "s3:ObjectCreated:Put"

        print("新对象已创建!")

        background_tasks.add_task(create_object_task, json_data)

    if json_data["EventName"] == "s3:ObjectRemoved:Delete"

        print("对象已删除!")

        background_tasks.add_task(delete_object_task, json_data)

    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo

     gr.Markdown("## 使用 MinIO 的 RAG")

     ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="清除")

     ch_interface.chatbot.show_label = False

     ch_interface.chatbot.height = 600

    gr.Markdown("### 提供的上下文")

    context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True)

    ch_interface.clear_btn.click(clear_events, [], context_dataframe)


    @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe])

    def update_chat_context_df(text)

        global context_df

        if context_df is not None

            return context_df

        return ""


demo.queue()

    


if __name__ == "__main__"

     nest_asyncio.apply()

     app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)

     uvicorn.run(app, host="0.0.0.0", port=8808)

您是否能够完成并实现基于 MinIO 作为数据湖后端的 RAG 聊天?在不久的将来,我们将围绕同一主题举办网络研讨会,我们将为您现场演示如何构建这个基于 RAG 的聊天应用程序。

RAGs-R-Us

作为专注于 MinIO 人工智能集成的开发人员,我一直在探索如何将我们的工具无缝集成到现代人工智能架构中,以提高效率和可扩展性。在这篇文章中,我们向您展示了如何将 MinIO 与检索增强生成 (RAG) 集成以构建聊天应用程序。这只是冰山一角,可以帮助您在构建 RAG 和 MinIO 的更多独特用例方面的探索中获得优势。现在您拥有构建它的基础。让我们一起行动吧!

如果您对 MinIO RAG 集成有任何疑问,请务必在 Slack 上联系我们!