Apache Kafka 和 Apache Spark 是构建流式数据管道以馈送数据湖和湖仓的两大领先技术。从非常高的层次来看,Kafka 将消息流式传输到 Spark,在 Spark 中,这些消息被转换为应用程序可以读取并保存到存储中的格式。这种架构使得构建各种实时、事件驱动的分析和 AI/ML 应用程序成为可能。
我们在之前的文章使用 Kafka 和 MinIO 的 Spark 结构化流式处理中介绍了这种架构,演示了如何利用其统一的批处理和流式处理 API 从发布到 Kafka 的数据创建数据帧。此架构减轻了优化流式处理底层元素的负担,并提供了端到端的功用。Kafka 充当实时数据流的中心枢纽,这些数据流由 Spark 结构化流式处理以编程方式处理。数据处理完成后,Spark 结构化流式处理将结果发布到 MinIO,并在其中将其保存为对象;这些对象共同构成了数据湖。
MinIO 旨在提供高性能、弹性和可扩展的云原生对象存储,用于数据湖及其支持的应用程序。除了 S3 本身之外,最佳的 S3 API 兼容性让开发人员确信他们可以使用自己的自定义软件、云原生分析或 AI/ML 而不会出现问题。擦除编码 保护数据湖中的数据,而复制 使数据可在任何需要的地方使用。MinIO 充分利用底层硬件(请参阅为 MinIO 部署选择最佳硬件)以提供最大的性能——我们已对其进行基准测试,在仅使用 32 个现成的 NVMe SSD 节点的 GET 操作中达到 325 GiB/s(349 GB/s),在 PUT 操作中达到 165 GiB/s(177 GB/s)。
在之前的博文中,我们了解了如何在 Apache Spark 结构化流式处理中使用 Apache Kafka 事件。在本博文中,我们将探讨如何使用 Spark 结构化流式处理在不使用 Kafka 生产者或连接器的情况下,端到端地创建 Kafka 主题事件并将它们使用到 MinIO 中。
先决条件
在开始之前,我们需要准备好以下内容:
- Kafka
- Kafka Schema Registry
- Spark Operator
- MinIO 集群
准备好上述先决条件后,我们将执行以下操作:
- 修改 Kafka 主题分区
- 设置 Spark 结构化流式处理消费者
- 设置 Spark 结构化流式处理生产者
修改 Kafka 主题分区
为了充分利用 Spark 的并行化功能,我们需要将 nyc-avro-topic
Kafka 主题的分区数设置为 10,以便 Spark 结构化流式处理可以使用 10 个工作进程同时从 Kafka 中提取数据,如下所示。
%%writefile sample-code/spark-job/kafka-nyc-avro-topic.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: nyc-avro-topic namespace: kafka labels: strimzi.io/cluster: my-kafka-cluster spec: partitions: 10 replicas: 3 |
注意:在应用上述更改之前,强烈建议根据之前博文中运行的结果删除 nyc-avro-topic
(如果已存在)。
Spark 结构化流式处理消费者
我们编写了一个示例 Spark 消费者流式处理 Python 代码片段,该片段使用 Spark 连接到我们的 MinIO 后端。然后,它在主题 nyc-avro-topic
上监听 Kafka 代理的新消息,然后将其写入 MinIO 存储桶 s3a://warehouse-v/k8/spark-stream/
中。
您可以在下面找到代码片段,完整代码在此处。
%%writefile sample-code/src/main-streaming-spark-consumer.py import os from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType import json
...
taxi_df.writeStream \ .format("parquet") \ .outputMode("append") \ .trigger(processingTime='1 second') \ .option("path", "s3a://warehouse-v/k8/spark-stream/") \ .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \ .start() \ .awaitTermination() |
让我们深入了解上面代码中发生了哪些变化。在这种情况下,Avro 文件基于 Spark 的实现,因此不需要 Confluence 依赖项,也不需要像之前那样跳过前 6 位。我们还添加了 1 秒的延迟,以便每次轮询 Kafka 事件之前都会暂停。
taxi_df = stream_df.select(from_avro("value", json.dumps(value_schema_dict)).alias("data")).select("data.*")
taxi_df.writeStream \ .format("parquet") \ .outputMode("append") \ .trigger(processingTime='1 second') \ .option("path", "s3a://warehouse-v/k8/spark-stream/") \ .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \ .start() \ .awaitTermination() |
现在,让我们使用下面的 Dockerfile 构建 Docker 镜像,其中包含上述 Python 代码运行所需的依赖项。我们还将 .py 脚本作为 Docker 镜像的一部分包含在内,以便执行。
使用此处提供的 镜像 构建 Docker 镜像,或者使用 openlake/sparkjob-demo:3.3.2
。
%%writefile sample-code/spark-job/sparkjob-streaming-consumer.yaml apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-stream-optimized namespace: spark-operator spec: type: Python pythonVersion: "3" mode: cluster image: "openlake/sparkjob-demo:3.3.2" imagePullPolicy: Always mainApplicationFile: local:///app/main-streaming-spark-consumer.py sparkVersion: "3.3.2" restartPolicy: type: OnFailure onFailureRetries: 1 onFailureRetryInterval: 10 onSubmissionFailureRetries: 5 onSubmissionFailureRetryInterval: 20 driver: cores: 3 memory: "2048m" labels: version: 3.3.2 serviceAccount: my-release-spark env: - name: AWS_REGION value: us-east-1 - name: AWS_ACCESS_KEY_ID value: openlakeuser - name: AWS_SECRET_ACCESS_KEY value: openlakeuser executor: cores: 1 instances: 10 memory: "1024m" labels: version: 3.3.2 env: - name: AWS_REGION value: us-east-1 - name: AWS_ACCESS_KEY_ID value: openlakeuser - name: AWS_SECRET_ACCESS_KEY value: openlakeuser |
Spark 结构化流生产者
现在我们已经正确配置了 Kafka 主题,让我们使用 Spark 结构化流创建一个 Kafka 生产者,如下所示。点击 此处 查看完整的代码片段。
%%writefile sample-code/src/spark-streaming-kafka-producer.py import json from io import BytesIO import os import avro.schema from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.avro.functions import to_avro from pyspark.sql.functions import struct from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
...
df = spark.read.option("header", "true").schema(schema).csv( os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv")) df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value"))
df.write \ .format("kafka") \ .option("kafka.bootstrap.servers", os.getenv("KAFKA_BOOTSTRAM_SERVER", "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092")) \ .option("flushInterval", "100ms") \ .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \ .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \ .option("schema.registry.url", os.getenv("KAFKA_SCHEMA_REGISTRY", "http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081")) \ .option("topic", "nyc-avro-topic") \ .save() |
在上面的代码中,我们从 MinIO 存储桶中的 taxi-data.csv
数据读取数据,代码块如下所示。
df = spark.read.option("header", "true").schema(schema).csv( os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv")) |
我们在下面的代码块中将 DataFrame 转换为 Avro 格式。
df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value")) |
最后,我们使用以下代码块将 Kafka 事件写入主题 nyc-avro-topic
,目标 Kafka 服务器为 my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
,Kafka Schema Registry URL 为 http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081
。
df.write \ .format("kafka") \ .option("kafka.bootstrap.servers", os.getenv("KAFKA_BOOTSTRAM_SERVER", "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092")) \ .option("flushInterval", "100ms") \ .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \ .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \ .option("schema.registry.url", os.getenv("KAFKA_SCHEMA_REGISTRY", "http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081")) \ .option("topic", "nyc-avro-topic") \ .save() |
使用以上代码构建Docker镜像,或使用我们在openlake/sparkjob-demo:3.3.2
中构建的镜像。
%%writefile sample-code/spark-job/sparkjob-kafka-producer.yaml apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: kafka-stream-producer namespace: spark-operator spec: type: Python pythonVersion: "3" mode: cluster image: "openlake/sparkjob-demo:3.3.2" imagePullPolicy: Always mainApplicationFile: local:///app/spark-streaming-kafka-producer.py sparkVersion: "3.3.2" restartPolicy: type: OnFailure onFailureRetries: 3 onFailureRetryInterval: 10 onSubmissionFailureRetries: 5 onSubmissionFailureRetryInterval: 20 driver: cores: 3 # coreLimit: "1200m" memory: "2048m" labels: version: 3.3.2 serviceAccount: my-release-spark env: - name: INPUT_PATH value: "s3a://openlake/spark/sample-data/taxi-data.csv" - name: AWS_REGION value: us-east-1 - name: AWS_ACCESS_KEY_ID value: openlakeuser - name: AWS_SECRET_ACCESS_KEY value: openlakeuser executor: cores: 1 instances: 10 memory: "1024m" labels: version: 3.3.2 env: - name: INPUT_PATH value: "s3a://openlake/spark/sample-data/taxi-data.csv" - name: AWS_REGION value: us-east-1 - name: AWS_ACCESS_KEY_ID value: openlakeuser - name: AWS_SECRET_ACCESS_KEY value: openlakeuser |
根据我们设置的配置,nyc-avro-topic
有10个分区,Spark Kafka生产者和消费者有10个执行器,所有约1.12亿
行数据在不到10分钟内完成流式传输和消费,与之前使用我们旧的Kafka/Spark流式传输设置需要约3小时相比,显著减少了时间。这是一个巨大的性能提升,并且在开发实时应用程序时,以这种速度处理流式数据至关重要。
注意:如果我们没有在nyc-avro-topic
Spark生产者中应用10个分区
的更改,我们仍然可以在<10分钟
内完成,但消费者的完成时间将取决于主题级别的分区数量。
我们测量了仅由Spark结构化流式消费者发起的S3 API调用次数,并获得了以下数据。
API 接收 发送 调用次数 错误 s3.CompleteMultipartUpload 7.5 KiB 7.3 KiB 16 0
s3.DeleteMultipleObjects 22 KiB 6.8 KiB 60 0 s3.HeadObject 51 KiB 0 B 200 0 s3.ListObjectsV1 1.5 KiB 9.0 KiB 6 0 s3.ListObjectsV2 15 KiB 20 KiB 60 0 s3.NewMultipartUpload 4.1 KiB 6.1 KiB 16 0 s3.PutObject 1.1 GiB 0 B 63 0 s3.PutObjectPart 1.2 GiB 0 B 32 0
摘要
总计: 453 次调用, 2.4 GiB 接收, 49 KiB 发送 - 在 160.71s |
如果我们不使用 MinIO 的检查点管理器重复此练习,我们将得到以下数字
API 接收 发送 调用次数 错误 s3.CompleteMultipartUpload 6.1 KiB 5.9 KiB 13 0 s3.CopyObject 6.1 KiB 4.1 KiB 18 0 s3.DeleteMultipleObjects 30 KiB 8.8 KiB 78 0 s3.DeleteObject 4.6 KiB 0 B 18 0 s3.HeadObject 110 KiB 0 B 432 0 s3.ListObjectsV2 63 KiB 124 KiB 248 0 s3.NewMultipartUpload 3.3 KiB 4.9 KiB 13 0 s3.PutObject 1.3 GiB 0 B 66 0 s3.PutObjectPart 1.1 GiB 0 B 26 0
摘要
总计: 912 次调用, 2.3 GiB 接收, 147 KiB 发送 - 在 166.55s |
我们可以清楚地看到,使用端到端 Spark 结构化流式处理 Kafka 生产者和消费者,以及 MinIO 的检查点管理器,显著提高了性能,我们通过减少 S3 API 调用进一步增强了性能。
此外,如果上述示例代码在版本化的存储桶上运行,并且我们执行了 mc ls --versions --recursive opl/warehouse-v/k8 --summarize
命令,那么使用 MinIO 的检查点管理器将最终得到 84 个对象,而使用默认的检查点管理器将得到 140 个对象,这再次说明了默认管理器不会清理删除标记。
这篇博文演示了如何使用 Spark 结构化流式处理创建 Kafka 主题事件并将其端到端消费到 MinIO 中,而无需使用 Kafka 生产者或连接器。我们这样做是为了简化流式处理架构,并且使用 MinIO 检查点管理器将所需时间从三个小时减少到不到十分钟。
现在,您具备了使用 Kafka、Spark 和 MinIO 构建超高速流式数据管道的知识。在流式处理领域,速度越快越好,因为它可以实现更及时的分析。这三个云原生应用程序都是软件定义的,这使得它们成为构建多云流式数据湖的强大组合。
立即下载 MinIO,开始构建云原生流式数据湖,以支持您的实时分析和 AI/ML 计划。