Spark、MinIO 和 Kubernetes

Spark, MinIO and Kubernetes

Apache Spark 是一个开源的分布式计算系统,用于大数据处理和分析。它旨在以速度、效率和易用性处理大规模数据处理。Spark 为大规模数据处理提供了一个统一的分析引擎,支持多种语言,包括 Java、Scala、Python 和 R。

使用 Spark 的好处很多。首先,它提供了高度的并行性,这意味着它可以跨集群中的多个节点快速有效地处理大量数据。其次,Spark 为数据处理提供了一套丰富的 API,包括对 SQL 查询、机器学习、图处理和流处理的支持。第三,Spark 具有灵活且可扩展的架构,使开发人员可以轻松地与各种数据源和其他工具集成。

在运行 Spark 作业时,使用合适的存储系统来存储输入和输出数据至关重要。像 MinIO 这样的对象存储系统是唯一能够针对 PB 级数据运行 Spark 作业的方式,因为它们是高度可扩展和持久化的存储解决方案。MinIO 是一个开源的对象存储系统,可以轻松部署在您选择的本地或云环境中。借助 行业领先的 S3 兼容性,MinIO 被广泛用于支持 S3 API 的各种工具,包括 Spark。

与传统 Hadoop 分布式文件系统 (HDFS) 或其他基于文件存储系统相比,将 MinIO 与 Spark 结合使用具有多种优势。MinIO 可高度扩展,可以轻松处理 PB 级的大量数据。MinIO 的读写速度分别超过 2.6 Tbps 和 1.32 Tbps,可以提供支持大型 Spark 数据集所需的规模化性能。MinIO 是一种灵活且经济高效的存储解决方案,可以轻松 与其他工具 和系统集成。写入 MinIO 的数据是不可变的且带版本控制的,并且非常持久,多个 擦除编码 数据副本存储在多个节点中以实现冗余和容错。为了完善功能,主动-主动复制批处理复制 可用于进一步增强冗余和容错,或者只是将数据移动到最适合使用的位置。

为什么选择在 Kubernetes 上运行 Spark?

将 Apache Spark 部署到 Kubernetes 比独立部署它具有多个优势。以下是一些原因

  1. 资源管理:Kubernetes 提供强大的资源管理功能,可以帮助优化资源利用率并最大程度地减少浪费。通过在 Kubernetes 上部署 Spark,您可以利用 Kubernetes 的资源分配和调度功能,根据 Spark 作业的需要动态地为其分配资源。
  2. 可扩展性:Kubernetes 可以根据工作负载自动扩展分配给 Spark 的资源。这意味着 Spark 可以根据其需要处理的数据量进行扩展或缩减,无需人工干预。
  3. 容错性:Kubernetes 提供内置的容错机制,可确保 Spark 集群的可靠性。如果集群中的某个节点发生故障,Kubernetes 会自动将 Spark 任务重新调度到另一个节点,确保工作负载不受影响。
  4. 简化部署:Kubernetes 提供了一个简化的部署模型,您可以在其中使用单个 YAML 文件部署 Spark。此文件指定 Spark 集群所需的资源,Kubernetes 会自动处理其余事项。
  5. 与其他 Kubernetes 服务集成:通过在 Kubernetes 上部署 Spark,您可以利用其他 Kubernetes 服务(例如监控和日志记录)来更好地了解 Spark 集群的性能和运行状况。

在 Kubernetes 上设置 Spark

我们将使用 Spark Operator 在 Kubernetes 上设置 Spark。Spark Operator 是一个 Kubernetes 控制器,允许您在 Kubernetes 上管理 Spark 应用程序。它提供了一个名为 SparkApplication 的自定义资源定义 (CRD),允许您在 Kubernetes 上定义和运行 Spark 应用程序。Spark Operator 还提供了一个 Web UI,允许您轻松监控和管理 Spark 应用程序。Spark Operator 是在 Kubernetes Operator SDK 之上构建的,Kubernetes Operator SDK 是一个用于构建 Kubernetes Operator 的框架。Spark Operator 是开源的,可在 GitHub 上获得。它也作为一个 Helm 图表提供,这使得它易于部署在 Kubernetes 上。在本教程中,我们将使用 Helm 图表在 Kubernetes 集群上部署 Spark Operator。

Spark Operator 提供了多种功能,简化了 Kubernetes 环境中 Spark 应用程序的管理。这些功能包括使用自定义资源进行声明式应用程序规范和管理、自动提交符合条件的 SparkApplications、对计划应用程序的原生 cron 支持以及通过变异准入 Webhook 对 Spark Pod 进行原生功能之外的自定义。

此外,该工具支持更新的 SparkAppliations 的自动重新提交和重启,以及对失败提交的线性回退重试。它还提供将本地 Hadoop 配置作为 Kubernetes ConfigMap 挂载的功能,以及通过 sparkctl 将本地应用程序依赖项自动阶段到 MinIO。最后,该工具支持收集和导出应用程序级指标以及驱动程序/执行程序指标到 Prometheus。

先决条件

要学习本教程,您将需要

  1. 一个 Kubernetes 集群。您可以使用 Minikube 在您的机器上设置一个本地 Kubernetes 集群。
  2. Helm,Kubernetes 的包管理器。您可以按照本指南在您的机器上 安装 Helm
  3. 一个在裸机或 Kubernetes 上运行的 MinIO 服务器。您可以按照 本指南 在裸机上安装 MinIO,或者按照 本指南 在 Kubernetes 上安装 MinIO,或者您可以使用 MinIO Play 服务器 进行测试。
  4. 一个 MinIO 客户端 (mc) 来访问 MinIO 服务器。您可以按照 本指南 在您的机器上安装 mc。

安装 Spark Operator

要安装 Spark Operator,您需要将 Spark Operator 的 Helm 存储库添加到您的本地 Helm 客户端。您可以通过运行以下命令来完成此操作

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator

添加存储库后,您可以使用以下命令安装 Spark Operator(您可能需要等待一分钟才能安装)

helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.1 \
--create-namespace

您将看到以下输出

LAST DEPLOYED: Mon Feb 27 19:48:33 2023
NAMESPACE: spark-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None

此命令将在 spark-operator 命名空间中安装 Spark Operator,并启用变异准入 Webhook。Webhook 必须启用才能将本地 Hadoop 配置作为 Kubernetes ConfigMap 挂载,并配置驱动程序和执行程序可以使用的环境变量。映像存储库和标记设置为包含最新版本 Spark Operator 的映像。您也可以通过省略 --set image.repository--set image.tag 标志来使用默认的映像存储库和标记,在撰写本文时,最新的 Spark Operator 版本使用了 Spark 的 3.1.1 版本,而 openlake/spark-operator 使用了 Spark 的最新 3.3.1 版本。如果您已经有一个名为 spark-operator 的命名空间,则可以跳过 --create-namespace 标志。这还将监控所有命名空间中的所有 Spark 应用程序。

可以在 此处 找到配置选项的详细列表。

验证 Spark Operator 安装

要验证 Spark Operator 是否已成功安装,您可以运行以下命令

kubectl get pods -n spark-operator

您将看到类似以下输出的结果

NAME                                        READY   STATUS    RESTARTS   AGE
my-release-spark-operator-f56c4d8c4-pr857   1/1     Running   0          14m

现在我们已经安装了 Spark Operator,我们可以在 Kubernetes 上部署 Spark 应用程序或计划的 Spark 应用程序。

部署 Spark 应用程序

让我们尝试部署与 Spark Operator 附带的示例简单 Spark 应用程序之一。您可以在 此处 找到示例应用程序列表,我们对计算 Pi 感兴趣,因此我们将修改 spark Pi 应用程序 以使用 Spark 3.3.1 并在 Kubernetes 上运行它。

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: pyspark-pi
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "openlake/spark-py:3.3.1"
    imagePullPolicy: Always
    mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
    sparkVersion: "3.3.1"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        coreLimit: "1200m"
        memory: "512m"
        labels:
        version: 3.1.1
        serviceAccount: my-release-spark
    executor:
        cores: 1
        instances: 1
        memory: "512m"
        labels:
        version: 3.3.1

上面的应用程序将使用 Kubernetes 上的 Spark 计算 Pi 的值。您可以将上面的应用程序保存为 spark-pi.yaml,并使用以下命令部署它

kubectl apply -f spark-pi.yaml

要验证作业是否正在运行,您可以运行以下命令

kubectl -n spark-operator get pods

您应该看到类似以下内容

NAME                                           READY   STATUS      RESTARTS   AGE
my-release-spark-operator-59bccf4d94-fdrc9     1/1     Running     0          24d
my-release-spark-operator-webhook-init-jspnn   0/1     Completed   0          68d
pyspark-pi-driver                              1/1     Running     0          23s
pythonpi-b6a3e48693762e5d-exec-1               1/1     Running     0          7s

您可以使用以下命令检查应用程序的状态

kubectl get sparkapplications -n spark-operator

您将看到以下输出

NAME         STATUS      ATTEMPTS   START                  FINISH                 AGE
pyspark-pi   COMPLETED   1          2023-02-27T15:20:29Z   2023-02-27T15:20:59Z   10m

您还可以使用以下命令检查应用程序的日志

kubectl logs pyspark-pi-driver -n spark-operator

您将看到以下输出

23/02/27 15:20:55 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 2.597098 s
Pi is roughly 3.137960
23/02/27 15:20:55 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-d73653869375fa87-driver-svc.spark-operator.svc:4040
23/02/27 15:20:55 INFO KubernetesClusterSchedulerBackend: Shutting down all executors

现在我们已经使简单的 Spark 应用程序按预期工作,我们可以尝试使用 Spark 从 MinIO 读取和写入数据。

使用 Spark 从 MinIO 读取和写入数据

一旦我们将正确的依赖项和配置到位,使用 Spark 从 MinIO 读取和写入数据就非常简单。在这篇文章中,我们将不讨论依赖项,为了保持简单,我们使用 openlake/spark-py:3.3.1 映像,该映像包含使用 Spark 从 MinIO 读取和写入数据所需的所有依赖项。

将演示数据放入 MinIO

我们将使用 MinIO 上提供的纽约出租车数据集。您可以从这里下载数据集,它大约有 1.12 亿行,大小约为 10GB。对于此练习,可以使用任何现有或 新的 MinIO 部署,只要有足够的可用空间。您可以使用您选择的任何其他数据集,并使用以下命令将其上传到 MinIO,首先我们将创建应用程序将引用的存储桶

mc mb <Your-MinIO-Endpoint>/openlake
mc mb <Your-MinIO-Endpoint>/openlake/spark
mc mb <Your-MinIO-Endpoint>/openlake/spark/sample-data
mc cp nyc-taxi-data.csv <Your-MinIO-Endpoint>/openlake/spark/sample-data/nyc-taxi-data.csv

示例 Python 应用程序

现在让我们使用 Spark 从 MinIO 读取和写入数据。我们将使用以下示例 Python 应用程序来完成此操作。

import logging
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinioSparkJob")
spark = SparkSession.builder.getOrCreate()
def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "<Your-MinIO-AccessKey>"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
os.getenv("AWS_SECRET_ACCESS_KEY", "<Your-MinIO-SecretKey>"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "<Your-MinIO-Endpoint>"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
load_config(spark.sparkContext)
# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])
# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)
total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")

上面的应用程序从 MinIO 读取纽约出租车数据集,并过滤乘客人数大于 6 的行。然后将过滤后的数据写入 MinIO。您可以将上面的代码保存为 main.py

构建 Docker 镜像

现在我们将构建包含上述 Python 应用程序的 Docker 镜像。您可以创建一个包含以下内容的 Dockerfile 来构建镜像

FROM openlake/spark-py:3.3.1
USER root
WORKDIR /app
RUN pip3 install pyspark==3.3.1
COPY src/*.py .

您可以构建自己的 Docker 镜像,或者使用 Docker Hub 上提供的预构建镜像 openlake/sparkjob-demo:3.3.1。如果您需要关于构建 Docker 镜像的复习,请参阅docker build

部署 MinIO Spark 应用程序

要使用 Spark 从 MinIO 读取和写入数据,您需要创建一个包含 MinIO 访问密钥和密钥的密钥。您可以使用以下命令创建密钥

kubectl create secret generic minio-secret \
--from-literal=AWS_ACCESS_KEY_ID=<Your-MinIO-AccessKey> \
--from-literal=AWS_SECRET_ACCESS_KEY=<Your-MinIO-SecretKey> \
--from-literal=ENDPOINT=<Your-MinIO-Endpoint> \
--from-literal=AWS_REGION=us-east-1 \
--namespace spark-operator

您将看到以下输出

secret/minio-secret created

现在我们已经创建了密钥,我们可以部署从 MinIO 读取和写入数据的 Spark 应用程序。您可以将以下应用程序保存为 sparkjob-minio.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-minio
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "openlake/sparkjob-demo:3.3.1"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main.py
    sparkVersion: "3.3.1"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.1
        serviceAccount: my-release-spark
        env:
            -   name: AWS_REGION
                value: us-east-1
            -   name: AWS_ACCESS_KEY_ID
                value: <Your-MinIO-AccessKey>
            -   name: AWS_SECRET_ACCESS_KEY
                value: <Your-MinIO-SecretKey>
    executor:
        cores: 1
        instances: 3
        memory: "1024m"
        labels:
            version: 3.3.1
        env:
            -   name: INPUT_PATH
                value: "s3a://openlake/spark/sample-data/taxi-data.csv"
            -   name: OUTPUT_PATH
                value: "s3a://openlake/spark/output/taxi-data-output"
            -   name: AWS_REGION
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_REGION
            -   name: AWS_ACCESS_KEY_ID
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_ACCESS_KEY_ID
            -   name: AWS_SECRET_ACCESS_KEY
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_SECRET_ACCESS_KEY
            -   name: ENDPOINT
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: ENDPOINT

上面的 Python Spark 应用程序 YAML 文件包含以下配置

  • spec.type:应用程序的类型。在本例中,它是一个 Python 应用程序。
  • spec.pythonVersion:应用程序中使用的 Python 版本。
  • spec.mode:应用程序的模式。在本例中,它是一个集群模式应用程序。
  • spec.image:包含应用程序的 Docker 镜像。
  • spec.imagePullPolicy:Docker 镜像的镜像拉取策略。
  • spec.mainApplicationFile:主应用程序文件路径。
  • spec.sparkVersion:应用程序中使用的 Spark 版本。
  • spec.restartPolicy:应用程序的重启策略。在本例中,如果应用程序失败,它将被重启。应用程序将重启 3 次,每次重启之间间隔 10 秒。如果应用程序提交失败,它将重启 5 次,每次重启之间间隔 20 秒。
  • spec.driver:应用程序的驱动程序配置。在本例中,我们使用 my-release-spark 服务帐户。驱动程序环境变量被设置为从 MinIO 读取和写入数据。
  • spec.executor:应用程序的执行器配置。在本例中,我们使用 3 个执行器,每个执行器有 1 个核心和 1GB 内存。执行器环境变量被设置为从 MinIO 读取和写入数据。

您可以使用以下命令部署应用程序

kubectl apply -f sparkjob-minio.yaml

应用程序部署后,您可以使用以下命令检查应用程序的状态

kubectl get sparkapplications -n spark-operator

您将看到以下输出

NAME          STATUS    ATTEMPTS   START                  FINISH       AGE
spark-minio   RUNNING   1          2023-02-27T18:47:33Z   <no value>   4m4s

应用程序完成后,您可以使用以下命令检查 MinIO 中的输出数据。您可以使用以下命令列出输出目录中的文件

mc ls minio/openlake/spark/output/taxi-data-output

您还可以使用以下命令检查应用程序的日志

kubectl logs -f spark-minio-driver -n spark-operator

您将看到以下输出

23/02/27 19:06:11 INFO FileFormatWriter: Finished processing stats for write job 91dee4ed-3f0f-4b5c-8260-bf99c0b662ba.
2023-02-27 19:06:11,578 - MinioSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-02-27 19:06:11,578 - MinioSparkJob - INFO - Total Rows for Passenger Count > 6: 1066
2023-02-27 19:06:11,578 - py4j.clientserver - INFO - Closing down clientserver connection
23/02/27 19:06:11 INFO SparkUI: Stopped Spark web UI at http://spark-minio-b8d5c4869440db05-driver-svc.spark-operator.svc:4040
23/02/27 19:06:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors

您还可以使用 Spark UI 来监控正在运行的应用程序。您可以使用以下命令将 Spark UI 端口转发到外部访问

kubectl port-forward svc/spark-minio-ui-svc 4040:4040 -n spark-operator

在浏览器中,您可以使用以下 URL 访问 Spark UI

https://127.0.0.1:4040

您将看到以下 Spark UI

Spark UI

应用程序完成后,您可以使用以下命令删除应用程序

kubectl delete sparkapplications spark-minio -n spark-operator

部署计划的 Spark 应用程序与部署普通的 Spark 应用程序几乎相同。唯一的区别是您需要在 Spark 应用程序 YAML 文件中添加 spec.schedule 字段,并且 kind 为 ScheduledSparkApplication。您可以将以下应用程序保存为 sparkjob-minio-scheduled.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
    name: spark-scheduled-minio
    namespace: spark-operator
spec:
    schedule: "@every 1h" # Run the application every hour
    concurrencyPolicy: Allow
    template:
        type: Python
        pythonVersion: "3"
        mode: cluster
        image: "openlake/sparkjob-demo:3.3.1"
        imagePullPolicy: Always
        mainApplicationFile: local:///app/main.py
        sparkVersion: "3.3.1"
        restartPolicy:
            type: OnFailure
            onFailureRetries: 3
            onFailureRetryInterval: 10
            onSubmissionFailureRetries: 5
            onSubmissionFailureRetryInterval: 20
        driver:
            cores: 1
            memory: "1024m"
            labels:
                version: 3.3.1
            serviceAccount: my-release-spark
            env:
                -   name: AWS_REGION
                    value: us-east-1
                -   name: AWS_ACCESS_KEY_ID
                    value: <Your-MinIO-AccessKey>
                -   name: AWS_SECRET_ACCESS_KEY
                    value: <Your-MinIO-SecretKey>
        executor:
            cores: 1
            instances: 3
            memory: "1024m"
            labels:
                version: 3.3.1
            env:
                -   name: INPUT_PATH
                    value: "s3a://openlake/spark/sample-data/taxi-data.csv"
                -   name: OUTPUT_PATH
                    value: "s3a://openlake/spark/output/taxi-data-output"
                -   name: AWS_REGION
                    valueFrom:
                        secretKeyRef:
                            name: minio-secret
                            key: AWS_REGION
                -   name: AWS_ACCESS_KEY_ID
                    valueFrom:
                        secretKeyRef:
                            name: minio-secret
                            key: AWS_ACCESS_KEY_ID
                -   name: AWS_SECRET_ACCESS_KEY
                    valueFrom:
                        secretKeyRef:
                            name: minio-secret
                            key: AWS_SECRET_ACCESS_KEY
                -   name: ENDPOINT
                    valueFrom:
                        secretKeyRef:
                            name: minio-secret
                            key: ENDPOINT

您可以以与普通 Spark 应用程序相同的方式部署并查看应用程序的结果。上面的 Spark 应用程序将每小时运行一次,并将输出写入相同的存储桶。

本教程的所有源代码都可以在以下 GitHub 存储库中找到:openlake/spark

未来的 Spark-itect

Apache Spark 和 MinIO 是数据湖和分析的强大工具。在 Kubernetes 上运行 Spark 可以为您带来更好的资源管理、容错和 Spark 作业可扩展性的优势。再加上高性能和高度可扩展的 MinIO,您就拥有了可以支持您所有 Spark 工作负载的组合,无论您需要在何处运行它们 - 公共/私有云、数据中心、边缘 - 在您选择的 Kubernetes 平台上。

下载 MinIO 并试用 Spark Operator。如果您有任何问题,请在我们的 Slack 频道上向我们提问。