Spark 结构化流与 Kafka 和 MinIO

Apache Kafka 是领先的开源分布式事件流平台,用于构建数据管道、流式分析、数据集成和应用程序。企业喜欢 Kafka 是因为它具有高可用性、高吞吐量和可扩展性。他们也喜欢使用 Apache Spark 处理数据和构建分析,因为它速度快、分布式且容错。Spark 经过多年的发展,增加了诸如 Spark SQL 之类的新功能,这是一个用于使用关系查询进行结构化数据处理的模块。Spark 结构化流式处理构建在 Spark SQL API 之上,用于数据流处理。将 Kafka 与 Spark 结构化流式处理相结合,使开发人员能够像在静态数据上编写批处理计算一样表达流式计算。
Kafka 和 Spark 结构化流式处理一起用于构建由流数据馈送的数据湖/湖仓,并提供实时业务洞察。为数据湖馈送数据只是等式的一部分——为了最大程度地利用数据湖,底层对象存储必须具有高可用性、高性能、可扩展性和 API 驱动。
MinIO 是流数据湖的绝佳选择。行业领先的 S3 API 兼容性让开发人员确信他们可以使用自己的自定义软件、云原生分析或 AI/ML 而不存在问题。MinIO 充分利用底层硬件(参见为您的 MinIO 部署选择最佳硬件)以提供尽可能高的性能——我们已对它进行了基准测试,在 GET 操作上达到 325 GiB/s(349 GB/s),在 PUT 操作上达到 165 GiB/s(177 GB/s),仅使用 32 个现成的 NVMe SSD 节点。
Spark 流式处理
Apache Spark Streaming 是一个强大且可扩展的流处理框架,它是更大的 Apache Spark 生态系统的一部分。它提供了一个高级且表达力强的 API,用于处理来自各种来源(如 Kafka、Flume、Kinesis 或自定义来源)的数据流,并支持广泛的用例,包括实时分析、机器学习、欺诈检测等。Spark Streaming 遵循微批处理模型,其中传入数据被分成基于时间的小批次,并行处理,并将结果聚合以生成最终输出。Spark Streaming 提供了强大的容错、可靠性和恰好一次处理语义保证,使其成为构建可扩展且强大的流处理应用程序的流行选择。
Spark 结构化流式处理
Spark 结构化流式处理是 Apache Spark 生态系统中一个较新的补充,它提供了一个更高级且统一的流处理 API,该 API 基于“连续处理”的概念。Spark 结构化流式处理扩展了熟悉的 DataFrame 和 Dataset API(用于 Spark 中的批处理处理),以无缝支持流数据处理。它提供了一个更高级别的抽象来处理数据流,允许开发人员编写类似于批处理代码的流处理代码,使其更直观且用户友好。Spark 结构化流式处理提供了高级功能,例如对容错、事件时间处理和状态管理的内置支持,使其成为构建可扩展、可靠和复杂流处理应用程序的强大且便捷的选择。Spark 结构化流式处理还与更大的 Apache Spark 生态系统紧密集成,从而能够与其他 Spark 模块无缝集成,以实现跨批处理和流数据的端到端数据处理管道。
结构化流式处理是满足现代流处理需求的最佳选择,因为
- 它是一个真正的流模型,具有连续处理功能,而不是 Spark Streaming 的微批处理模型。
- 它通过利用 Spark SQL 拥有更丰富的 API 和流处理功能集。
- 它具有更强的容错和一致性保证。通过检查点和恢复来防止数据丢失。
- 它支持基于事件时间的处理和跨时间的推理数据。
- 与较低级别的 DStreams API 相比,该 API 更高级别且更易于使用。
结构化流式处理是 Spark 流处理的未来,并且正在不断投资和改进。因此,对于任何新的流应用程序,都强烈建议从结构化流式处理开始。
在这篇博文中,我们将探讨如何使用 Spark 结构化流式处理处理从 Kafka 流式传输的事件。我们还将探讨MinIO 检查点并展示它提供的显著性能改进。我们还将探讨如何将 Kafka 中的事件数据直接保存为 Iceberg 表到 MinIO 中。
如果您尚未设置 Kafka,请查看这篇博文。如果您尚未设置 Kafka 主题 nyc-avro-topic
并使用 avro-producer
生成事件,请参阅充分利用 Kafka Schema Registry 和 MinIO 进行流式传输。
PySpark 结构化流式处理应用程序
下面我们将编写一个简单的 PySpark 应用程序,该应用程序将持续不断地从 Kafka 中的主题 nyc-avro-topic
流式传输事件,并处理每个记录并将其保存为 Parquet
文件到 MinIO 中。
注意:我们假设 Kafka、Kafka Schema Registry、Kafka 主题 nyc-avro-topic
和 Avro 生产者已启动并正在运行。您可以参考充分利用 Kafka Schema Registry 和 MinIO 进行流式传输以获取详细说明。
您可以在此处找到示例代码。
在上面的代码中
load_config
包含 Spark 连接到 MinIO 以读取和写入数据所需的所有 Hadoop 配置。
value_schema_dict
包含 Spark 用于从 Kafka 反序列化数据的 Avro 架构。
配置 Spark 流
让我们深入了解如何配置 Spark 流
stream_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092") \
.option("subscribe", "nyc-avro-topic") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.option("mode", "PERMISSIVE") \
.option("truncate", False) \
.option("newRows", 100000) \
.load()
在上面的代码片段中,我们将 Spark 流格式指定为 kafka
,并且我们包含的一些关键选项包括
kafka.bootstrap.servers
- 用于使用事件的 Kafka 服务器端点
subscribe
- 逗号分隔的字符串列表,指向 Spark 流将处理的主题。在我们的例子中,它是 nyc-avro-topic
。
startingOffsets
- 指定 Spark 流应从何时开始处理已订阅主题的事件。在我们的例子中,它将是 earliest
,即已订阅主题的开始时间
预处理 Avro 数据流
在我们的案例中,由于我们使用的是 Confluent Avro 数据流,所以在使用 Spark 消费数据之前,我们需要对字节流进行预处理。前 6 位包含 Avro 模式元数据,Kafka 中的 Confluent API(消费者/连接器)会使用这些元数据。在我们的案例中,我们可以跳过 Spark 流处理中的这些位。
将 Parquet 数据写入 MinIO
我们在 Spark 中打开一个写入流,将预处理后的数据框 taxi_df
以 parquet
格式写入选项中指定的 MinIO 的 path
中,我们还提供了 checkpointLocation
,这是一个 MinIO 存储桶,Spark 将在其中持续创建检查点。如果作业失败,Spark 将根据检查点从中断的地方继续执行。
构建您自己的镜像,或使用 Docker Hub 上的 openlake/sparkjob-demo:3.3.2
镜像,该镜像包含上述代码。
您可以在此处找到示例代码。
应用它
注意:由于我们的 Kafka 消费者大约运行 3 小时才能流式传输所有 112M
行,因此当生产者和消费者同时启动时,Spark 结构化流处理也将花费接近相同的时间。
性能基准测试
我们测量了仅由 Spark 结构化流消费者发出的 S3 API 调用次数,并在下面记录了结果。
API RX TX CALLS ERRORS
s3.CopyObject 6.7 MiB 4.5 MiB 20220 0
s3.DeleteMultipleObjects 11 MiB 3.0 MiB 26938 0
s3.DeleteObject 9.9 MiB 0 B 39922 0
s3.GetObject 1.8 MiB 496 MiB 6736 0
s3.HeadObject 84 MiB 0 B 336680 0
s3.ListObjectsV2 60 MiB 1.6 GiB 241903 0
s3.PutObject 2.3 GiB 0 B 26975 0
Summary:
Total: 699374 CALLS, 2.5 GiB RX, 2.1 GiB TX - in 11999.80s
从上表可以看出,我们大约向 MinIO 端点发起了 700K 次调用。我们可以对消费者代码进行简单的更改来优化这一点。如果我们在消费者代码中添加 1 分钟
延迟触发器,而不是持续轮询 Kafka 中的新事件,我们可以显著减少 API 调用的总数。以下是优化的代码。
这里需要注意的关键点是 .trigger(processingTime='1 minute')
,它会在每次轮询 Kafka 事件之前添加 1 分钟
的延迟。以下是经过优化的代码。
API RX TX CALLS ERRORS
s3.CopyObject 207 KiB 139 KiB 614 0
s3.DeleteMultipleObjects 335 KiB 92 KiB 812 0
s3.DeleteObject 235 KiB 0 B 921 0
s3.GetObject 54 KiB 469 KiB 199 0
s3.HeadObject 2.5 MiB 0 B 9867 0
s3.ListObjectsV2 1.7 MiB 12 MiB 6910 0
s3.PutObject 2.0 GiB 0 B 814 0
Summary:
Total: 20137 CALLS, 2.0 GiB RX, 13 MiB TX - in 12126.59s
从上表可以看出,我们已将 API 调用次数从 ~700K
减少到 ~20k
。只需添加一行简单的代码更改,我们就能对 S3 API 调用次数产生重大影响。
MinIO 检查点管理器
上述优化是一个巨大的改进。如果我们在版本化的存储桶中运行相同的代码,并在所有行存储到 MinIO 后执行 mc ls --versions --recursive opl/warehouse-v/k8 --summarize
,您仍然会注意到带有 v1
和 v2
的对象,其中 v2
是应该已删除的对象的删除标记。随着使用者继续添加记录,删除标记对象会不断累积并浪费存储空间,这可能会随着时间的推移而成为问题。
引入 MinIO 检查点管理器,io.minio.spark.checkpoint.S3BasedCheckpointFileManager
,它利用了 MinIO 严格一致的原子事务。MinIO 的检查点管理器充分利用了本地对象 API,并避免了基于 POSIX 的实现带来的不必要负担。
您可以通过在 Spark 配置中添加以下一行代码轻松地在您的代码中使用检查点管理器:
这里 是您可以运行以查看结果的示例代码。
构建您自己的镜像或使用 Docker Hub 中的 openlake/sparkjob-demo:3.3.2
,其中包含上述代码。您可以在 这里 找到代码。
部署使用新的 MinIO 检查点管理器的优化后的消费者,如下所示:
性能基准测试
我们测量了使用优化后的检查点管理器的 Spark 结构化流式消费者发出的 S3 API 调用次数,并记录了以下结果:
API RX TX CALLS ERRORS
s3.DeleteMultipleObjects 5.8 MiB 1.7 MiB 15801 0
s3.DeleteObject 12 MiB 0 B 46465 0
s3.GetObject 4.0 MiB 2.7 GiB 15802 0
s3.HeadObject 43 MiB 0 B 172825 0
s3.ListObjectsV1 7.8 MiB 7.8 GiB 31402 0
s3.ListObjectsV2 3.9 MiB 5.2 MiB 15782 0
s3.PutObject 4.7 GiB 0 B 63204 0
Summary:
Total: 361281 CALLS, 4.8 GiB RX, 10 GiB TX - in 12160.25s
我们已经可以看到,API 调用次数从 ~700K
减少到 ~361K
,在此基础上,如果我们在每次轮询之前添加 1 分钟
的延迟,我们将看到进一步的改进。
API RX TX CALLS ERRORS
s3.DeleteMultipleObjects 75 KiB 23 KiB 200 0
s3.DeleteObject 100 KiB 0 B 394 0
s3.GetObject 52 KiB 469 KiB 199 0
s3.HeadObject 508 KiB 0 B 1995 0
s3.ListBuckets 150 B 254 B 1 0
s3.ListObjectsV1 75 KiB 2.8 MiB 293 0
s3.ListObjectsV2 51 KiB 67 KiB 200 0
s3.PutObject 2.0 GiB 0 B 803 0
Summary:
Total: 4085 CALLS, 2.0 GiB RX, 3.3 MiB TX - in 11945.35s
从之前的 ~20K
API 调用次数,我们现在已经减少到 ~4K
。在 版本化存储桶
上,我们还会注意到另一个主要的改进,即没有 v2 删除标记
对象存在,我们只有 v1
对象。
Spark 结构化流式和数据湖
在这篇博文中,我们了解了如何使用 Spark 结构化流式从 Kafka 中消费事件,并且我们还深入研究了一些可以减少 S3 API 调用次数的优化方法。我们还了解了使用 MinIO 检查点管理器的优势,以及其实现如何避免所有 POSIX 负担并利用对象存储的本机严格一致性。在下一篇博文中,我们将了解如何在 Spark 结构化流式中实现端到端的 Kafka 生产者和消费者,以及这如何加快整个流程。
Kafka、Spark 和 MinIO 经常结合起来构建数据湖和分析平台。它们都是软件定义的,为流式数据提供了一个可移植的多云环境,使您能够在任何地方馈送分析和 AI/ML 应用程序。
立即下载 MinIO 并开始构建您的云原生流式数据湖。