Kafka主题端到端Spark结构化流

End to End Spark Structured Streaming for Kafka Topics

Apache Kafka 和 Apache Spark 是构建流式数据管道以馈送数据湖和湖仓的两大领先技术。从非常高的层次来看,Kafka 将消息流式传输到 Spark,在 Spark 中,这些消息被转换为应用程序可以读取并保存到存储中的格式。这种架构使得构建各种实时、事件驱动的分析和 AI/ML 应用程序成为可能。

我们在之前的文章使用 Kafka 和 MinIO 的 Spark 结构化流式处理中介绍了这种架构,演示了如何利用其统一的批处理和流式处理 API 从发布到 Kafka 的数据创建数据帧。此架构减轻了优化流式处理底层元素的负担,并提供了端到端的功用。Kafka 充当实时数据流的中心枢纽,这些数据流由 Spark 结构化流式处理以编程方式处理。数据处理完成后,Spark 结构化流式处理将结果发布到 MinIO,并在其中将其保存为对象;这些对象共同构成了数据湖。

MinIO 旨在提供高性能、弹性和可扩展的云原生对象存储,用于数据湖及其支持的应用程序。除了 S3 本身之外,最佳的 S3 API 兼容性让开发人员确信他们可以使用自己的自定义软件、云原生分析或 AI/ML 而不会出现问题。擦除编码 保护数据湖中的数据,而复制 使数据可在任何需要的地方使用。MinIO 充分利用底层硬件(请参阅为 MinIO 部署选择最佳硬件)以提供最大的性能——我们已对其进行基准测试,在仅使用 32 个现成的 NVMe SSD 节点的 GET 操作中达到 325 GiB/s(349 GB/s),在 PUT 操作中达到 165 GiB/s(177 GB/s)。

在之前的博文中,我们了解了如何在 Apache Spark 结构化流式处理中使用 Apache Kafka 事件。在本博文中,我们将探讨如何使用 Spark 结构化流式处理在不使用 Kafka 生产者或连接器的情况下,端到端地创建 Kafka 主题事件并将它们使用到 MinIO 中。

先决条件

在开始之前,我们需要准备好以下内容:

  1. Kafka
  2. Kafka Schema Registry
  3. Spark Operator
  4. MinIO 集群

准备好上述先决条件后,我们将执行以下操作:

  • 修改 Kafka 主题分区
  • 设置 Spark 结构化流式处理消费者
  • 设置 Spark 结构化流式处理生产者

修改 Kafka 主题分区

为了充分利用 Spark 的并行化功能,我们需要将 nyc-avro-topic Kafka 主题的分区数设置为 10,以便 Spark 结构化流式处理可以使用 10 个工作进程同时从 Kafka 中提取数据,如下所示。

%%writefile sample-code/spark-job/kafka-nyc-avro-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: nyc-avro-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 10
  replicas: 3

注意:在应用上述更改之前,强烈建议根据之前博文中运行的结果删除 nyc-avro-topic(如果已存在)。

Spark 结构化流式处理消费者

我们编写了一个示例 Spark 消费者流式处理 Python 代码片段,该片段使用 Spark 连接到我们的 MinIO 后端。然后,它在主题 nyc-avro-topic 上监听 Kafka 代理的新消息,然后将其写入 MinIO 存储桶 s3a://warehouse-v/k8/spark-stream/ 中。

您可以在下面找到代码片段,完整代码在此处

%%writefile sample-code/src/main-streaming-spark-consumer.py
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
import json

...

taxi_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .trigger(processingTime='1 second') \
    .option("path", "s3a://warehouse-v/k8/spark-stream/") \
    .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \
    .start() \
    .awaitTermination()

让我们深入了解上面代码中发生了哪些变化。在这种情况下,Avro 文件基于 Spark 的实现,因此不需要 Confluence 依赖项,也不需要像之前那样跳过前 6 位。我们还添加了 1 秒的延迟,以便每次轮询 Kafka 事件之前都会暂停。

taxi_df = stream_df.select(from_avro("value", json.dumps(value_schema_dict)).alias("data")).select("data.*")

taxi_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .trigger(processingTime='1 second') \
    .option("path", "s3a://warehouse-v/k8/spark-stream/") \
    .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \
    .start() \
    .awaitTermination()

现在,让我们使用下面的 Dockerfile 构建 Docker 镜像,其中包含上述 Python 代码运行所需的依赖项。我们还将 .py 脚本作为 Docker 镜像的一部分包含在内,以便执行。

使用此处提供的 镜像 构建 Docker 镜像,或者使用 openlake/sparkjob-demo:3.3.2

%%writefile sample-code/spark-job/sparkjob-streaming-consumer.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-stream-optimized
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/sparkjob-demo:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main-streaming-spark-consumer.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 1
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 3
    memory: "2048m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser
  executor:
    cores: 1
    instances: 10
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser

Spark 结构化流生产者

现在我们已经正确配置了 Kafka 主题,让我们使用 Spark 结构化流创建一个 Kafka 生产者,如下所示。点击 此处 查看完整的代码片段。

%%writefile sample-code/src/spark-streaming-kafka-producer.py
import json
from io import BytesIO
import os
import avro.schema
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import struct
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

...

df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value"))

df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            os.getenv("KAFKA_BOOTSTRAM_SERVER", "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092")) \
    .option("flushInterval", "100ms") \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \
    .option("schema.registry.url",
            os.getenv("KAFKA_SCHEMA_REGISTRY", "http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081")) \
    .option("topic", "nyc-avro-topic") \
    .save()


在上面的代码中,我们从 MinIO 存储桶中的 taxi-data.csv 数据读取数据,代码块如下所示。

df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

我们在下面的代码块中将 DataFrame 转换为 Avro 格式。

df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value"))

最后,我们使用以下代码块将 Kafka 事件写入主题 nyc-avro-topic,目标 Kafka 服务器为 my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092,Kafka Schema Registry URL 为 http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081

df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            os.getenv("KAFKA_BOOTSTRAM_SERVER", "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092")) \
    .option("flushInterval", "100ms") \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \
    .option("schema.registry.url",
            os.getenv("KAFKA_SCHEMA_REGISTRY", "http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081")) \
    .option("topic", "nyc-avro-topic") \
    .save()

使用以上代码构建Docker镜像,或使用我们在openlake/sparkjob-demo:3.3.2中构建的镜像。

%%writefile sample-code/spark-job/sparkjob-kafka-producer.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: kafka-stream-producer
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/sparkjob-demo:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/spark-streaming-kafka-producer.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 3
    # coreLimit: "1200m"
    memory: "2048m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/taxi-data.csv"
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser
  executor:
    cores: 1
    instances: 10
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/taxi-data.csv"
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser

根据我们设置的配置,nyc-avro-topic有10个分区,Spark Kafka生产者和消费者有10个执行器,所有约1.12亿行数据在不到10分钟内完成流式传输和消费,与之前使用我们旧的Kafka/Spark流式传输设置需要约3小时相比,显著减少了时间。这是一个巨大的性能提升,并且在开发实时应用程序时,以这种速度处理流式数据至关重要。

注意:如果我们没有在nyc-avro-topic Spark生产者中应用10个分区的更改,我们仍然可以在<10分钟内完成,但消费者的完成时间将取决于主题级别的分区数量。

性能基准测试

我们测量了仅由Spark结构化流式消费者发起的S3 API调用次数,并获得了以下数据。

API                             接收      发送      调用次数   错误

s3.CompleteMultipartUpload      7.5 KiB 7.3 KiB 16      0       

s3.DeleteMultipleObjects        22 KiB  6.8 KiB 60      0       

s3.HeadObject                   51 KiB  0 B     200     0       

s3.ListObjectsV1                1.5 KiB 9.0 KiB 6       0       

s3.ListObjectsV2                15 KiB  20 KiB  60      0       

s3.NewMultipartUpload           4.1 KiB 6.1 KiB 16      0       

s3.PutObject                    1.1 GiB 0 B     63      0       

s3.PutObjectPart                1.2 GiB 0 B     32      0       


摘要


总计: 453 次调用, 2.4 GiB 接收, 49 KiB 发送 - 160.71s


如果我们不使用 MinIO 的检查点管理器重复此练习,我们将得到以下数字

API                             接收      发送      调用次数   错误

s3.CompleteMultipartUpload      6.1 KiB 5.9 KiB 13      0       

s3.CopyObject                   6.1 KiB 4.1 KiB 18      0       

s3.DeleteMultipleObjects        30 KiB  8.8 KiB 78      0       

s3.DeleteObject                 4.6 KiB 0 B     18      0       

s3.HeadObject                   110 KiB 0 B     432     0       

s3.ListObjectsV2                63 KiB  124 KiB 248     0       

s3.NewMultipartUpload           3.3 KiB 4.9 KiB 13      0       

s3.PutObject                    1.3 GiB 0 B     66      0       

s3.PutObjectPart                1.1 GiB 0 B     26      0       


摘要


总计: 912 次调用, 2.3 GiB 接收, 147 KiB 发送 - 166.55s


我们可以清楚地看到,使用端到端 Spark 结构化流式处理 Kafka 生产者和消费者,以及 MinIO 的检查点管理器,显著提高了性能,我们通过减少 S3 API 调用进一步增强了性能。

此外,如果上述示例代码在版本化的存储桶上运行,并且我们执行了 mc ls --versions --recursive opl/warehouse-v/k8 --summarize 命令,那么使用 MinIO 的检查点管理器将最终得到 84 个对象,而使用默认的检查点管理器将得到 140 个对象,这再次说明了默认管理器不会清理删除标记。

高性能流式处理开启无限可能

这篇博文演示了如何使用 Spark 结构化流式处理创建 Kafka 主题事件并将其端到端消费到 MinIO 中,而无需使用 Kafka 生产者或连接器。我们这样做是为了简化流式处理架构,并且使用 MinIO 检查点管理器将所需时间从三个小时减少到不到十分钟。

现在,您具备了使用 Kafka、Spark 和 MinIO 构建超高速流式数据管道的知识。在流式处理领域,速度越快越好,因为它可以实现更及时的分析。这三个云原生应用程序都是软件定义的,这使得它们成为构建多云流式数据湖的强大组合。

立即下载 MinIO,开始构建云原生流式数据湖,以支持您的实时分析和 AI/ML 计划。