使用 Apache Nifi 的 MinIO 事件通知

Apache Nifi 是当今最流行的开源数据流引擎之一。Nifi 支持几乎所有主要的企业数据系统,并允许用户创建有效、快速且可扩展的信息流系统。使用 Nifi 创建数据流系统很简单,并且有一个明确的路径来添加对尚未作为 Nifi 处理器提供的系统的支持。所有这些都推动了 Nifi 的大规模采用。
几个 MinIO 客户在他们的用例中利用 Nifi。这些客户正在利用 MinIO 构建高性能数据湖,通常将多个不同的数据集合成在一起。Nifi 允许这些客户将这些数据路由到相关的最终消费者。
一种常见的模式是使用 Nifi 中的 MinIO 对象元数据来创建自定义流。
具体用例可能有所不同,例如,有人可能希望对上传到 MinIO 的 csv
和 json
文件进行不同的处理,另一些人可能希望隔离 jpg,png
和 pdf
文件,还有些人可能希望仅将 json
文件转换为 parquet
并将其存储回 MinIO - 等等。
在这篇文章中,我将解释如何设置 Nifi 以监听 MinIO 事件通知。然后,我们将看到如何通过 Nifi 处理器解析 MinIO 事件 json
。然后,我们将从事件 json
中过滤用户定义的元数据标头。最后,我们将看到如何在事件 json
中存在或不存在标头的情况下采取下一步操作。
先决条件
- 正在运行的 MinIO 服务器,并已使用 mc 配置 配置事件通知。
- 正在运行的 Apache Nifi,以及访问 Nifi GUI 的权限。
启动用于 Webhook 的 Nifi 处理器
我们将使用 MinIO Webhook 事件通知将 Apache Nifi 配置为事件目标。为此,首先在 Nifi GUI 中创建一个 ListenHTTP
处理器。然后配置它以监听特定端口。请参阅下面的处理器详细信息

配置 MinIO 事件通知
创建处理器后,为我们刚刚创建的 webhook 服务器配置 MinIO 事件通知。
mc mb myminio/source
mc admin config set myminio notify_webhook:nifi endpoint=http://localhost:8086/contentListener
mc admin service restart myminio
mc event add myminio/source arn:minio:sqs::nifi:webhook --event put
在这里,我们配置 MinIO,以便在桶 source
上发生 put
事件时,将通知发送到 http://localhost:8086/contentListener
。
Nifi ListenHTTP
处理器正在等待 http://localhost:8086/contentListener
上的事件,如上一步中配置的那样。
添加 EvaluateJsonPath 处理器
现在 MinIO 和 Nifi 之间已经建立了通信,下一步是使用 EvaluateJsonPath
Nifi 处理器。我们使用它来解析 MinIO 事件通知 json 有效负载,并识别它是否包含某个用户定义的元数据标头。
如果存在标头 X-Amz-Meta-key1
,我们将继续执行下一步,否则我们将在此处丢弃流。我们还获取对象和桶名称,以便将其传递到下一步。
这是数据流中的关键步骤。
在我的示例中,我查找标头 X-Amz-Meta-key1
。您可以在此处调整元数据字段以适合您的用例。

最后一步
如果 EvaluateJsonPath
Nifi 处理器找到了我们正在查找的标头,我们将转到下一步。在本例中,我选择从 MinIO 获取对象。我们使用 FetchS3Object
处理器来执行此操作。
当然,您可以根据您的具体用例在此处使用其他处理器。

为了完整起见,如果对象获取成功,我们将文件保存在本地驱动器上,否则我们将记录错误。完整的流程如下所示

结论
客户不可避免地会面临数据流挑战,而 Apache Nifi 已成为解决此类挑战的流行选择。我们在 MinIO 越来越多地看到 Nifi 被用作数据流编排器来构建快速、可扩展且有效的管道的用例。
在这篇文章中,我们看到了如何使用事件通知和内置 Nifi 处理器来构建基于 MinIO 和 Nifi 的数据流系统。我们看到了数据流如何检查事件通知有效负载并识别是否存在某个标头。根据此过滤器事件的结果,我们添加了另一个步骤来从 MinIO 获取对象并将其保存在本地。
自己试试。如果您还没有 MinIO,您可以从这里下载。如果您需要一些帮助,请查看我们的文档。您也可以查看我们的公共 Slack 频道。