利用 Kafka Schema Registry 和 MinIO 充分发挥流式处理的优势

现代数据湖/湖仓通过流式数据进行填充,成为许多企业寻求控制其数据并将其应用于解决业务问题的重点。数据驱动的企业需要对昨天和今天的自己有深刻的了解,才能在明天蓬勃发展。云原生事件流的最新技术是 Apache Kafka,它使用 MinIO 等对象存储端点。
MinIO 是 Kafka 架构的理想补充,因为它结合了可扩展性和性能。这些属性使架构师或开发人员能够在其流式数据湖/湖仓之上构建任何他们想要的东西,而 S3 API 意味着他们可以使用云原生分析和 AI/ML 框架为其应用程序提供动力。MinIO 充分利用底层硬件(请参阅 选择适用于 MinIO 部署的最佳硬件)以提供最大的性能——我们已 对它进行基准测试,在 GET 上达到 325 GiB/s(349 GB/s),在 PUT 上达到 165 GiB/s(177 GB/s),仅使用 32 个节点的现成 NVMe SSD。 擦除编码 使 MinIO 成为一个持久且弹性的分布式对象存储解决方案,而丰富的 企业集成 意味着您的数据湖存储可以无缝地融入现有基础设施。
Apache Kafka 是流式数据架构的关键元素。从最基本的意义上说,Kafka 是一个分布式事件流平台,由一组称为代理的进程组成。生产者将事件发送到代理,代理会根据时间保留事件,从而使消费者能够异步读取和处理事件。我们在 如何设置 Kafka 并将数据流传输到 Kubernetes 中的 MinIO 中进行了更详细的介绍,其中我们向您展示了如何使用 Kafka 连接器将事件直接流传输到 MinIO。
该帖子解释了流传输数据的最简单方法,因此您可以快速启动并运行,但请务必记住,对于涉及大量工作负载的生产使用案例而言,它可能不够高效且性能不足。前面提供的快速且简陋示例的一个缺点是它缺乏自动验证数据、演进流模式和添加其他下游消费者的方式。这些都不会成为开发或测试环境中的问题,但在生产环境中会带来重大挑战。当您添加多个 Kafka 开发人员,每个开发人员对流进行自己的更改时,这些问题会从轻微问题变为重大问题。结果可能相当严重:当 Kafka 主题更改模式时——添加新列、删除现有列或修改给定列的数据类型——消费者可能没有意识到这些更改,可能导致数据损坏。
在拥有多个开发人员和众多最终用户的企业环境中,主题很容易崩溃。当主题崩溃时,数据将停止流入数据湖。团队必须停止他们正在做的事情,并开始进行故障排除。他们必须找出原始模式的编写原因以及哪个后续更改导致了它崩溃,然后手动更新模式。同样,这在开发/测试中是可以的,但在生产环境中是完全不可接受的。
问题变为——生产级架构是什么样的,才能确保与 Kafka 主题相关的流没有破坏性更改?这是本文其余部分的重点。
Kafka Schema Registry 来救援
Kafka Schema Registry 是 Apache Kafka 生态系统中的一个组件,它为 Kafka 生产者和消费者提供集中式模式管理服务。它允许生产者为他们生产的数据注册模式,并允许消费者检索和使用这些模式来进行数据验证和反序列化。Schema Registry 有助于确保通过 Kafka 交换的数据符合预定义的模式,从而实现跨不同系统和应用程序的数据一致性、兼容性和演进。
使用 Avro 或其他模式格式时,必须仔细管理模式并对其进行演进。Kafka Schema Registry 通过对每个模式进行版本控制并将新模式与以前版本进行比较,来启用模式兼容性检查。所需的兼容性类型(向后兼容、向前兼容、完全兼容、无兼容性等)决定了 Kafka Schema Registry 如何评估每个新模式。未通过兼容性检查的新模式将从服务中删除。
使用 Kafka Schema Registry 的一些主要优势包括
- 模式演进:随着数据格式和要求随着时间的推移而演进,生产者和消费者通常需要对其数据模式进行更改。Kafka Schema Registry 提供对模式演进的支持,允许生产者注册模式的新版本,同时保持与现有消费者的兼容性。消费者可以检索适合反序列化的适当模式版本,确保即使在发生模式更改时也能正确处理数据。
- 数据验证:Kafka Schema Registry 通过允许生产者使用预定义的数据类型、字段名称和其他约束来注册模式,从而实现数据验证。然后,消费者可以检索并使用这些模式来验证传入的数据,确保数据符合预期的结构和格式。这有助于防止数据处理错误并提高数据质量。
- 模式管理:Kafka Schema Registry 为管理模式提供了一个集中式存储库,从而更容易跟踪、版本控制和管理更改。生产者和消费者可以通过简单的 API 注册、检索和管理模式,从而实现集中式的模式治理和管理。
- 互操作性:Kafka Schema Registry 通过提供一种标准化的方式来定义和管理数据模式,从而促进不同生产者和消费者之间的互操作性。用不同编程语言编写或使用不同序列化框架的生产者和消费者可以使用一个共同的模式注册表来确保整个生态系统中的数据一致性和兼容性。
- 向后兼容和向前兼容:Kafka Schema Registry 允许生产者注册向后兼容和向前兼容的模式,从而实现对数据模式的平滑升级和更改,而不会中断现有的生产者和消费者。向后兼容性确保旧的消费者仍然可以处理使用新模式生成的数据,而向前兼容性允许新的消费者处理使用旧模式生成的数据。
Strimzi Operator 尚未提供 Schema Registry,因此我们将使用 Confluent Helm 存储库中可用的那个。
在本博文中,我们将执行以下操作
- 使用 Helm 图表设置 Kafka Schema Registry
- 创建和部署一个使用 Apache Avro 模式并发送事件的示例生产者
- 构建一个具有 Avro 依赖项的 KafkaConnect 容器
- 使用上述容器部署 KafkaConnect
- 部署一个 Kafka 连接器,它从 Kafka Schema Registry 读取模式,从生产者消费主题事件并将数据存储到 MinIO 中的 Parquet 格式
设置 Schema Registry
我们将使用以下命令克隆 Confluent Helm 存储库
使用以下命令使用 Helm 图表安装 Schema Registry,我们需要提供已部署的现有 Kafka 集群的引导服务器端点,才能成功安装
您可以通过检查日志来查看 Schema Registry 是否已启动并运行,如下所示
创建 Avro 主题
接下来,我们将为 Kafka 主题 nyc-avro-topic
创建一个 YAML 文件并应用它。您可以找到示例代码 here。
带有 Avro 架构的生产者
我们将创建一个简单的 Python 生产者,然后使用 Kafka 架构注册表注册 Avro 架构,并发送 Kafka 主题事件。这将基于我们在之前博客文章中已经拥有的生产者。您可以找到示例代码 here。
然后,添加我们将构建 Docker 镜像所需的依赖项和 Dockerfile (code).
使用上面的 Dockerfile 构建并推送生产者的 Docker 镜像到您的 Docker 注册表,或者您可以使用 Openlake 中提供的 openlake/kafka-demo-avro-producer 镜像。
让我们创建一个 YAML 文件,该文件将生产者在 Kubernetes 集群中作为作业部署 (code)
部署 avro-producer.yaml
文件
您可以使用以下命令检查日志
构建 Kafka Connect 镜像
让我们构建一个 Kafka Connect 镜像,其中包含 S3 和 Avro 依赖项 (code)
使用上面的 Dockerfile 构建并推送生产者的 Docker 镜像到您的 Docker 注册表,或者您可以使用 MinIO Openlake 存储库中提供的 镜像。
在我们部署 KafkaConnect
之前,我们需要先创建存储主题(如果尚未存在),以便 KafkaConnect
按预期工作。
部署 Kafka Connect
创建一个 Kafka Connect 的 YAML 文件,该文件使用上面的镜像,并在 Kubernetes 中部署它。KafkaConnect 将拥有 1 个副本,并使用我们在之前 博客文章 中创建的存储主题。您可以找到示例代码 here。
注意:spec.template.connectContainer.env
中定义了凭据,以便 KafkaConnect 将数据存储到我们的 MinIO 集群中。其他细节,例如 endpoint_url
和 bucket_name
将是 KafkaConnector.key.converter
的一部分,value.converter
指向 AvroConverter (io.confluent.connect.avro.AvroConverter
)
部署 Kafka Sink 连接器
现在我们已经启动并运行了 Kafka Connect,下一步是部署 Sink 连接器,它将轮询 nyc-avro-topic
并将数据存储到 MinIO 存储桶 openlake-tmp
中,格式为 Parquet。让我们来看看配置
connector.class
- 指定 Sink 连接器将使用的连接器类型。在本例中,它是 io.confluent.connect.s3.S3SinkConnector
store.url
- 您要将 KafkaConnect 中的数据存储到的 MinIO 端点 URL
storage.class
- 指定要使用的存储类。在本例中,由于我们存储在 MinIO 中,因此将使用 io.confluent.connect.s3.storage.S3Storage
format.class
- 数据将以何种格式存储到 MinIO 中,由于我们想存储 Parquet,因此将使用 io.confluent.connect.s3.format.parquet.ParquetFormat
实现
value.converter
- 由于我们想将二进制数据转换为 Avro,因此将使用 io.confluent.connect.avro.AvroConverter
parquet.codec
- 指定我们想要用于 Parquet 文件的压缩类型,在本例中,我们将使用 snappy
schema.registry.url
- 指定连接器可以从中拉取、验证架构和反序列化生产者数据的端点
您可以在这里找到示例代码,然后应用它。
如果一切顺利,我们很快就会看到通过执行以下命令将文件添加到 Minio openlake-tmp 存储桶中
我们目前的设置比我们在如何在 Kubernetes 中设置 Kafka 并将数据流式传输到 MinIO中之前的基本设置快得多、更健壮,并且存储效率更高。您可以尝试同时运行生产者和连接器,以查看性能和内存利用率的差异。
现在,我们拥有一个端到端设置,可以有效地使用 Avro 模式生成数据 Kafka 主题,并将其直接以 Parquet 格式消费到 MinIO 中。
实验性:Iceberg
最近,getindata
为 Kafka 添加了 Iceberg 连接器支持,您可以在存储库getindata/kafka-connect-iceberg-sink中找到它。下面我们将探讨如何将nyc-avro-topic
数据直接存储为 Iceberg 表到 MinIO 中。根据我们的测试,我们认为这仍然是实验性的,尚未准备好投入生产,但请尝试一下,以了解未来的情况。
Iceberg Kafka Connect
让我们创建一个具有 Iceberg 依赖项的 KafkaConnect。在部署之前,请确保编辑spec.config.build.output.image
和spec.config.build.output.pushSecret
以指向您的 Docker 仓库。(代码)
然后部署我们新的 KafkaConnect CRD
部署 Iceberg Sink 连接器
现在我们已经部署了 Iceberg KafkaConnect,让我们部署 KafkaConnector,它将直接将 Iceberg 表存储到 MinIO 中。Iceberg 支持不同的目录类型,因此您需要选择满足您需求的目录。有三种目录可用作连接器,您将在下面找到示例配置。
Hadoop Iceberg Sink 连接器
此示例显示了如何使用Hadoop 目录
在 MinIO 中创建和维护 Iceberg 表。
Hive Iceberg Sink 连接器
此示例显示了如何使用Hive 目录
在 MinIO 中创建和维护 Iceberg 表。
注意:iceberg.uri
、iceberg.catalog-impl
、iceberg.table-default.write.data.path
、iceberg.table-default.write.metadata.path
是 Iceberg Hive 目录工作所需的。
Nessie Iceberg Sink 连接器
此示例显示了如何使用Nessie 目录
在 MinIO 中创建和维护 Iceberg 表。
注意:iceberg.uri
、iceberg.ref
、iceberg.catalog-impl
是使 Iceberg Nessie 目录与 MinIO 一起工作所需的关键更改。
使用以下任何命令部署您选择的 Iceberg 目录的 KafkaConnector,默认情况下,Hadoop 目录已在下面启用。
Kafka 和 Iceberg 与 MinIO
这篇博文向您展示了如何构建一个生产级端到端架构,将数据从 Kafka 直接流式传输到 MinIO,作为 Iceberg 表。如前所述,用于 Kafka 的 Iceberg 连接器是实验性的,根据我们最初的实验,它尚未准备好投入生产。这可能会很快改变,因为正在积极开发中。如果您已经设置了 Spark 并希望找到一个用于将 Iceberg 表存储在 MinIO 中的生产就绪解决方案,您可以探索Spark 流式传输。
Kafka 和 MinIO 都是软件定义的,为流式数据湖提供了一个可移植的多云解决方案。它们可以在任何地方运行 - 本地、公有/私有云和边缘 - 为支持云原生分析和 AI/ML 应用程序的事件流架构提供基础,无论它们在哪里。现在您可以自由地构建您想要的任何东西,无论您在哪里。
立即下载 MinIO,开始构建您的云原生数据湖。