使用 Kafka 和 MinIO 的 AI 数据工作流

AI Data Workflows with Kafka and MinIO

MinIO 企业对象存储 是创建和执行复杂数据工作流的基础组件。此事件驱动功能的核心是使用 Kafka 的 MinIO 存储桶通知。MinIO 企业对象存储会为所有 HTTP 请求(如 PUTPOSTCOPYDELETEGETHEADCompleteMultipartUpload)生成事件通知。您可以使用这些通知来触发相应的应用程序、脚本和 Lambda 函数,以便在对象上传触发事件通知后采取操作。

事件通知为多个微服务之间的交互和协作提供了一种松耦合范式。在此范式中,微服务不会彼此直接调用,而是使用事件通知进行通信。发送通知后,发送服务可以返回其任务,而接收服务则采取行动。这种隔离级别使代码更易于维护——更改一项服务不需要更改其他服务,因为它们通过通知而不是直接调用进行通信。

有几个用例依赖于 MinIO 企业对象存储事件通知来执行数据工作流。例如,我们可以使用存储在 MinIO 企业对象存储中的对象的原始数据来运行 AI/ML 管道。

  • 处理数据的管道将在添加原始对象时触发。
  • 基于添加的对象,模型将运行。
  • 最终模型可以保存到 MinIO 企业对象存储中的存储桶中,然后其他应用程序可以将其用作最终产品。

构建工作流

我们将使用 MinIO 企业对象存储和 Kafka 为一个假设的图像调整大小应用程序构建一个示例工作流。它基本上接收传入的图像并根据某些应用程序规范调整其大小,然后将其保存到另一个存储桶中,以便可以提供服务。在现实世界中,这可能用于调整图像大小并使其可供移动应用程序使用,或者只是调整图像大小以减轻动态调整图像大小时对资源造成的压力。

它有几个组件,Kafka 和 MinIO 企业对象存储一起用于支持此复杂的工作流。

  • MinIO 企业对象存储,生产者:传入的原始对象存储在 MinIO 企业对象存储中。每次添加对象时,它都会向 Kafka 发送一条消息以代理特定主题。
  • Kafka,代理:代理维护队列的状态,存储传入的消息,并使其可供使用者使用。
  • MinIO 企业对象存储,消费者:消费者将实时读取队列中的这些消息,处理原始数据,并将其上传到 MinIO 企业对象存储存储桶。

MinIO 企业对象存储是所有这些的基础,因为它在此工作流中既是生产者又是消费者。

使用 Kubernetes 集群

我们需要一个 Kubernetes 集群来运行我们的服务。您可以使用任何 Kubernetes 集群,但在此示例中,我们将使用 kind 集群。如果尚未安装 Kind,请按照此处的 快速入门指南 获取说明。使用以下 kind 集群配置来构建一个简单的单主多工作节点 Kubernetes 集群。

将此 yaml 保存为 kind-config.yaml

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker

启动集群(这可能需要几分钟)

$ kind create cluster --name minio-kafka --config kind-config.yaml

… [截断]


设置 kubectl 上下文 "kind-minio-kafka"
现在 可以 使用 您的 集群,方法如下:

kubectl cluster-info --context kind-minio-kafka


… [截断]

验证集群是否已启动

$ kubectl get no
名称                        状态   角色           年龄   版本
minio-kafka-control-plane   就绪    控制平面   43s   v1.24.0
minio-kafka-worker          就绪    <none>          21s   v1.24.0
minio-kafka-worker2         就绪    <none>          21s   v1.24.0
minio-kafka-worker3         就绪    <none>          21s   v1.24.0

安装 Kafka

在运行 Kafka 之前,需要先运行一些支持服务。这些服务包括:

  • Certmanager
  • Zookeeper

让我们在 Kubernetes 集群中安装 cert-manager

$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.6.2/cert-manager.yaml

检查状态以验证 cert-manager 资源是否已创建。

$ kubectl get ns
NAME                 STATUS   AGE
cert-manager         Active   6s
default              Active   20m
kube-node-lease      Active   20m
kube-public          Active   20m
kube-system          Active   20m
local-path-storage   Active   20m

$ kubectl get po -n cert-manager
NAME                                       READY   STATUS    RESTARTS   AGE
cert-manager-74f9fd7fb6-kqhsq              1/1     Running   0          14s
cert-manager-cainjector-67977b8fcc-k49gj   1/1     Running   0          14s
cert-manager-webhook-7ff8d87f4-wg94l       1/1     Running   0          14s

使用 Helm Chart 安装 Zookeeper。如果您尚未安装 Helm,可以按照 Helm 文档 中提供的安装指南进行操作。

$ helm repo add pravega https://charts.pravega.io
"pravega" 添加 您的 存储库

$ helm repo update

$ helm install zookeeper-operator --namespace=zookeeper --create-namespace pravega/zookeeper-operator

… [已截断]


$ kubectl --namespace zookeeper create -f - <<EOF
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
    name: zookeeper
    namespace: zookeeper
spec:
    replicas: 1
EOF

您应该会看到类似于以下的输出,这意味着集群创建正在进行中。

zookeepercluster.zookeeper.pravega.io/zookeeper created

验证 Zookeeper 运算符和集群 Pod 是否都在运行。

$ kubectl -n zookeeper get po
NAME                                  READY   STATUS    RESTARTS   AGE
zookeeper-0                           1/1     Running   0          31s
zookeeper-operator-5857967dcc-kfxxt   1/1     Running   0          3m4s

现在我们已经完成了所有前提条件,让我们安装实际的 Kafka 集群组件。

Kafka 有一个名为 **Koperator** 的运算符,我们将使用它来管理我们的 Kafka 安装。Kafka 集群启动大约需要 4-5 分钟。

$ kubectl create --validate=false -f https://github.com/banzaicloud/koperator/releases/download/v0.21.2/kafka-operator.crds.yaml

$ helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/


$ helm repo update
… [已截断]

$ helm install kafka-operator --namespace=kafka --create-namespace banzaicloud-stable/kafka-operator

… [已截断]

$ kubectl create -n kafka -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/simplekafkacluster.yaml

运行 kubectl -n kafka get po 以确认 Kafka 已启动。Kafka 需要几分钟才能运行。请等待,然后再继续。

配置 Kafka 主题

在 MinIO 中配置之前,让我们先配置主题;主题是先决条件。

创建一个名为 my-topic 的主题

$ kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
    name: my-topic
spec:
    clusterRef:
        name: kafka
    name: my-topic
    partitions: 1
    replicationFactor: 1
    config:
        "retention.ms": "604800000"
        "cleanup.policy": "delete"
EOF

它应该返回以下输出。如果未返回,则主题创建不成功。如果创建不成功,请等待几分钟,Kafka 集群上线后再次运行。

kafkatopic.kafka.banzaicloud.io/my-topic created

接下来的几个步骤需要一个 Kafka pod 的 IP 和端口。

获取 IP 的方法:

$ kubectl -n kafka describe po kafka-0- | grep -i IP:
IP:           10.244.1.5
  IP:           10.244.1.5

注意:您的 IP 会有所不同,可能与上面显示的不一致。

我们感兴趣的端口有几个:

$ kubectl -n kafka get po kafka-0- -o yaml | grep -iA1 containerport
    - containerPort: 29092
      name: tcp-internal
--
    - containerPort: 29093
      name: tcp-controller


… [已截断]

  • Tcp-internal 29092:当您作为消费者处理传入 Kafka 集群的消息时,会使用此端口。
  • Tcp-controller 29093:当生产者(如 MinIO)想要向 Kafka 集群发送消息时,会使用此端口。

这些 IP 和端口在您自己的设置中可能会更改,因此请确保获取集群的正确值。

安装 MinIO

我们将在与其他资源相同的 Kubernetes 集群中,在其自己的命名空间中安装 MinIO。

获取 MinIO 仓库:

$ git clone https://github.com/minio/operator.git

应用资源以安装 MinIO:

$ kubectl apply -k operator/resources


$ kubectl apply -k operator/examples/kustomization/tenant-lite

验证 MinIO 是否已启动并运行。您可以获取 MinIO 控制台的端口,在本例中为 9443

$ kubectl -n tenant-lite get svc | grep -i console

storage-lite-console             ClusterIP   10.96.0.215     <none>        9443/TCP   6m53s

设置 Kubernetes 端口转发:这里我们为主机选择了端口 `39443`,但这可以是任何端口,只需确保在通过 Web 浏览器访问控制台时使用相同的端口。

$ kubectl -n tenant-lite port-forward svc/storage-lite-console 39443:9443


从 127.0.0.1:39443 转发到 9443

从 [::1]:39443 转发到 9443

使用以下凭据通过 Web 浏览器访问

URL:`https://localhost:39443`

用户:`minio`

密码:`minio123`

配置 MinIO 生产者

我们将配置 MinIO 使用 mc 管理工具将事件发送到我们之前创建的 Kafka 集群中的 my-topic。

我在这里启动了一个 Ubuntu Pod,以便我有一个干净的工作区来进行操作,更重要的是,我将能够访问集群中的所有 Pod,而无需为每个单独的服务进行 `port-forward`。

$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: ubuntu
  labels:
    app: ubuntu
spec:
  containers:
  - image: ubuntu
    command:
      - "sleep"
      - "604800"
    imagePullPolicy: IfNotPresent
    name: ubuntu
  restartPolicy: Always
EOF

进入 Ubuntu Pod 以确保它已启动

$ kubectl exec -it ubuntu -- /bin/bash


root@ubuntu:/#

如果您看到任何以 `root@ubuntu:/` 为前缀的命令,则表示它正在此 ubuntu Pod 内部运行。

使用以下命令获取 `mc` 二进制文件并进行安装

root@ubuntu:/# apt-get update
apt-get -y install wget
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/

验证安装是否正确

root@ubuntu:/# mc --version


mc version RELEASE.2022-08-05T08-01-28Z (commit-id=351d021b924b4d19f1eb716b9e2bd74644c402d8)

运行时:go1.18.5 linux/amd64

版权所有 (c) 2015-2022 MinIO, Inc.

许可证 GNU AGPLv3 <https://gnu.ac.cn/licenses/agpl-3.0.html>

配置mc管理员以使用我们的 MinIO 集群

  • mc alias set <别名> <minio_租户URL> <minio_用户名> <minio_密码>

在我们的例子中,这将转换为

root@ubuntu:/# mc alias set myminio https://minio.tenant-lite.svc.cluster.local minio minio123


已成功添加 `myminio`。

通过运行以下命令验证配置是否按预期工作;您应该看到类似于 8 个驱动器在线,0 个驱动器离线 的内容

root@ubuntu:/# mc admin info myminio


… [已截断]

存储池:
  1st, 擦除集: 1, 磁盘 每个 擦除集: 8

8 个驱动器 联机, 0 个驱动器 脱机

通过mc admin在 MinIO 中设置 Kafka 配置。您需要使用以下命令进行自定义

root@ubuntu:/# mc admin config set myminio \
notify_kafka:1 \
brokers="10.244.1.5:29093" \
topic="my-topic" \
tls_skip_verify="off" \
queue_dir="" \
queue_limit="0" \
sasl="off" \
sasl_password="" \
sasl_username="" \
tls_client_auth="0" \
tls="off" \
client_tls_cert="" \
client_tls_key="" \
version="" --insecure

以下几个配置需要特别注意

  • brokers="10.244.1.5:29093": 这些是 Kafka 服务器,格式为 server1:port1,server2:port2,serverN:portN。注意:如果您决定指定多个 Kafka 服务器,则需要提供所有服务器的 IP 地址;如果只提供部分列表,配置将失败。您可以只指定单个服务器,但缺点是如果该服务器宕机,则配置将无法感知集群中的其他 Kafka 服务器。如前所述,Kafka 有两个端口:TCP-internal 29092TCP-controller 29093。由于我们正在将 MinIO 配置为 Producer,因此我们将使用 29093
  • topic="my-topic": 主题名称应与我们在 Kafka 集群中之前创建的主题相匹配。提醒一下,MinIO 不会自动创建此主题;它必须预先存在。
  • notify_kafka:1: 这是用于稍后实际添加事件的配置名称。

请访问我们的 文档,以获取有关这些参数的更多详细信息。

如果配置成功,您应该会看到以下输出

已成功 应用 设置。

然后,根据需要重新启动 admin 服务

root@ubuntu:/# mc admin service restart myminio


已成功发送重启命令到 `myminio`。按 Ctrl-C 退出或等待查看重启过程状态。

....

`myminio` 已在 2 秒内成功重启

在 MinIO 中创建一个名为 images 的存储桶。原始对象将存储在此处。

root@ubuntu:/# mc mb myminio/images --insecure


存储桶 `myminio/images` 创建成功。

我们希望将发送到队列的消息限制为仅 .jpg 图片;如有需要,可以扩展此限制,例如,如果希望根据其他文件扩展名(如 .png)触发消息。

root@ubuntu:/# mc event add  myminio/images arn:minio:sqs::1:kafka --suffix .jpg


已成功 添加 arn:minio:sqs::1:kafka


# 验证是否已正确添加
root@ubuntu:/# mc event list myminio/images


arn:minio:sqs::1:kafka   s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:*   过滤器: suffix=".jpg"

有关如何使用 MinIO 配置 Kafka 的更多详细信息,请访问我们的 文档

构建 MinIO 消费者

如果我们确实有一个脚本来使用 MinIO 生成的这些事件并对这些对象执行某些操作,那就太好了。那么为什么不这样做呢?这样,我们就有了工作流程的整体视图。

在仍然登录到我们的 ubuntu pod 的情况下,安装我们的脚本运行所需的 `python3` 和 `python3-pip`。由于这是 Ubuntu minimal,我们还需要 `vim` 来编辑我们的脚本。

root@ubuntu:/# apt-get -y install python3 python3-pip vim

对于我们的 Python 消费者脚本,我们需要通过 `pip` 安装一些 Python 包。

root@ubuntu:/# pip3 install minio kafka-python

正在收集 minio

  正在下载 minio-7.1.11-py3-none-any.whl (76 KB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 76.1/76.1 KB 4.1 MB/s eta 0:00:00

正在收集 kafka-python

  正在下载 kafka_python-2.0.2-py2.py3-none-any.whl (246 KB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 8.6 MB/s eta 0:00:00

正在收集 certifi

  正在下载 certifi-2022.6.15-py3-none-any.whl (160 KB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 160.2/160.2 KB 11.6 MB/s eta 0:00:00

正在收集 urllib3

  正在下载 urllib3-1.26.11-py2.py3-none-any.whl (139 KB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 139.9/139.9 KB 18.4 MB/s eta 0:00:00

正在安装收集的包:kafka-python、urllib3、certifi、minio

成功安装 certifi-2022.6.15 kafka-python-2.0.2 minio-7.1.11 urllib3-1.26.11

如果您看到以上消息,则表示我们已成功安装脚本所需的依赖项。

我们将在此处显示整个脚本,然后引导您了解正在使用的不同组件。现在将此脚本保存为 `minio_consumer.py`

from minio import Minio
import urllib3

from kafka import KafkaConsumer
import json

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
  "minio_username": "minio",
  "minio_password": "minio123",
  "kafka_servers""10.244.1.5:29092",
  "kafka_topic":    "my-topic", # This needs to be created manually
}

# Since we are using self-signed certs we need to disable TLS verification
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=True,
              access_key=config["minio_username"],
              secret_key=config["minio_password"],
              http_client = http_client
              )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

# Initialize Kafka consumer
consumer = KafkaConsumer(
  bootstrap_servers=config["kafka_servers"],
  value_deserializer = lambda v: json.loads(v.decode('ascii'))
)

consumer.subscribe(topics=config["kafka_topic"])

try:
  print("Ctrl+C to stop Consumer\n")
  for message in consumer:
    message_from_topic = message.value

    request_type = message_from_topic["EventName"]
    bucket_name, object_path = message_from_topic["Key"].split("/", 1)

    # Only process the request if a new object is created via PUT
    if request_type == "s3:ObjectCreated:Put":
      minio_client.fget_object(bucket_name, object_path, object_path)
     
      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)


      minio_client.fput_object(config["dest_bucket"], object_path, object_path)

      print("- 已将处理后的对象 '%s' 上传到目标存储桶 '%s'" % (object_path, config["dest_bucket"]))

except KeyboardInterrupt

  print("\n消费者已停止。")

  • 我们将导入之前在流程中安装的 pip 包

from minio import Minio
import urllib3

from kafka import KafkaConsumer
import json

  • 为了避免每次修改代码中的参数,我们在这个配置字典中列出了一些常用的可配置参数。

config = {
  "dest_bucket":    "processed", # 此存储桶将自动创建
  "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
  "minio_username": "minio",
  "minio_password": "minio123",
  "kafka_servers""10.244.1.5:29092",
  "kafka_topic":    "my-topic", # 此主题需要手动创建

  • 我们启动的 MinIO 集群使用的是自签名证书。尝试连接时,我们需要确保它接受自签名证书。

http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

  • 我们将检查用于存储处理后数据的目标存储桶是否存在;如果不存在,我们将创建它。

if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("目标存储桶 '%s' 已创建" % (config["dest_bucket"]))

  • 配置要连接的 Kafka 代理以及要订阅的主题。

consumer = KafkaConsumer(
  bootstrap_servers=config["kafka_servers"],
  value_deserializer = lambda v: json.loads(v.decode('ascii'))
)

consumer.subscribe(topics=config["kafka_topic"])

  • 当您停止消费者时,通常会输出堆栈跟踪,因为消费者旨在持续运行并消费消息。此代码将允许我们干净地退出消费者。

try:
  print("Ctrl+C to stop Consumer\n")


… [截断]


except KeyboardInterrupt:
  print("\nConsumer stopped.")

如前所述,我们将持续等待,监听主题上的新消息。一旦我们获得一个主题,我们将将其分解成三个部分。

  • request_type:HTTP请求类型:GET、PUT、HEAD。
  • bucket_name:添加新对象的存储桶名称。
  • object_path:在存储桶中添加对象的完整路径。

  for message in consumer:
    message_from_topic = message.value

    request_type = message_from_topic["EventName"]
    bucket_name, object_path = message_from_topic["Key"].split("/", 1)

  • 每次您发出任何请求时,MinIO都会向主题添加一条消息,该消息将由我们的`minio_consumer.py`脚本读取。因此,为了避免无限循环,让我们仅在添加新对象时进行处理,在本例中为请求类型PUT。

if request_type == "s3:ObjectCreated:Put":
      minio_client.fget_object(bucket_name, object_path, object_path)

  • 这是您添加自定义代码以构建机器学习模型、调整图像大小和处理ETL/ELT作业的地方。

      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)

  • 处理完对象后,它将上传到我们之前配置的目标存储桶。如果存储桶不存在,我们的脚本将自动创建它。

minio_client.fput_object(config["dest_bucket"], object_path, object_path)
      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))

就是这样。除了少量样板代码,我们主要做了两件事

  • 监听 Kafka 主题上的消息
  • 将对象放入 MinIO 存储桶中

脚本并不完美——您需要添加一些额外的错误处理,但它非常简单易懂。其余部分您可以使用自己的代码库进行修改。更多详细信息,请访问我们的 MinIO Python SDK 文档

消费 MinIO 事件

我们已经构建了它,现在让我们看看它的实际运行效果。创建两个终端

  • 终端 1 (T1):运行 minio_consumer.py 的 Ubuntu Pod
  • 终端 2 (T2):带有 mc 的 Ubuntu Pod。

打开 T1 并运行我们之前编写的 minio_consumer.py 脚本,使用 python3。如果在任何时候您想退出脚本,您可以键入 Ctrl+C

root@ubuntu:/# python3 minio_consumer.py


Ctrl+C 停止消费者

现在让我们打开 T2 并使用 mc 将一些对象放入我们之前创建的 MinIO images 存储桶中。

首先创建一个测试对象

root@ubuntu:/# touch rose.jpg
root@ubuntu:/# echo "a" > rose.jpg

将测试对象上传到 images 存储桶中的几个不同路径

root@ubuntu:/# mc cp rose.jpg myminio/images --insecure


/rose.jpg:            2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 55 B/s 0s


root@ubuntu:/# mc cp rose.jpg myminio/images/deeper/path/rose.jpg --insecure


/rose.jpg:            2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 63 B/s 0s

在我们的另一个终端 T1 中,MinIO 消费者脚本正在运行,您应该会看到一些类似于以下的消息

root@ubuntu:/# python3 minio_consumer.py


… [截断]


- 对 rose.jpg 执行一些伪图像调整大小或机器学习处理

- 已将处理后的对象 'rose.jpg' 上传到目标存储桶 'processed'

- 对 deeper/path/rose.jpg 执行一些伪图像调整大小或机器学习处理

- 已将处理后的对象 'deeper/path/rose.jpg' 上传到目标存储桶 'processed'

我们应该验证处理后的对象是否也已上传到 processed 存储桶

root@ubuntu:/# mc ls myminio/processed

[2022-08-12 01:03:46 UTC]     2B STANDARD rose.jpg


root@ubuntu:/# mc ls myminio/processed/deeper/path

[2022-08-12 01:09:04 UTC]     2B STANDARD rose.jpg

如您所见,我们已成功地将对象从未处理的原始数据上传到处理后的存储桶。

使用通知在 MinIO Enterprise 对象存储上构建工作流

我们在这里展示的只是一个示例,说明您可以使用此工作流实现什么。通过利用 Kafka 的持久消息传递和 MinIO Enterprise 对象存储的弹性存储,您可以构建复杂的人工智能应用程序,这些应用程序由能够 扩展 并跟上工作负载的架构支持,例如

  • 机器学习模型
  • 图像调整大小
  • 处理 ETL/ELT 作业

不过,不要仅仅相信我们的说法——自己动手构建它。您可以 在此处 下载 MinIO Enterprise 对象存储,并可以 在此处 加入我们的 Slack 频道。