利用 MinIO、Weaviate 和 Langchain 优化 AI 数据处理,构建检索增强生成 (RAG) 管道

作为一名专注于 MinIO 人工智能集成的开发者,我一直在探索如何将我们的工具无缝集成到现代人工智能架构中,以提高效率和可扩展性。在本文中,我们将深入探讨 MinIO 与检索增强生成 (RAG) 管道和 Weaviate 向量存储的集成,并使用 LangChain。我们的目标是创建一个强大的数据处理框架,不仅能够提高工作流程效率,还能有效地管理完整的数据生命周期。
RAG 简介
检索增强生成(RAG)是一种变革性的人工智能方法,它将从数据库中检索相关信息与生成模型相结合,以生成既有信息量又与上下文相关的响应。这种方法通过利用外部知识来源增强了人工智能生成细致入微的答案的能力,从而显著提高了生成内容的质量。
RAG 通过一个过程运行,该过程首先识别并从数据库或知识库中检索相关信息,然后使用这些信息来告知和生成响应。其检索策略的灵活性,从密集向量搜索到基于关键词的方法,使 RAG 能够适应各种应用程序,确保生成的响应不仅在上下文上准确,而且是最新的。
使用 RAG 的优势包括:无需重新训练模型即可动态更新源知识、通过可追溯的来源增强人工智能的响应质量以及减少错误。但是,RAG 系统的有效性很大程度上取决于检索准确性和生成模型的局限性之间的平衡,例如处理大型上下文或避免“幻觉”内容。
要深入了解 RAG 在人工智能生成内容中的机制和应用,arXiv 等资源提供了全面的调查和研究。此外,Papers with Code 和 Elastic Search Labs 等平台提供了有关 RAG 操作细微差别的实用见解,解决了其部署的挑战和注意事项。
所讨论技术的概述
以下是开发概念验证的要素:
- Weaviate:充当强大的向量搜索引擎,通过语义搜索功能实现高效的数据检索。这项技术在 RAG 管道中发挥着重要作用,可以根据查询的上下文快速准确地获取相关数据。
- MinIO:作为高性能对象存储系统,对于存储和管理人工智能和 RAG 管道各个阶段的数据至关重要。MinIO 的可扩展性和可靠性确保了高效处理大型数据集,支持人工智能模型对海量信息的需求。
- LangChain:提供将人工智能模型与 RAG 管道其他组件集成的基本框架。它允许精确控制人工智能模型、数据检索过程和存储系统之间的交互,方便自定义开发人工智能解决方案。
通过这种方法,我们将探索在 RAG 管道内将 MinIO 与 Weaviate 和 LangChain 集成的潜力。
为什么 MinIO 在人工智能数据处理中至关重要
MinIO 作为对象存储解决方案,为管理人工智能操作中的数据提供了关键基础设施。与将文件组织成层次结构的传统数据存储不同,MinIO 将数据视为对象。这种抽象至关重要,因为人工智能处理数据不仅仅作为文件或文档,而是作为包含元数据的对象。这种基于对象的方法简化了交互,允许人工智能系统通过统一的 API 在细粒度级别管理和理解数据。
通过桶简化数据管理
MinIO 架构的核心是其桶级组织,这简化了复杂的数据管理环境。MinIO 中的桶有助于对对象数据及其关联的元数据进行分段和管理,维护诸如版本控制、对象锁定和自定义保留策略等基本特征。这种分段对于在人工智能应用程序中执行数据治理和操作控制至关重要。
使用 Webhook 增强人工智能集成
在 MinIO 中使用 Webhook 为人工智能集成开辟了一条强大的途径。Webhook 允许在数据在存储环境中发生变化时执行 Lambda 函数,提供了一种动态方法来触发人工智能驱动的流程。此功能在需要实时数据处理的场景中特别有用,例如在动态 ETL 管道中。我之前的一篇文章“动态 ETL 管道:使用 Unstructured-IO 为 MinIO 和 Weaviate 提供 Web 数据以增强人工智能”中详细介绍了此示例,该文章演示了如何使用自定义对象填充 MinIO 和 Weaviate,通过人工智能增强功能将 URL 转换为查询生成的内容。
对象级数据灵活性
在桶的有序结构中,MinIO 可以存储各种类型的数据,从原始和半结构化数据(如日志和媒体)到结构化数据(如代码和文档)。这种多功能性类似于在存储库中管理代码的方式;但是,MinIO 通过将每个项目视为一个包含其生命周期和元数据的完整对象来进一步扩展此功能。随着人工智能技术的不断发展,LangChain 等框架将利用这些对象数据原则,增强 MinIO 在现代人工智能生态系统中的相关性。使用 MinIO 桶存储人工智能特性和配置的能力也带来了另一个重要优势,特别是对于构建自定义人工智能执行引擎的开发人员而言。
MinIO 在 RAG 管道中的关键集成点
在 (RAG) 管道中,MinIO 可以执行多个关键功能,从而提高管道的效率、可扩展性和可靠性。以下是 MinIO 在这些复杂框架中可以发挥的关键作用的更深入探讨
- 集中式原始数据管理:首先,MinIO 在集中和扩展原始数据存储方面发挥着至关重要的作用。它是整个管道的支柱,确保原始数据集随时可用于后续处理和分析。这种集中式方法不仅简化了数据访问,还巩固了 RAG 管道运行的基础。
- 增强数据准备:数据通过管道的过程涉及大量清理和预处理,MinIO 的强大版本控制和易访问机制使这些任务更加高效。此功能确保已清理和准备好的数据以有序的方式存储,准备进行下一步转换,从而加快数据准备阶段。
- 简化中间过程:随着数据经历各种转换和拆分,MinIO 中的中间数据工件存储变得不可或缺。通过管理这些数据处理阶段,MinIO 支持无缝工作流程,确保每个数据片段都被记录在案并可用于进一步处理。
- 动态提示管理:RAG 管道中查询的动态特性要求灵活的提示管理,这是 MinIO 擅长处理的功能。通过存储提示或查询模板,MinIO 使能够生成既相关又针对特定需求量身定制的查询,从而增强了管道的响应能力。
- 增强数据处理:对于增强数据存储,MinIO 提供可扩展的解决方案,以满足通过管道丰富的数据工件的存储需求。这确保了对于生成细致入微的响应至关重要的增强内容得到有效管理和存储,并随时准备用于内容创建。
- 存档生成结果:将存档生成结果存储在 MinIO 中以供历史分析和轻松检索是另一个关键集成点。此功能不仅保留了输出以供将来参考,还提供了对生成内容演变的见解,支持持续改进。
- 模型和配置版本控制:MinIO 对模型和工件版本控制的支持对于促进 RAG 管道内的实验和开发至关重要。此功能允许跟踪迭代和配置,从而更轻松地恢复更改或探索新方向而不会丢失进度。
- 备份和恢复:在人工智能驱动系统中,备份和恢复超越了灾难预防。对于人工智能操作,特别是那些涉及 Weaviate 等向量存储数据库的操作,能够有效保存和恢复数据至关重要。此功能不仅与防止数据丢失有关,还与保留人工智能系统的“记忆”有关。
人工智能技术本身并不具备记住其处理的每个数据片段的能力;因此,实施强大的备份和恢复协议对于维护数据驱动见解的连续性和完整性至关重要。MinIO 在此方面发挥着至关重要的作用,它提供了一个弹性的基础设施来备份和恢复这些关键数据结构。
之前关于如何使用 MinIO S3 存储桶备份 Weaviate的讨论,阐明了这些概念的实际应用。MinIO 提供的功能不仅可以防止数据丢失,还可以确保 AI 系统能够回忆和再生其“记忆”——向量存储类,从而保持操作一致性并增强系统的整体弹性。
使用 MinIO 优化 RAG
本指南涵盖了初始化与 Weaviate 和 MinIO 的连接。访问 MinIO 博客资产存储库;您可以在这里找到原始 LangChain Weaviate-RAG 脚本,以及演示 MinIO Python SDK 的有效性的笔记本,以及如何将其应用于优化Weaviate-RAG流程。
以下代码块是来自 GitHub 的LangChain 的“rag-weaviate”示例。从这一点开始,我们可以深入源代码,并开始逻辑地演示如何将 MinIO 集成到这个预先存在的 Weaviate RAG 管道中。
import os
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Weaviate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
if os.environ.get("WEAVIATE_API_KEY", None) is None:
raise Exception("Missing `WEAVIATE_API_KEY` environment variable.")
if os.environ.get("WEAVIATE_ENVIRONMENT", None) is None:
raise Exception("Missing `WEAVIATE_ENVIRONMENT` environment variable.")
WEAVIATE_INDEX_NAME = os.environ.get("WEAVIATE_INDEX", "langchain-test")
### Ingestion via WebBasedLoader
# Load
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
data = loader.load()
### Split text via RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
all_splits = text_splitter.split_documents(data)
### Add to vectorDB
# vectorstore = Weaviate.from_documents(
# documents=all_splits, embedding=OpenAIEmbeddings(), index_name=WEAVIATE_INDEX_NAME
# )
# retriever = vectorstore.as_retriever()
vectorstore = Weaviate.from_existing_index(WEAVIATE_INDEX_NAME, OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
### RAG prompt
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
### RAG pipeline
model = ChatOpenAI()
chain = (
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
| prompt
| model
| StrOutputParser()
)
# Add typing for input
class Question(BaseModel):
__root__: str
chain = chain.with_types(input_type=Question)
原始 LangChain Weaviate RAG 脚本
在 LangChain-Weaviate RAG 管道中集成 MinIO SDK
以上代码是我们的起点,并为我们提供了一个画布,可以传达几个更简单的集成点。
初始化和设置
首先设置MinIO 客户端,此设置是基础,因为它支持 RAG 管道中的整个数据管理流程。
from minio import Minio
# Initialize the MinIO client with appropriate access credentials
minio_client = Minio(
"play.min.io:443",
access_key="minioadmin",
secret_key="minioadmin",
secure=True
)
通过 minio_client 连接到 play.min.io:443 服务器
修改数据摄取
不要使用默认的WebBaseLoader进行数据摄取,而是转换为从 MinIO 直接检索数据的方法。此更改简化了数据访问流程,这对于从一开始就提高管道的效率至关重要。
from io import BytesIO
# Define a function to fetch data directly from a MinIO bucket
def fetch_data_from_s3(bucket_name, object_name):
response = minio_client.get_object(bucket_name, object_name)
data = BytesIO(response.read())
response.close()
return data.getvalue().decode('utf-8')
# Example usage
bucket_name = "hydrate-bucket"
object_name = "urls.txt"
data = fetch_data_from_s3(bucket_name, object_name)
从存储桶中获取数据的代码片段
有关为 MinIO Python SDK 和 Weaviate 填充数据的更多演示... 请参阅博客资产存储库中的笔记本此处。
处理和存储分割数据
检索原始数据后,使用 LangChain 中的`RecursiveCharacterTextSplitter`将数据分割成可管理的块,为更深入的分析和处理做好准备。
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Split the retrieved data into smaller text chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
all_splits = text_splitter.split_documents(data)
# Store the processed data splits back into MinIO for easy retrieval and further use
def store_data_in_s3(bucket_name, object_name, content):
content_bytes = content.encode('utf-8')
minio_client.put_object(bucket_name, object_name, BytesIO(content_bytes), len(content_bytes))
# Example of storing each split in MinIO
for index, split in enumerate(all_splits):
store_data_in_s3("clean-bucket", f"split_{index}.txt", split)
保存分割的代码片段
有关使用 MinIO Python SDK 保存分割的更多演示,请参阅博客资产存储库中的笔记本此处。
检索和利用 RAG 提示
从 MinIO 中检索必要的RAG 提示,这些提示对于指导 RAG 管道中 AI 模型的响应生成至关重要。
# Fetch the RAG prompt stored in a MinIO bucket
# Function to retreive rag-prompt.txt from prompt-bucket
def get_prompt_from_minio(client, bucket_name, object_name):
"""
Retrieve a text file from a specified MinIO bucket and return its content as a string.
:param client: Minio client instance
:param bucket_name: Name of the MinIO bucket
:param object_name: Object name in the bucket (e.g., 'rag-prompt.txt')
:return: Content of the prompt as a string
"""
try:
# Get object data from the specified bucket
response = client.get_object(bucket_name, object_name)
data = response.read() # Read data from the response
prompt_content = data.decode('utf-8') # Decode bytes to string
return prompt_content
except Exception as e:
print(f"Failed to retrieve prompt: {e}")
return None
# Usage
prompt = get_prompt_from_minio(minio_client, "prompt-bucket", "rag-prompt.txt")
if prompt:
print(f"Retrieved prompt content: {prompt}")
else:
print("No content retrieved.")
提示检索代码片段
有关使用 MinIO Python SDK 进行提示检索的更多演示,请参阅博客资产存储库中的笔记本此处。
执行 RAG 流程链
所有元素到位后,以下是如何使用配置的 LangChain 框架执行 RAG 流程的RetrievalQA 链的示例,利用检索器和语言模型生成上下文相关的答案。
from langchain.chains import RetrievalQA
# Setup the RetrievalQA chain with the language model and the vector store retriever
qa_chain = RetrievalQA.from_chain_type(
llm,
retriever=vectorstore.as_retriever(),
chain_type_kwargs={"prompt": prompt}
)
# Execute the RAG process with a sample query
question = "What are the approaches to Task Decomposition?"
result = qa_chain({"query": question})
print(result["result"])
RetrievalQA 链代码片段
由于 MinIO 提供的基础设施,开发自定义集成 AI 的可能性是无限的,并且 MinIO 可以提供。这种方法突出了 MinIO 在 RAG 管道中的多功能作用及其简化操作的潜力,为管理 AI 驱动的数据处理和内容生成任务的生命周期提供了一个实用且高效的解决方案。
集成 MinIO 的 RAG 管道的评估
利用 MinIO、Weaviate 和 LangChain 的改进型 RAG 管道从详细和有条理的评估流程中获益匪浅。以下是如何简化此方法
Ragas 框架应用
Ragas 作为我们评估的核心,为检索和生成提供全面的指标
- 上下文相关性和召回率:确保检索到的信息全面且与查询相关。
- 答案的真实性和相关性:验证生成的答案在事实上的准确性,并直接解决提出的问题。
Ragas 的 LLM 驱动的评估提供了协调的分数,反映了整体 QA 性能。其对 LLM 的细致使用包括将答案解析为针对上下文的可验证陈述,将预测的潜在问题与实际问题进行比较以确定相关性,并确保检索到的上下文中存在所有支持信息。
使用 LangSmith 进行持续评估
LangSmith 通过提供以下功能增强了 Ragas
- 一个用于创建和管理测试数据集以及运行评估的平台。
- 用于可视化和分析结果的工具,使 Ragas 指标透明且可操作。
- 用于使用真实世界数据增强评估的功能,从生产日志中添加测试示例。
这些工具共同提供了一种迭代和动态的评估方法,能够提供实时见解并持续改进 RAG 管道。
这种评估策略强调了管道的功能和适应性,旨在维护一个健壮而高效的系统。通过关注性能指标和持续评估,它为持续改进和优化集成 RAG 管道提供了一条途径
进一步探索 RAG 和 AI
为了更深入地了解检索增强生成 (RAG) 和相关技术,我整理了以下资源,这些资源在我自己的探索中很有帮助
- Hugging Face 的 RAG 文档: Hugging Face 提供的关于 RAG 模型的详细文档。
- 使用 Ragas 和 LangSmith 评估 RAG: 使用 Ragas 和 LangSmith 评估 RAG 系统的指南。
- 用于大型语言模型的检索增强生成:综述: 一篇关于将检索机制与大型语言模型相结合的进展的调查论文。
- 用于 AI 生成内容的检索增强生成:综述: 本文全面考察了 RAG 系统在生成 AI 内容方面的有效性。
- ARES:用于检索增强生成系统的自动化评估框架: ARES 微调语言模型以自主评估 RAG 系统,减少对人工注释的依赖。
- RaLLe:一个用于开发和评估检索增强大型语言模型的框架: RaLLe 提供了构建和测试 R-LLM 的工具,增强了检索和生成过程的透明度和优化。
最终思考:使用 MinIO 增强 AI 工作流程
当我们探索将 MinIO 与检索增强生成 (RAG) 管道和使用 Langchain 的 Weaviate 向量存储集成时,很明显,这种组合为 AI 数据处理提供了重大改进。通过利用 MinIO 强大的存储功能,以及 Weaviate 高效的向量存储和灵活的 Langchain 框架,我们可以简化工作流程并在整个生命周期内更有效地管理数据。
集成的影响超出了当前的增强功能,为更广泛的 AI 领域内的创新开辟了新途径。它有望扩展 AI 系统的功能,尤其是在生成细致入微、上下文丰富的响应方面,这可能会变得更加普遍。工具的持续发展和改进不仅可以简化现有流程,而且还可以为 AI 应用的新突破铺平道路。
对于那些热衷于深入研究蓬勃发展的 RAG 技术及其应用的人,请在MinIO Slack 频道上与我们互动,以便我们可以培养一个协作的环境,促进探索和发展。未来的旅程将利用尖端技术的持续改进和集成来释放 AI 的新功能。