使用 Spark 管理 Iceberg 表

Apache Iceberg 是一种开源表格式,具有多引擎兼容性,旨在适应大规模分析数据集。多引擎意味着 Spark、Trino、Presto、Hive 和 Impala 都可以同时独立地操作相同的数据。之前,我们发布了 使用 Iceberg 和 MinIO 的湖仓架构权威指南 作为该主题的介绍,包括设计目标和关键特性,以及 深入探讨 Iceberg:表上的 ACID 事务,讨论了如何在 Iceberg 格式中处理数据。
在这篇博文中,我们将构建一个 Notebook,它使用 MinIO 作为 Spark 作业的对象存储来管理 Iceberg 表。如果您尚未在 Kubernetes 环境中设置 spark-operator,请参阅 Spark、MinIO 和 Kubernetes。
Apache Iceberg
Apache Iceberg 是一种开源表格式,允许在云存储系统(如 Amazon S3、Azure Blob Storage、Google Cloud Storage 和 MinIO)中高效存储大型、缓慢变化的数据集。Iceberg 最初由 Netflix 开发,解决了 Apache Parquet 和 Apache ORC 等现有表格式的一些限制。
Iceberg 旨在提供比传统表格式更多的优势,并成为数据湖的首选表格式,以下是一些优势
- 模式演变:数据湖通常以其灵活性和存储各种数据格式的能力为特征。但是,这种灵活性可能使随着时间的推移管理模式更改变得具有挑战性。Iceberg 提供了一种添加、删除或修改表列的方法,而无需对数据进行完全重写,从而更容易随着时间的推移发展模式
- 事务写入:在数据湖中,确保数据准确性和一致性非常重要,尤其是在数据用于业务关键目的时。Iceberg 为写入操作提供对 ACID 事务的支持,确保数据始终处于一致状态
- 查询隔离:数据湖通常由许多用户或应用程序同时使用。Iceberg 允许多个查询同时运行而不会相互干扰,从而可以扩展数据湖的使用而不会牺牲性能
- 时间旅行:在数据湖中,通常需要能够查询数据在特定时间点的样子。Iceberg 提供了一个时间旅行 API,使用户能够查询数据在特定版本或时间戳时的状态,从而更容易分析历史趋势或跟踪随时间推移的变化
- 分区剪枝:数据湖通常包含大量数据,这可能导致查询速度缓慢且资源密集。Iceberg 支持按一列或多列对数据进行分区,这可以通过减少查询期间需要读取的数据量来显著提高查询性能。
Iceberg 可与各种处理引擎和框架一起使用,包括 Apache Spark、Dremio 和 Presto。它也与 Apache Arrow(一种跨语言内存数据格式)集成,后者支持跨不同处理引擎的高效数据序列化和反序列化。
Iceberg 目录
Apache Iceberg 目录是一个元数据存储,其中包含有关表的信息,包括其模式、位置和分区方案。它负责管理表的生命周期,包括创建、更新和删除它们,并提供用于查询元数据和访问数据的 API。
以下是 Apache Iceberg 支持的一些目录
- JDBC 目录
- Hive 目录
- Nessie 目录
- Hadoop 目录
- Glue 目录
- DynamoDB 目录
- REST 目录
为了使本教程更简单,我们将为 Iceberg 表使用 Hadoop 目录。
MinIO 对象存储
MinIO 保证 Iceberg 表的持久性和对这些表上的 Spark 操作的高性能。MinIO 使用加密来保护 Iceberg 表,并根据基于策略的访问控制来限制对它们的访问。MinIO 使用 TLS 加密传输中的数据,并使用现代的行业标准加密算法(如 AES-256-GCM、ChaCha20-Poly1305 和 AES-CBC)对驱动器上的数据进行细粒度的对象级加密。MinIO 与 外部身份提供商(如 ActiveDirectory/LDAP、Okta 和 Keycloak)集成以进行 IAM。然后,用户和组在尝试访问 Iceberg 表时会受到 AWS IAM 兼容的 PBAC 的约束。
Iceberg 表在写入 MinIO 存储桶后会受到以下保护
- 擦除编码 将数据文件拆分为 数据和奇偶校验块 并对其进行编码,以便即使编码数据的一部分不可用,也可以恢复主数据。横向扩展的分布式存储系统跨多个驱动器和节点保存编码数据。如果驱动器或节点发生故障或数据损坏,则可以从保存在其他驱动器和节点上的奇偶校验和数据块中重建原始数据。
- 位腐烂保护 在后台捕获和修复损坏的对象,以消除对数据持久性的这种隐藏威胁
- 存储桶和对象不变性 使用对象锁定、保留和其他治理机制保护保存到 MinIO 的数据免遭删除或修改。写入 MinIO 的对象永远不会被覆盖。
- 存储桶和对象版本控制 提供进一步的对象保护。MinIO 存储 每个对象的每个版本,即使它被删除,除非特定版本被删除。MinIO 的 数据生命周期管理 允许管理员在层之间移动存储桶,例如使用 NVMe 处理性能密集型工作负载,并为版本设置过期日期,以便将其从系统中清除以提高存储效率。
将演示数据导入 MinIO
我们将使用 MinIO 上可用的 NYC Taxi 数据集。您可以从 此处 下载数据集,该数据集大约有 1.12 亿行,大小约为 10GB。您可以使用您选择的任何其他数据集,并使用以下命令将其上传到 MinIO
管理 Iceberg 表的示例 PySpark 应用程序
这基于 Iceberg 入门 Notebook。
构建 Docker 镜像
我们现在将构建包含上述 Python 应用程序的 Docker 镜像。您可以使用以下 Dockerfile 构建镜像
您可以构建自己的 Docker 镜像,或使用 Docker Hub 上提供的预构建镜像 openlake/sparkjob-demo:3.3.2。
部署 Spark Iceberg 应用
我们将构建 Spark 作业 YAML 文件来定义规范,然后将其部署到 Kubernetes 集群中。
您可以使用以下命令部署上述 sparkjob-iceberg.yml 文件
应用部署后,您可以使用以下命令检查应用的状态
您还可以使用以下命令检查应用的日志(由于我们在 Spark 应用中禁用了 INFO 日志,因此在我们的应用日志开始显示之前,您可能看不到太多活动)
应用完成后,您可以使用以下命令删除应用
代码演练
现在我们已经看到了端到端的代码,让我们详细了解一下代码片段。
设置 Iceberg 属性
以上代码片段指示 Spark 使用 Iceberg Spark 会话扩展,并定义名为 demo 的目录作为默认目录,该目录类型为 hadoop,使用 S3FileIO 作为 IO 实现,S3 端点为 https://play.min.io:50000。
创建 Iceberg 表
在以上代码片段中,我们从 Minio https://play.min.io:50000 端点读取 taxi-data.csv 文件,并将其保存为 Iceberg 表 nyc.taxis_large。保存 Iceberg 表后,我们使用 Spark SQL 查询 nyc.taxis_large 以获取存在的记录总数。
模式演变
以上代码演示了通过重命名、更改列类型、添加新列 fare_per_distance 以及基于 fare 和 distance 列的值填充新列来实现模式演变。
从表中删除数据
在上面的代码片段中,我们删除了新字段 fare_per_distance 中为 null 或大于 4.0 的记录,以及 distance 字段大于 2.0 的记录。操作完成后,我们查询快照表,发现生成了 2 个新的快照。我们还获取了总记录数,它比我们开始时的数量少得多(397014 对比 112234626)。
对表进行分区
这段代码使用 VendorID 列创建了一个新的分区。此分区将适用于将来插入的新行,不会影响旧数据。我们还可以在创建 Iceberg 表时添加分区,例如使用以下代码:
元数据表
Iceberg 有元数据表,如快照、文件、历史记录,我们可以查询这些表来了解后台发生的情况。例如,通过查询快照表,我们可以看到创建新快照时执行的操作。文件表提供了存储在 Minio 中的数据文件的信息,例如每个文件中的记录数、文件格式等。在历史记录表中,我们可以获取有关何时使快照成为当前快照以及父快照是谁等的所有信息。
使用快照进行时间旅行
使用捕获特定时间点交易的快照,可以在 Iceberg 中进行时间旅行。在上面的代码中,我们查询历史表以获取曾经创建的第一个快照,并对该 snapshot_id 执行 roll_back_to_snapshot 系统调用。如果在回滚完成后查询该表,我们可以清楚地看到 fare_per_distance 字段为空,并且记录数恢复到 112234626。最后,历史表中有一条新记录,其中包含我们使用的 snapshot_id。
这是我们可以使用 Apache Iceberg 执行的操作的高级概述。它还支持稍后我们可以探索的表审计和维护。Apache Iceberg 还在快照的基础上添加了对标签和分支的支持,这具有巨大的潜力。一旦功能完善,我们将对其进行探索。
Spark 和 Iceberg 在 MinIO 上运行得非常好,这太酷了
Iceberg、Spark 和 MinIO 结合在一起,形成了构建可扩展高性能数据湖的强大技术组合。Iceberg 的开放式表格式和对多种引擎的支持使其成为企业数据湖的绝佳选择。将 MinIO 用于 Iceberg 存储为多云数据湖和分析奠定了坚实的基础。MinIO 包括 主动-主动复制,用于在本地、公共/私有云和边缘位置之间同步数据,从而实现企业所需的功能,例如地理负载均衡和快速热-热故障转移。
立即在 MinIO 上试用 Iceberg。如果您有任何疑问或想分享技巧,请通过我们的 Slack 频道 联系我们,或向 hello@min.io 发送邮件。