使用 MinIO、Langchain 和 Ray Data 构建分布式嵌入子系统

Build a Distributed Embedding Subsystem with MinIO, Langchain, and Ray Data

嵌入子系统是实现检索增强生成 (RAG) 所需的四个子系统之一。它将您的自定义语料库转换为可用于语义搜索的向量数据库。其他子系统包括用于创建自定义语料库的数据管道、用于查询向量数据库以向用户查询添加更多上下文信息的检索器,以及最后用于托管您的大型语言模型 (LLM) 并根据用户的查询和向量数据库中找到的上下文生成答案的服务子系统。下图显示了这四个子系统在检索增强生成过程中的协同工作方式。

在这篇文章中,我想要重点关注嵌入子系统。在这个子系统中,构成您组织自定义语料库的文档从其原生格式转换为文本,被分割成更小的块,然后为每个块创建嵌入(这通常是一个维度在数百范围内的向量)。嵌入创建完成后,原始块和向量都会存储在向量数据库中。

嵌入子系统在概念上很容易理解,而且实现一个用于嵌入简单文本文件的简单脚本也相当直观。但是,如果您需要为您的组织实现一个嵌入子系统,那么您该如何为您的组织做出正确的设计决策,以及如何处理随着需求增长而带来的复杂性呢?以下列出了几个设计决策和现实世界的复杂性。

  • 如何有效地运行多个实验来测试不同的配置选项?
  • 如何处理文档中的表格和图像?
  • 如何将嵌入子系统部署到生产环境?
  • 如何处理需要嵌入的大量文档?
  • 哪种向量数据库最佳?
  • 文档、嵌入模型和 LLM 最佳的存储选项是什么?

解决这些问题的第一步是使用能够在您的工程工作站以及生产环境中运行的现代工具。具体而言,我们将使用 MinIO 进行所有存储,使用 Langchain 来实现低代码的文档解析解决方案(我还会提供一些比 Langchain 更善于处理图像和表格的选项),以及使用 Ray Data 将分块和嵌入功能分布到集群中。毫无疑问,分布式技术是我们解决方案的基础。您不仅可以利用 商品硬件 设置进行并行处理,从而获得高吞吐量,而且该解决方案是云原生的,使其能够 跨云移植 并在本地运行。

让我们从为我们的实验设置一个自定义语料库开始。

在 MinIO 中设置自定义语料库

如上所示,自定义语料库由一个数据管道创建,该管道将可能位于您组织中多个门户网站中的文档聚合到 MinIO 中。创建文档管道是另一篇文章的主题——所以现在,我们将手动将一些文档放到 MinIO 存储桶中。为了简单起见,我只使用文本文档。但是,以下是一些处理多个文件格式和文档中非文本内容的提示。首先,查看 Unstructured 的库 以进行分区、清理和提取。其次,如果您只处理 PDF 文件,请查看 Open-Parse 库。我们在之前的博文中介绍过 Open-Parse,文章标题为 使用 Open-Parse 智能分块提高 RAG 性能.

下面的屏幕截图显示了我们的自定义语料库。我从 古腾堡计划 下载了四本被认为是经典的流行书籍的文本版本。

  • 人性的论述 - 大卫·休谟
  • 孙子兵法 - 孙武
  • 化身博士的奇怪案例 - 罗伯特·路易斯·史蒂文森
  • 海底两万里 - 儒勒·凡尔纳

现在我们有了自定义语料库,我们可以设置一个向量数据库来保存嵌入。

设置 MinIO 和向量数据库

我将使用的向量数据库是 Pgvector。Pgvector 是 PostgreSQL 的一个开源扩展,允许用户在数据库中存储、搜索和分析向量数据。这篇文章的 代码下载 包含一个 docker-compose 文件,其中包含 MinIO、Pgvector 和 pgAdmin。在 docker-compose 文件所在的同一目录中运行下面的命令,将以容器的形式启动这三项服务。

docker-compose up -d

还有一个 init.sql 文件(如下所示)。docker-compose 文件将此文件映射到容器的启动目录。这将导致文件中的 SQL 运行,从而在 Postgres 中创建向量扩展和一个名为“embeddings”的表,该表具有 SQL 文件中显示的字段。

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS embeddings (
id SERIAL PRIMARY KEY,
embedding vector,
text text,
created_at timestamptz DEFAULT now()
);

将嵌入模型保存到 MinIO

我们将使用的嵌入模型来自 Hugging Face 的开源模型。详细信息如下所示。在运行实验时指定特定版本始终是一个好主意。

模型名称:intfloat/multilingual-e5-small 修订版:ffdcc22a9a5c973ef0470385cef91e1ecb461d9f

不要被模型的名称所迷惑。它一点也不小。它有 1.4GB。我们需要下载此模型并将其上传到 MinIO。这是一个一次性设置任务,以便在分布式环境中为实验准备此模型。不幸的是,我们需要的 Hugging Face 函数(snapshot_download)没有 S3 接口,因此我们将使用 MinIO Python SDK 将模型上传到 MinIO。另一个复杂之处在于 Hugging Face 模型不是单个文件。它是一组文件,这些文件会被下载到指定的目录中。我们必须将整个目录上传到 MinIO,并使用 MinIO 路径保留文件夹结构。这使用下面显示的“upload_model_to_minio”函数完成。

from huggingface_hub import snapshot_download

def upload_model_to_minio(bucket_name: str, full_model_name: str, revision: str) -> None:
  '''
  Download a model from Hugging Face and upload it to MinIO. This function will use
  the current systems temp directory to temporarily save the model.
  '''

  # Create a local directory for the model.
  #home = str(Path.home())
  temp_dir = tempfile.gettempdir()
  base_path = f'{temp_dir}{os.sep}hf-models'
  os.makedirs(base_path, exist_ok=True)

  # Get the user name and the model name.
  tmp = full_model_name.split('/')
  user_name = tmp[0]
  model_name = tmp[1]

  # The snapshot_download will use this pattern for the path name.
  model_path_name=f'models--{user_name}--{model_name}'
  # The full path on the local drive.
  full_model_local_path = base_path + os.sep + model_path_name + os.sep + 'snapshots' +
                          os.sep + revision
  # The path used by MinIO.
  full_model_object_path = model_path_name + '/snapshots/' + revision

  print(f'Starting download from HF to {full_model_local_path}.')
  snapshot_download(repo_id=full_model_name, revision=revision, cache_dir=base_path)

  print('Uploading to MinIO.')
  upload_local_directory_to_minio(full_model_local_path, bucket_name,
                                  full_model_object_path)
  shutil.rmtree(full_model_local_path)

运行以下命令将使用此函数将我们的模型上传到名为“hf-models”的存储桶中。

MODELS_BUCKET = 'hf-models'
EMBEDDING_MODEL = 'intfloat/multilingual-e5-small'
EMBEDDING_MODEL_REVISION = 'ffdcc22a9a5c973ef0470385cef91e1ecb461d9f'

upload_model_to_minio(MODELS_BUCKET, EMBEDDING_MODEL, EMBEDDING_MODEL_REVISION)

嵌入函数库

当您使用 Ray Data 等库来分发数据处理时(在本例中,它是文本的块化以及为每个块生成嵌入),您实际上只是在编排执行过程中一项任务的简单函数调用。下面列出了我们从 MinIO 存储桶中的文档列表创建嵌入所需的所有函数,以及它们的參数和返回值。如您所见,我们拥有从文档集合创建嵌入所需的一切。

create_logger() -> logging.Logger

创建一个 Python 日志记录器,用于将调试、信息、错误、警告和严重消息发送到日志记录存储库。

download_model_from_minio(bucket_name: str, full_model_name: str, revision: str) -> str

从 MinIO 下载模型到当前系统的临时目录。一旦它加载到内存中,它就会被删除。

get_document_from_minio(bucket_name: str, object_name: str) -> str

从 MinIO 下载单个文档并将其保存到当前系统的临时目录。

get_object_list(bucket_name: str) -> List[str]

返回指定存储桶中的对象列表。此列表将发送到 Ray Data,Ray Data 会在集群中的所有 Ray 参与者之间均匀地分发它。

save_embeddings_to_vectordb(chunks, embeddings) -> None:

将嵌入以及文本块保存到向量数据库中。

upload_local_directory_to_minio(local_path:str, bucket_name:str , minio_path:str) -> None

将指定本地目录的内容上传到 MinIO,并将文件夹结构保留为指定存储桶中的路径。

upload_model_to_minio(bucket_name: str, full_model_name: str, revision: str) -> None

从 Hugging Face 下载模型到当前系统的临时目录,然后将模型上传到指定的存储桶,保留文件夹结构作为指定存储桶内的路径。

一个简单的嵌入子系统

让我们使用上面的函数并创建一个简单的非分布式脚本。下面的代码将为罗伯特·路易斯·史蒂文森的《化身博士》创建嵌入。

首先,我们需要下载我们要使用的嵌入模型并将其保存到 MinIO。这是一项一次性任务;你不需要每次你想嵌入新的一批模型或运行实验时都这样做。

MODELS_BUCKET = 'hf-models'
EMBEDDING_MODEL = 'intfloat/multilingual-e5-small' # 用于将文本块转换为向量嵌入的嵌入模型。
EMBEDDING_MODEL_REVISION = 'ffdcc22a9a5c973ef0470385cef91e1ecb461d9f'

eu.upload_model_to_minio(MODELS_BUCKET, EMBEDDING_MODEL, EMBEDDING_MODEL_REVISION)

接下来,我们需要从 MinIO 下载我们的模型,实例化它,创建一个分块器(或拆分器),创建嵌入并将它们保存到我们的 pgvector 数据库。

CHUNK_SIZE = 1000         # 将被转换为向量嵌入的文本块大小
CHUNK_OVERLAP = 10
DIMENSION = 384           # 嵌入大小

model_path = eu.download_model_from_minio(MODELS_BUCKET, EMBEDDING_MODEL, 

                                          EMBEDDING_MODEL_REVISION)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
embedding_model = SentenceTransformer(model_path, device=device)
chunker = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, 

                                         chunk_overlap=CHUNK_OVERLAP, length_function=len)

temp_file = eu.get_document_from_minio(BUCKET_NAME, 

                                       'The Strange Case of Dr Jekyll and Mr Hyde.txt')
file = open(temp_file, 'r')
data = file.read()
chunks = chunker.split_text(data)
print('Number of chunks:', len(chunks))
print('Length of the first chunk:', len(chunks[0]))

embeddings = embedding_model.encode(chunks, batch_size=BATCH_SIZE).tolist()
print('Number of embeddings:', len(embeddings))
print('Length of the first embedding:', len(embeddings[0]))

eu.save_embeddings_to_vectordb(chunks, embeddings)

请注意,如果我们有 GPU 可用,则可以使用 GPU。另外,一切都是配置驱动的,因此运行不同的实验只是更改配置以反映您希望运行的实验的问题。 这包括更改您希望使用的嵌入模型。

以下是一个 pgAdmin 的屏幕截图,显示了我们新创建的嵌入。

现在我们已经有了为单个文档创建嵌入的简单脚本,下一步是将此代码迁移到在集群中运行的框架中。 这将允许并行嵌入整个语料库。 我们将使用 Ray Data 来做到这一点。

分布式嵌入子系统

分布式嵌入子系统的第一步是将所有工作放到一个像函数一样工作的类中。 这是使用 Python 的“__call__”内置方法完成的。 (这是 Ray Data 的要求。)我们的类如下所示。

class Embed:

  def __init__(self):
      device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
     
      model_path = eu.download_model_from_minio(MODELS_BUCKET, EMBEDDING_MODEL, 

                                                 EMBEDDING_MODEL_REVISION)
      self.embedding_model = SentenceTransformer(model_path, device=device)
      self.splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, 

                                                      chunk_overlap=CHUNK_OVERLAP, 

                                                      length_function=len)
SentenceTransformer 类使使用嵌入模型变得容易。此外,我们正在使用 RecursiveCharacterTextSplitter 来自 LangChain 的工具来拆分或分块我们的文档。它递归地根据字符列表(我们使用其默认列表)拆分文本,从列表中的第一个字符开始,如果第一个拆分太大,则继续下一个字符。目标是将相关的文本片段保持在一起,保留它们的语义关系。所有这些设置工作仅在创建 Embed 对象时执行一次。我们本可以使用一个简单的函数来分发工作,但这种设置工作必须针对每个批次完成,而这在您需要进行设置工作时不是正确的设计。

接下来,我们需要初始化 Ray 集群。

ray.init(
  #address="ray://ray-cluster-kuberay-head-svc:10001",
  runtime_env={
      "env_vars": {
          "MINIO_URL": MINIO_URL,
          "MINIO_ACCESS_KEY": MINIO_ACCESS_KEY,
          "MINIO_SECRET_KEY": MINIO_SECRET_KEY,
          "MINIO_SECURE": str(MINIO_SECURE),
          "PGVECTOR_HOST": os.environ['PGVECTOR_HOST'],
          "PGVECTOR_DATABASE": os.environ['PGVECTOR_DATABASE'],
          "PGVECTOR_USER": os.environ['PGVECTOR_USER'],
          "PGVECTOR_PASSWORD": os.environ['PGVECTOR_PASSWORD'],
          "PGVECTOR_PORT": os.environ['PGVECTOR_PORT'],
      },
      "pip": [             
          "datasets==2.19.0",
          "huggingface_hub==0.22.2",
          "minio==7.2.7",
          "psycopg2-binary==2.9.9",
          "pyarrow==16.0.0",
          "sentence-transformers==3.0.1",
          "torch==2.3.0",
          "transformers==4.40.1",
      ]
  }
)

在我们的演示中,我们正在创建一个本地 Ray 实例。我没有使用 Kubernetes 集群。这是在迁移到真实集群之前使代码工作的最佳方法。我们还没有创建任何 Ray Actor,但我们正在发送 Ray 配置信息,这些信息告诉 Ray 每个 Actor 需要哪些环境变量和库。

接下来,我们创建一个 Ray 数据集来保存我们想要发送到 Embed 类实例的所有数据,这些实例将在每个 Ray Actor 中运行。在我们的例子中,每个 Ray Actor 将接收存储在 MinIO 中的对象引用列表(文档的路径)。我们将使用我们上面描述的函数库中的 “get_object_list” 函数。从 “ray.data.from_items()” 返回的 Ray 数据集包含逻辑,该逻辑将把此列表转换为较小的批次,以便在启动分布式嵌入过程时将其发送到每个 Actor。

# 嵌入类需要 bucket_name 和 document_name 对 - 因此将 bucket name 添加到列表中的每个条目。
document_list = eu.get_object_list(BUCKET_NAME)
list_for_ray = [[BUCKET_NAME, doc] for doc in document_list]

ray_ds = ray.data.from_items(list_for_ray)
print(type(ray_ds))
print(ray_ds.schema)

我们几乎准备好进行一些分布式计算,但还有一个编码任务需要完成。我们需要将 Ray 数据集映射到我们的 Embed 类,并告诉 Ray 如何为该工作负载设置我们之前初始化的集群。这是使用 Ray 数据集的 “map_batches” 方法完成的。您可以向 “map_batches” 发送函数或可调用类。如果您发送函数,Ray 数据将使用无状态 Ray 任务。对于类,Ray 数据将使用有状态 Ray Actor。

ds_embed = ray_ds.map_batches(
  Embed,
  concurrency=ACTOR_POOL_SIZE,
  batch_size=BATCH_SIZE,  # 最大化 GPU 利用率的大批量大小。
  #num_gpus=1,            # 每个 Actor 1 个 GPU。
  num_cpus=1,             # 每个 Actor 1 个 CPU。
)

请注意,我们正在传入需要为每个 Actor 实例化的 Embed 类。我们还指定了 Actor 的数量、对每个 Actor 的调用的批次大小,以及最后每个 Actor 可访问的 GPU 和 CPU 的数量。map_batches 方法返回另一个 Ray 数据集 (ds_embed),该数据集包含所有 Actor 的所有返回值。这是一个来自 Embed 中 “__call__” 方法的返回值的集合。

最后,我们准备启动分布式嵌入作业。您可能已经注意到,之前的命令运行得非常快。这是因为还没有发生任何计算。Ray 中的转换 (map_batch 被认为是一种转换) 是 “延迟的”。它们只有在您通过迭代数据集、保存数据集或检查数据集的属性来触发数据消耗时才会执行。因此,我们需要向 ds_embed 请求我们 Actor 的返回值。这是在下面完成的。下面的代码段将需要一些时间来运行。

def ray_data_task(ds_embed):
  results = []
  for row in ds_embed.iter_rows():
      documents = row['documents']
      timings = row['timings']
      results.append((documents, timings))
  return results

results = ray_data_task(ds_embed)

results

就是这样。我们完成了。一旦上面的代码完成,您将看到类似于下面显示的输出。

[('A Treatise of Human Nature.txt', 75.08733916282654),
('The Art of War.txt', 21.960258722305298),
('The Strange Case of Dr Jekyll and Mr Hyde.txt', 10.052802085876465),
('Twenty Thousand Leagues under the Sea.txt', 39.24100613594055)]

总结

在这篇文章中,我们构建了一个分布式嵌入子系统,它可以在工程工作站和完全分布式的云原生生产环境中运行。所呈现的代码具有以下直接解决我们在介绍中确定的复杂性和现实世界问题的优势。

  • 可以有效地运行实验,从而可以彻底测试不同的配置选项。
  • 除了配置选项之外,还应尝试解析选项。这将使您能够处理多种文件类型并处理文档中的非文本数据。
  • 当使用此处显示的代码时,您的生产环境将运行工程师用于测试和实验的相同代码。
  • 分布式嵌入子系统可以在集群中运行。集群可以快速扩展以处理需要作为批处理处理的大量文档,并且还可以针对实时工作负载进行扩展。
  • 本文中介绍的代码封装了向量数据库调用,使工程师可以交换不同的产品。
  • MinIO 是生成式 AI 的最佳存储解决方案。正如我们在本文中看到的那样,嵌入模型和文档必须存储在 高速、可扩展的存储解决方案中。

如果您有任何问题,请务必在 Slack 上联系我们!

加入我们的 Slack 加入我们的 Slack