使用 Ray Train 和 MinIO 进行分布式训练

Distributed Training with Ray Train and MinIO

大多数机器学习项目从单线程概念验证开始,其中每个任务必须在下一个任务开始之前完成。下面描述的单线程 ML 管道就是一个例子。

但是,在某些时候,你会发现上面的管道无法满足你的需求。这可能是由于数据集不再适合单个进程的内存。当你的模型变得复杂,实验需要数小时甚至数天才能完成时,你也会发现这个管道无法满足你的需求。为了解决这些问题,你可以将工作分布到多个进程并添加并行性。要完全分布上面的管道,需要进行两个修改

1. 分布式数据处理 

2. 分布式模型训练。

在单线程管道中,数据处理(或者在机器学习领域通常称为数据预处理)通常通过将整个数据集加载到内存中并在将数据传递给模型进行训练之前对其进行转换来完成。但是,一旦你想要利用分布式训练技术,这应该发生变化。数据预处理可以在训练循环中增量完成。在我关于 Ray Data 的上一篇文章中,我展示了如何使用可以映射到预处理任务或 Actor 的 Ray 数据集来分布式数据预处理。具体来说,我展示了如何创建 Ray 数据集并将其映射到处理任务。然后,我们通过迭代数据集并观察每个迭代执行的预处理任务来测试该数据集。此外,我还展示了如何使用 MinIO 作为数据集的来源,该数据集太大而无法完全加载到内存中。这需要查询 MinIO 以获取用于训练的对象列表,并将对象检索编码到我们的预处理任务中。如果你还没有阅读这篇文章,请现在快速阅读一下。 

在这篇文章中,我将展示如何实现完全分布式 ML 训练管道所需的第二个修改。我将展示如何分布式模型训练并在训练循环中使用映射的数据集。在运行需要很长时间才能完成的训练函数时,最好在每个 epoch 之后对模型进行检查点。检查点也是你检索最终完全训练好的模型的方式。这篇文章将向你展示如何将模型检查点保存到 MinIO。

可以在这里找到包含这篇文章中所有代码的完整运行示例。

可视化分布式 ML 管道

在实现分布式管道时,可视化将要构建的内容会有所帮助。下图是完整的分布式管道的可视化,包括分布式预处理,这在我的上一篇文章中已介绍。在介绍分布式模型训练所需的编码任务时,请参考此图。

上面的图表可能看起来很奇怪,因为用于预处理的任务(或 Actor)是从负责分布式训练的 Worker 调用。但是,请记住我上面的简短描述 - 预处理任务映射到数据集,并且在我们从训练循环中迭代数据集时调用这些任务。

在我们开始编码之前,让我们更深入地了解 Ray Worker。

Ray Worker

Ray Worker 是一个进程,它执行包含你的训练逻辑的 Python 函数。此函数通常包含用于你的损失函数和优化器的设置代码,然后是训练模型的 epoch 循环。Ray Train 通过在集群中创建多个 Worker 来分布式模型训练。在生产环境中,你的集群可以是 Kubernetes 集群。在开发机器上,Ray Train 将为每个 Worker 创建一个进程。Worker 的数量决定了并行程度,它是可配置的。

第一个编码任务是创建一个能够在 Ray Worker 中运行的训练函数。

创建分布式训练函数

下面显示了一个使用 Ray Train(和 Ray Data)的训练函数。我突出显示了用于促进分布式训练的 Ray 函数的使用。令人惊讶的是,与在单线程中运行的函数相比,这需要最小的代码更改。 

def train_func_per_worker(training_parameters):
 
  # 训练模型并记录训练指标。
  model = tu.MNISTModel(training_parameters['input_size'], 

                         training_parameters['hidden_sizes'],
                        training_parameters['output_size'])
  model = ray.train.torch.prepare_model(model)

  # 获取训练 Worker 的数据集切片。
  train_data_shard = train.get_dataset_shard('train')

  loss_func = nn.NLLLoss()
  optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], 

                         momentum=training_parameters['momentum'])

  metrics = {}
  batch_size_per_worker = training_parameters['batch_size_per_worker']
  for epoch in range(training_parameters['epochs']):
      total_loss = 0
      batch_count = 0
      for batch in train_data_shard.iter_torch_batches(batch_size=batch_size_per_worker):
          # Get the images and labels from the batch.
          images, labels = batch['X'], batch['y']
          labels = labels.type(torch.LongTensor)   # casting to long
          images, labels = images.to(device), labels.to(device)

          # Flatten MNIST images into a 784 long vector.
          images = images.view(images.shape[0], -1)
     
          # Training pass
          optimizer.zero_grad()          
          output = model(images)

          loss = loss_func(output, labels)
         
          # This is where the model learns by backpropagating
          loss.backward()
         
          # And optimizes its weights here
          optimizer.step()
         
          total_loss += loss.item()
          batch_count += 1

      metrics = {'training_loss': total_loss/batch_count}
      checkpoint = None
      if train.get_context().get_world_rank() == 0:
          temp_dir = os.getcwd()
          torch.save(model.module.state_dict(), os.path.join(temp_dir, 'mnist_model.pt'))
          checkpoint = Checkpoint.from_directory(temp_dir)
      train.report(metrics, checkpoint=checkpoint)

此函数中第一次使用 ray 框架是通过使用 ray.train.torch.prepare_model() 函数来准备模型进行分布式训练。此函数创建了一个新的模型,该模型能够与在其他 Worker 中以相同方式创建的模型同步梯度和参数。

接下来,上面的函数使用 train.get_dataset_shard() 函数获取数据切片。你使用的 Worker 越多,此切片就越小 - 你的训练速度就越快。最后,使用数据集切片的 iter_torch_batches() 方法返回 PyTorch 张量批次。如果你的 Worker 意外失败,请尝试使用较小的 batch_size。在单台机器上运行分布式训练时,内存不足错误很常见。它们也可能是由于使用过多 Worker 造成的。

总之,每个 Worker 在启动分布式训练作业时都会执行此训练函数。每个 Worker 将创建一个模型副本,该副本可以与其他 Worker 的模型同步,并且每个 Worker 将获得他们自己的数据切片。

启动分布式 Worker

我们将使用下面的函数来创建本地集群并使用所需数量的 worker 进行配置。请注意,当我初始化 Ray 时,我必须告诉它安装 MinIO SDK。Ray 将自动安装自身和 PyTorch。如果您正在使用任何其他库,请将它们添加到 pip 列表中。Ray Train 初始化后,此函数将检索我在上一篇文章中介绍的 Ray 数据集。 

分布式训练的设置使用 TorchTrainer 类完成 - 它的构造函数传递了我们之前编写的训练函数、训练参数、训练数据集、一个缩放配置对象和一个运行配置对象。

def distributed_training(training_parameters, num_workers: int, use_gpu: bool):
  logger = du.create_logger()

 
  logger.info('Initializing Ray.')
  initialize_ray()

  train_data, test_data, load_time_sec = du.get_ray_dataset(training_parameters)

  # 缩放配置
  scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

  # 初始化 Ray TorchTrainer
  start_time = time()
  trainer = TorchTrainer(
      train_loop_per_worker=train_func_per_worker,
      train_loop_config=training_parameters,
      datasets={'train': train_data},
      scaling_config=scaling_config,
      run_config=get_minio_run_config()
  )
  result = trainer.fit()
  training_time_sec = (time()-start_time)

  logger.info(result)
  logger.info(f'Load Time (in seconds) = {load_time_sec}')
  logger.info(f'Training Time (in seconds) = {training_time_sec}')
 
  model = tu.MNISTModel(training_parameters['input_size'], 

                         training_parameters['hidden_sizes'], 

                         training_parameters['output_size'])
  with result.checkpoint.as_directory() as checkpoint_dir:
      model.load_state_dict(torch.load(os.path.join(checkpoint_dir, "model.pt")))
  tu.test_model(model, test_data)
 
  ray.shutdown()

缩放配置对象(从 ScaleConfig 类创建)告诉 Ray Train 我们想要多少个 worker 以及是否要使用 GPU。run_config 参数决定 Ray Train 将把指标和检查点发送到哪里。我将在后面的部分展示如何使用 run_config 将数据发送到 MinIO。

一旦所有这些信息都设置在 TorchTrainer 对象中,就可以调用它的 fit() 方法,Ray Train 将创建 worker 并在 worker 中运行训练函数。此方法将在所有 worker 完成之前阻塞。返回值将是一个字典,包含训练的指标和检查点。

让我们更详细地讨论指标和检查点。

报告指标和检查点

在训练函数的底部,您将看到下面的代码片段。调用 train.report() 并传入您的指标和检查点,是 Ray Train 将这些信息从 worker 发送回您的控制代码的方式。

metrics = {'training_loss': total_loss/batch_count}
checkpoint = None
if train.get_context().get_world_rank() == 0:
  temp_dir = os.path.join(os.getcwd(), 'checkpoint')
  torch.save(model.module.state_dict(), os.path.join(temp_dir, 'mnist_model.pt'))
  checkpoint = Checkpoint.from_directory(temp_dir)
train.report(metrics, checkpoint=checkpoint)

此代码仅从“世界排名”为零的 worker 创建检查点。“世界排名”仅仅是识别 worker 的一种方式。请记住,每个 worker 的模型在训练过程中都会与其他 worker 同步。因此,让每个 worker 序列化模型并返回它是一种浪费资源的行为。模型也可能很大,因此不必要的检查点会浪费存储空间。

来自 train.report() 的数据将位于 train.fit() 的返回值中。当您打印这些数据时(下面的示例),您只会看到来自一个 worker 的信息,而且这些数据只会是最后一次 epoch 发送的值。这对于检查点来说是合理的,因为大多数情况下您只需要模型的最终状态。然而,对于指标来说,这很不方便。我喜欢查看每个 epoch 的损失值,以确定是否需要更多 epoch 来提高精度,或者是否已经超过了最佳损失值。

Result(
  metrics={'loss_0_4': 0.9962198138237},
  path='/Users/keithpij/ray_results/TorchTrainer_2023-12-14_08-43-53/TorchTrainer_d3b70_00000_0_2023-12-14_08-43-56',
  filesystem='local',
  checkpoint=Checkpoint(filesystem=local, path=/Users/keithpij/ray_results/TorchTrainer_2023-12-14_08-43-53/TorchTrainer_d3b70_00000_0_2023-12-14_08-43-56/checkpoint_000004)
)

上面示例输出中显示的检查点是已序列化到临时目录的模型的引用。让我们看看如何将检查点发送到 MinIO。

将检查点发送到 MinIO

Ray Train 运行会生成一个包含报告指标、检查点和其他工件的历史记录。其中一些工件是 Ray Train 日志,可以帮助您追踪问题。默认情况下,这些信息会保存在临时目录中。但是,您可以将其配置为保存在 MinIO 中。这可以通过之前显示的 TorchTrainer 对象的 run_config 参数来实现。以下函数将生成一个运行配置,该配置将把运行过程中创建的信息发送到 MinIO。train.RunConfig() 函数的 storage_path 参数是一个 MinIO 存储桶。

def get_minio_run_config():
  import s3fs
  import pyarrow.fs

  s3_fs = s3fs.S3FileSystem(
      key = os.environ['MINIO_ACCESS_KEY'],
      secret = os.environ['MINIO_SECRET_ACCESS_KEY'],
      endpoint_url = os.environ['MINIO_URL']
  )
  custom_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(s3_fs))

  run_config = train.RunConfig(storage_path='ray-train', storage_filesystem=custom_fs)
  return run_config

以下代码片段展示了在创建 TorchTrainer 对象时如何使用此函数。

trainer = TorchTrainer(
  train_loop_per_worker=train_func_per_worker,
  train_loop_config=training_parameters,
  datasets={'train': train_data},
  scaling_config=scaling_config,
  run_config=get_minio_run_config()  # train.RunConfig(storage_path=os.getcwd(), name="ray_experiments")
  )
  result = trainer.fit()

从检查点加载模型

最后我们将从检查点加载一个训练过的模型。如果要在生产环境中提供模型,您需要执行此操作。或者,您也可以使用模型在训练期间没有看到的测试数据来测试模型。在本帖的代码示例中,我使用测试集测试了模型。

result = trainer.fit()
 
model = tu.MNISTModel(training_parameters['input_size'], 

                      training_parameters['hidden_sizes'], 

                      training_parameters['output_size'])

with result.checkpoint.as_directory() as checkpoint_dir:
  model.load_state_dict(torch.load(os.path.join(checkpoint_dir, "model.pt")))
tu.test_model(model, test_data)

摘要

在这篇文章中,我完成了之前文章中开始的内容,展示了如何使用 Ray Data 分发在训练模型之前需要进行的任何预处理。我展示了如何分发模型的训练。我还展示了如何配置 Ray Train 将指标和检查点发送到 MinIO。最后,我展示了如何加载检查点,以便您可以测试和部署模型。

示例代码可以通过替换数据访问函数和模型用作分布式训练项目的模板。如果您想进一步讨论,请随时联系我们,我们的 general Slack channelhello@min.io