使用 MLRun 和 MinIO 进行模型训练和 MLOps

Model Training and MLOps using MLRun and MinIO

我之前关于 MLRun 的帖子 中,我们使用 docker-compose 文件创建了 MLRun UI、MLRun API 服务、Nuclio、MinIO 和 Jupyter 服务的容器,搭建了开发环境。 容器启动后,我们运行了一个简单的烟雾测试来确保一切正常。 我们创建了一个简单的无服务器函数,该函数记录其输入并连接到 MinIO 获取存储桶列表。

以下是我们将构建的内容和 docker-compose 文件内容的示意图。 我扩展了 MLRun 文档中提供的 docker-compose 文件,使其包含 API 服务和 MinIO。 本文将接着上一篇博文继续介绍。 我们将创建一个另一个无服务器函数来训练模型。 记住,MLRun 的目标是消除对样板代码的需求,因此我们也应该看到训练模型本身所需的代码减少。 如果你想一想训练模型时编写的代码——尤其是在 PyTorch 中——无论用于训练模型的数据和模型本身是什么,它基本上都是一样的。 换句话说,它是样板代码。 从理论上讲,它可以用一些额外的配置来代替。 此外,部署代码本身应该很简单。 在上一篇博文中,我们已经看到了一些示例。 但是,部署驱动模型训练的函数稍微复杂一些——因此我们需要更多 MLRun 功能来实现这一点。 本文随附的代码(下一节中描述)包含允许使用无法放入内存的数据集训练模型的实用程序。 我在这里不会详细介绍这些技术——那是另一篇博文的内容。

关于代码下载

本文中展示的所有代码(以及更多)都在 这里。 为了简洁起见,代码下载中的许多实用程序函数没有在本文的代码清单中显示。 但是,许多是用于将 PyTorch 与 MinIO 集成的通用实用程序。 在本节中,我将列出我希望您在使用 MinIO 存储数据集的所有 ML 项目中都能找到有用的重要实用程序。

MinIO 实用程序 (data_utilities.py)

  • get_bucket_list - 返回 MinIO 中的存储桶列表。
  • get_image_from_minio - 从 MinIO 返回图像。 使用 PIL 将对象转换回其原始格式。
  • get_object_from_minio - 从 MinIO 返回对象——按原样。
  • get_object_list - 从存储桶获取对象列表。
  • image_to_byte_stream - 将 PIL 图像转换为字节流,以便可以将其发送到 MinIO。
  • load_mnist_to_minio - 此函数将 MNIST 数据集加载到 MinIO 存储桶中。 每个图像都被发送为一个单独的对象。 此方法对于在集群中进行性能测试很有用,因为它会创建许多小图像。
  • put_image_to_minio - 将图像字节流作为对象放到 MinIO 中。

PyTorch 实用程序 (torch_utilities.py)

  • create_minio_data_loaders - 创建一个 PyTorch 数据加载器,其中包含 MinIO 存储桶中 MNIST 对象的列表。 有助于模拟无法放入内存的数据集的模型训练测试。
  • create_memory_loaders - 创建一个 PyTorch 数据加载器,其中 MNIST 图像加载到内存中。
  • MNISTModel - 本文使用的模型。
  • ConvNet - 这是读者可以尝试的另一个模型——它为 MNIST 数据集创建卷积神经网络 (CNN)。

将现有代码迁移到 MLRun

让我们看看如何将现有代码迁移到 MLRun 中作为无服务器函数运行。 如果你有很多模型的训练代码,但没有时间更改代码以利用 MLRun 的所有功能,那么这可能是一种方法。 这是迭代迁移到 MLRun 的一种可行方法——让所有 ML 代码由 MLRun 管理——然后添加代码以利用其他功能。

考虑下面的代码,它在 MNIST 数据集上训练模型。(为了简洁起见,省略了导入和日志记录代码。 代码下载 包含所有导入和日志记录。)它是一个脚本,你可以从任何终端应用程序运行。 “train_model” 函数检索数据,创建模型,然后将控制权转交给 “epoch_loop” 函数进行训练。

def train_model(loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None):

    # 加载数据并记录加载指标。
    if loader_type == 'memory':
        train_loader, test_loader, _ = tu.create_memory_data_loaders(training_parameters['batch_size'])
    elif loader_type == 'minio-by-batch':
        train_loader, test_loader, _ = tu.create_minio_data_loaders(bucket_name,
                                              training_parameters['batch_size'])
    else:
        raise Exception('未知的加载器类型。必须是 "memory" 或 "minio-by-batch"')
   
    # 创建模型。
    model = tu.MNISTModel(training_parameters['input_size'],

                          training_parameters['hidden_sizes'],
                          training_parameters['output_size'])

    # Train the model.
    start_time = time()
    epoch_loop(model, train_loader, training_parameters)
    training_time_sec = (time()-start_time)

    # Test the model and log the accuracy as a metric.
    testing_metrics = tu.test_model_local(model, test_loader, training_parameters['device'])

   
def epoch_loop(model: nn.Module, loader: DataLoader, training_parameters: Dict[str, Any]) -> Dict[str, Any]:

    # Create the loss and optimizer functions.
    loss_func = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'],
                          momentum=training_parameters['momentum'])

    # Epoch loop
    for epoch in range(training_parameters['epochs']):
        total_loss = 0
        for images, labels in loader:
            # Move tensors to the specified device.
            images = images.to(training_parameters['device'])
            labels = labels.to(training_parameters['device'])
           
            # Flatten MNIST images into a 784 long vector.
            images = images.view(images.shape[0], -1)
       
            # Training pass
            optimizer.zero_grad()
            output = model(images)
            loss = loss_func(output, labels)
           
            # Backward pass
            loss.backward()
           
            # And optimizes its weights here
            optimizer.step()
           
            total_loss += loss.item()

        print("Epoch {} - Training loss: {}".format(epoch+1, total_loss/len(loader)))


if __name__ == "__main__":

    # training configuration
    training_parameters = {
        'batch_size': 32,
        'device': torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'),
        'dropout_input': 0.2,
        'dropout_hidden': 0.5,
        'epochs': 2,
        'input_size': 784,
        'hidden_sizes': [1024, 1024, 1024, 1024],
        'lr': 0.025,
        'momentum': 0.5,
        'output_size': 10,
        'smoke_test_size': -1
        }

    train_model(loader_type='memory', bucket_name='mnist'

                training_parameters=training_parameters)

如果我们想在 MLRun 中将此函数作为无服务器函数运行,我们只需要用 “mlrun.handler()” 装饰器装饰 “train_model()” 函数,如下所示。

@mlrun.handler()       
def train_model(loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None) - > None:
    .....

接下来,我们需要设置 MLRun 环境,创建一个项目,将训练函数注册到 MLRun,并运行函数。我们将从运行在我们 docker-compose 部署中的 Jupyter Notebook 中驱动此操作。下面显示了为此操作所需的单元格。(您可以在 代码下载 中的 mnist_training_setup.ipynb 笔记本中找到此代码。)在此简单演示中使用的超参数是硬编码的;但是,如果您正在构建一个用于生产的模型,则应该使用超参数搜索来找到最佳值。

import os
import mlrun

# 设置环境:
mlrun.set_environment(env_file='mlrun.env')

# 创建项目:
project_name='mnist-training'
project_dir = os.path.abspath('./')
project = mlrun.get_or_create_project(project_name, project_dir, user_project=False)

# 创建训练函数。
trainer = project.set_function(
    "mnist_training_with_mlrun.py", name="trainer", kind="job",
    image="mlrun/mlrun",
    requirements=["minio", "torch", "torchvision"],
    handler="train_model_with_mlrun"
)

# 超参数
training_parameters = {
    'batch_size': 32,
    'device': 'cpu',
    'dropout_input': 0.2,
    'dropout_hidden': 0.5,
    'epochs': 2,
    'input_size': 784,
    'hidden_sizes': [1024, 1024, 1024, 1024],
    'lr': 0.025,
    'momentum': 0.5,
    'output_size': 10,
    'smoke_test_size': -1
    }

# 运行函数。
trainer_run = project.run_function(
    "trainer",
    inputs={"bucket_name": "mnist", "loader_type": "memory"},
    params={"training_parameters": training_parameters},
    local=True
)

这就是在 MLRun 中运行现有代码所需做的全部操作。但是,如果你仔细查看 epoch_loop() 函数,你会发现它是一个模板代码。几乎每个 PyTorch 项目都有一个类似的函数,无论模型是什么,或者用于训练模型的数据是什么。让我们来看看如何使用 MLRun 来删除这个函数。

优化训练代码

我们可以使用 MLRun 的 mlrun_torch.train() 函数来移除上面显示的调用 epoch_loop 函数。该函数的导入以及修改后的 train_model() 函数如下所示。一个“accuracy()”函数也传递给 mlrun_torch.train()。

import mlrun.frameworks.pytorch as mlrun_torch

@mlrun.handler()
def train_model_with_mlrun(context: mlrun.MLClientCtx, loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None):
  logger = du.create_logger()
  logger.info(loader_type)
  logger.info(bucket_name)
  logger.info(training_parameters)

  # Load the data and log loading metrics.
  if loader_type == 'memory':
      train_loader, test_loader, _ = tu.create_memory_data_loaders(training_parameters['batch_size'])
  elif loader_type == 'minio-by-batch':
      train_loader, test_loader, _ = tu.create_minio_data_loaders(bucket_name, training_parameters['batch_size'])
  else:
      raise Exception('Unknown loader type. Must be "memory" or "minio-by-batch"')
 
  # Train the model and log training metrics.
  logger.info('Creating the model.')  
  model = tu.MNISTModel(training_parameters['input_size'], training_parameters['hidden_sizes'], training_parameters['output_size'])

  loss_func = nn.NLLLoss()
  optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], momentum=training_parameters['momentum'])

  # Train the model.
  logger.info('Training the model.')  
  mlrun_torch.train(
      model=model,
      training_set=train_loader,
      loss_function=loss_func,
      optimizer=optimizer,
      validation_set=test_loader,
      metric_functions=[accuracy],
      epochs=training_parameters['epochs'],
      custom_objects_map={"torch_utilities.py": "MNISTModel"},
      custom_objects_directory=os.getcwd(),
      context=context,
  )


def accuracy(y_pred, y_true):
  ps = torch.exp(y_pred)
  pred_label = ps.argmax(1)
  return (sum(pred_label == y_true) / y_true.size()[0]).item()

在迁移你的代码以使用该函数时,有几点需要记住。

  • 首先,如果你在“epoch_loop()”函数中声明了你的损失函数和你的优化器,那么你需要将这些声明移动,因为它们必须传递给“mlrun_torch.train()”。
  • 其次,如果你在你的 epoch 循环中执行任何转换,你将需要将它们移动到你的数据处理逻辑中,或者,更确切地说,是一个数据管道。如果你是执行图像增强和特征工程,那么这一点尤其重要。
  • 最后,为了让你的模型针对它在训练期间没有见过的数据集进行测试,你只需要提供一个 accuracy() 函数,如上所示。

虽然该函数减少了你必须编写的代码,但它最大的好处是指标跟踪和工件管理。让我们快速浏览一下 MLRun UI,看看从我们的第一个完全管理的运行中保存了什么,我们训练了一个模型。

查看运行

MLRun UI 的主页显示所有项目的列表。对于每个项目,你可以看到失败和正在运行的任务数量。

深入你的项目,你将看到更多关于你的项目的细节。

点击一个特定的任务,将把你带到最新的运行。在那里,你可以查看参数、输入、工件、结果和代码记录的任何输出。

总结和下一步

在这篇文章中,我从我上一次关于设置 MLRun 的帖子开始。我展示了如何使用 MLRun 来托管现有的模型代码,并且只需要进行最小的更改。但是,利用 MLRun 的跟踪功能的最佳方法是让 MLRun 管理你的模型的训练。

虽然将现有的代码迁移到 MLRun 是可能的,但这项技术并没有完全利用 MLRun 的自动化跟踪功能。更好的方法是使用 MLRun 的“mlrun_torch.train()”函数。这使得 MLRun 可以完全管理训练 - 工件、输入参数和指标将被记录下来。

如果你已经使用 MLRun 到此,请考虑接下来使用分布式训练和大型语言模型。

如果你有任何问题,请务必在Slack上与我们联系!MLOps Live Slack 工作空间的“mlrun”频道也是一个很棒的地方,如果你遇到问题,可以在这里获得帮助。