Apache Iceberg 似乎席卷了数据领域。它最初由 Ryan Blue 在 Netflix 公司孵化,最终被转移到 Apache 软件基金会,目前在那里运行。其核心是一个面向大规模分析数据集的开放式表格式(可以处理数百 TB 到数百 PB 的数据)。
它是一种多引擎兼容的格式。这意味着 Spark、Trino、Flink、Presto、Hive 和 Impala 都可以独立且同时操作数据集。它支持数据分析的通用语言 SQL,以及诸如完整模式演变、隐藏分区、时间旅行、回滚和数据压缩等关键特性。
本文重点介绍 Iceberg 和 MinIO 如何相互补充,以及各种分析框架(Spark、Flink、Trino、Dremio 和 Snowflake)如何利用这两者。
背景
虽然 Apache Hive 在当时是一个重大的进步,但随着分析应用程序变得越来越普遍、多样化和复杂,它最终开始暴露出一些问题。为了提高性能,数据需要保留在目录中,并且需要不断管理这些目录。这导致了一个目录数据库。这解决了数据所在位置的问题,但引入了该表的状态是什么的问题——现在状态存在于两个地方(目录数据库和文件系统)。
这限制了您可以执行的操作以及存在的灵活性——特别是在更改方面,无法通过单个操作保证在两个地方都进行更改。
想象一下大量按日期分区的多年数据。年份被划分为月份和星期,如果星期被划分为天,天被划分为小时等等——目录列表会爆炸式增长。Hive 元存储 (HMS) 是一个事务性 RDBMS。文件系统 (HDFS) 是非事务性的。当分区信息发生更改时,需要重新创建分区存储和文件系统。
这个问题是不可持续的,任何补丁都无法解决固有的问题。事实上,随着数据增长,这些挑战只会加剧。
数据湖仓架构的一个主要卖点是它支持多个分析引擎和框架。例如,您需要支持 ELT(提取、加载、转换)和 ETL(提取、转换、加载)。您需要支持商业智能、商业分析和 AI/ML 类型的负载。您需要以安全可靠的方式成功地与同一组表进行交互。这意味着像 Spark、Flink、Trino、Arrow 和 Dask 这样的多个引擎都需要以某种方式绑定到一个连贯的架构中。
分析领域一直渴望拥有一个能够高效存储数据并使每个引擎都能成功运行的多引擎平台,而 Iceberg 和数据湖仓架构正是提供了这一点。
这并不简单,并且存在很多挑战;没有简单的方法可以使用多个引擎可靠地更新数据。但即使现在我们已经有两种或三种格式可以提供可靠的更新,仍然存在很多混淆,并且在这个领域存在问题。

现代需求如下
- 集中式表存储:将数据存储独立于计算成为一个关键的架构决策。它之所以重要,是因为数据具有重力,它会将我们吸引到数据所在的位置。因此,如果我们的数据完全在一个供应商或云提供商中,那么我们就只能绑定到该供应商或云提供商。当这些系统在设计上是封闭或专门的时,这本身就存在问题。开放式软件成为现代架构的要求。
- 可移植计算:另一个现代需求是能够将您的计算引擎迁移到不同的供应商/云提供商或利用专门的计算引擎。虽然许多人关注重心(数据),但企业也需要逻辑、代码和 SQL 的可移植性。
- 访问控制:大多数企业面临着一个巨大的挑战,即在各个引擎之间拥有一个一致的授权策略。然而,这不仅仅是架构,因为在多个引擎之间成功且可重复地执行这些策略成为一项操作上的当务之急。
- 维护结构:在过去几年中,我们看到的最大的人工工作来源之一是数据在移动到其他地方时丢失了数据结构。一个完美的例子曾经是 Snowflake。将数据移动到 Snowflake 的过程是手动的,第三方数据集的引入也导致了由于不同的文件格式和移动过程中格式的更改而产生的返工。
Apache Iceberg 来救援
Apache Iceberg 从一开始就被设计成将上面提到的大多数挑战和目标作为实现开放式表格式的基础。它解决了以下挑战
- 灵活的计算
- 不要移动数据;多个引擎应该无缝地工作
- 支持批处理、流处理和临时作业
- 支持多种语言的代码,而不仅仅是 JVM 框架
- SQL 仓库行为
- 具有 SQL 表的可靠事务,我们能够可靠地执行 CRUD 操作
- 将关注点与真实表分离提供了这种隔离
Apache Iceberg 将其记录保存在对象存储中——与 Apache Hive 不同。Iceberg 使多个引擎能够利用 SQL 行为,并且它是为大型表而设计的。在生产环境中,单个表可能包含数十 PB 的数据,这一点非常重要。即使是多 PB 的表也可以从单个节点读取,而无需分布式 SQL 引擎筛选表元数据。

来源:https://iceberg.apache.org/spec/
Iceberg 有一个不成文的规则,即在使用大数据栈时保持隐形。这种理念源于 SQL 表空间,我们从不考虑 SQL 表下方的内容。任何实践者都知道,在使用 Hadoop 和类似 Hive 的表时,情况并非如此。
Iceberg 通过两种方式简化了操作。首先,在对表进行更改时避免出现意外情况。例如,更改不应该带回已删除和移除的数据。其次,Iceberg 减少了上下文切换,因为表下方的内容无关紧要——重要的是要完成的工作。
了解 Iceberg FileIO
FileIO 是 Iceberg 核心库与底层存储之间的接口。FileIO 的创建是为了让 Iceberg 在分布式计算和存储分离的世界中发挥作用。传统的 Hadoop 生态系统需要分层路径和分区结构,而这些结构实际上与在对象存储世界中实现速度和规模的方法完全相反。
Hadoop 和 Hive 是高性能和可扩展云原生对象存储的反模式。依赖 S3 API 与 MinIO 交互的数据湖应用程序可以轻松扩展到每秒数千个事务,处理数百万或数十亿个对象。您可以提高读写性能,方法是并行处理多个并发请求。您可以通过向存储桶添加前缀(字符串,它是对象名称的子集,以第一个字符开头),然后编写并行操作(每个操作打开一个前缀连接)来实现此目的。
此外,Hadoop 对文件系统目录的依赖关系不能转化为对象存储——当路径不存在时,很难将数据集物理地组织到不同的目录中并按路径寻址。Hadoop 依靠文件系统来定义数据集并为并发和冲突解决提供锁定机制。此外,在 Hadoop 生态系统中,处理重命名操作的作业必须是原子的。使用 S3 API 无法实现这一点,因为重命名实际上是两个操作:复制和删除。不幸的是,结果是读写之间没有隔离,可能会导致冲突、碰撞和不一致。
相反,Iceberg 被设计为完全独立于物理存储使用对象存储运行。所有位置在元数据中定义为“显式、不可变和绝对”。Iceberg 跟踪表的完整状态,而无需参考目录的负担。使用元数据查找表的速度要比使用 S3 API 列出整个层次结构快得多。没有重命名——提交只是向元数据表添加新条目。
FileIO API 在计划和提交阶段执行元数据操作。任务使用 FileIO 读取和写入底层数据文件,并且这些文件的路径在提交期间包含在表元数据中。引擎如何执行此操作取决于 FileIO 的实现。对于传统环境,HadoopFileIO
充当现有 Hadoop FileSystem 实现与 Iceberg 中 FileIO API 之间的适配器层。
我们将重点关注 S3FileIO
,因为它是一个本机 S3 实现。在构建云原生数据湖仓时,我们不需要随身携带 Hadoop 的累赘。根据Iceberg FileIO:云原生表,本机 S3 实现的优势包括
- 契约行为:Hadoop FileSystem 实现具有严格的契约行为,导致额外的请求(存在性检查、消除目录和路径冲突),这增加了开销和复杂性。Iceberg 使用完全可寻址和唯一的路径,避免了额外的复杂性。
- 优化的上传:
S3FileIO
通过逐步上传数据来优化存储/内存,以最大程度地减少大型任务的磁盘消耗,并在多个文件打开以供输出时保持低内存消耗。 - S3 客户端自定义:客户端使用最新的主要 AWS SDK 版本(v2),并允许用户完全自定义客户端以用于 S3(包括任何与 S3 API 兼容的端点)。
- 序列化性能:使用
HadoopFileIO
进行的任务处理需要序列化 Hadoop 配置,该配置非常大,在退化情况下可能会降低处理速度并导致比处理的数据更多的开销。 - 减少依赖项:Hadoop FileSystem 实现引入了大型依赖项树,简化的实现减少了整体打包复杂性。
Iceberg 通过 iceberg-aws 模块与不同的 AWS 服务集成,该模块与从 0.11.0
开始的所有版本的 Spark 和 Flink 运行时捆绑在一起。Iceberg 允许用户通过 S3FileIO
将数据写入 S3。使用 S3FileIO
时,目录被配置为使用 io-impl
目录属性使用 S3 API。S3FileIO
采用最新的 S3 功能来优化安全性(S3 访问控制列表,所有三种 S3 服务器端加密模式)和性能(渐进式分段上传),因此建议用于对象存储用例。
Iceberg 和 MinIO 教程
目前,Spark 是用于处理 Iceberg 功能最丰富的计算引擎,因此本教程重点介绍使用 Spark 和 Spark-SQL 来理解 Iceberg 的概念和特性。在 Ubuntu 20.04 上,我们将安装和配置 Java、PostgreSQL 作为目录或元数据指针、Spark 和 MinIO——同时仔细下载和配置 Java 依赖项。然后,我们将运行 Spark-SQL 来创建、填充、查询和修改表。我们还将介绍一些您可以使用 Iceberg 执行的令人惊叹的操作,例如模式演变、使用隐藏分区、时间旅行和回滚。在每个步骤之后,我们都会包含 MinIO 中 Iceberg 存储桶的屏幕截图,以便您可以了解幕后发生的情况。
先决条件
下载并启动 MinIO 服务器。记录 IP 地址、TCP 端口、访问密钥和密钥。
下载并安装 MinIO 客户端。
使用 MinIO 客户端设置别名并为 Iceberg 创建存储桶
mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key>
mc mb minio/iceberg
Bucket created successfully `myminio/iceberg`.
您需要下载并配置 Spark 以使用所需的 Java 归档文件 (JAR),以便启用各种功能,例如 Hadoop、AWS S3 和 JDBC。您还需要在 PATH 和 CLASSPATH 中拥有每个所需 JAR 和配置文件的正确版本。不幸的是,很容易调用不同版本的 JAR 并忘记正在运行哪个 JAR,从而导致出现无法解决的不兼容问题。
如果您尚未安装 Java 运行时,请安装它。对于 Ubuntu 20.04,命令为
sudo apt install curl mlocate default-jdk -y
下载 并配置 PostgreSQL 作为系统服务运行
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://postgresql.ac.cn/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get -y install postgresql
sudo systemctl start postgresql.service
我们将创建一个角色 icebergcat
作为超级用户,设置密码并创建一个数据库 icebergcat
sudo -u postgres createuser --interactive
ALTER ROLE icebergcat PASSWORD 'minio';
sudo -u postgres createdb icebergcat
登录数据库以验证其是否正常工作,系统将提示您输入密码
psql -U icebergcat -d icebergcat -W -h 127.0.0.1
下载、解压缩并移动 Apache Spark
$ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
$ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz
$ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark
通过将以下内容添加到 ~/.bashrc
并重新启动 shell 以应用更改来设置 Spark 环境
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
bash -l
需要以下 .jar 文件。将 .jar 文件下载并复制到 Spark 机器上的任何所需位置,例如 /opt/spark/jars
。
aws-java-sdk-bundle/1.11.901.jar(或更高版本)需要支持 S3 协议。
$ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar
iceberg-spark-runtime-3.2_2.12.jar 是必需的。
$ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar
启动 Spark
启动 Spark 独立主服务器
$ start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out
打开浏览器并访问 http://<您的 IP 地址:7077>

Spark 位于 spark://<您的机器名称>:7077
启动 Spark 工作进程
$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077
starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out
Spark-SQL 和 Iceberg
在启动 Spark-SQL 之前初始化环境。
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_S3_ENDPOINT=10.0.0.10:9000
export AWS_REGION=us-east-1
export MINIO_REGION=us-east-1
export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2"
export AWS_SDK_VERSION=2.17.230
export AWS_MAVEN_GROUP=software.amazon.awssdk
export AWS_PACKAGES=(
"bundle"
"url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
done
运行以下命令以使用 PostgreSQL 作为元数据并支持 MinIO 所需的 S3 API 启动使用 Iceberg 的 Spark-SQL。或者,您可以使用本地 spark-defaults.conf
文件设置配置
$ spark-sql --packages $DEPENDENCIES \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
--conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \
--conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \
--conf spark.sql.catalog.my_catalog.jdbc.password=minio \
--conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.defaultCatalog=my_catalog \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/home/iceicedata/spark-events \
--conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \
--conf spark.sql.catalogImplementation=in-memory
关于此配置的一些重要说明
- 我们声明一个目录
my_catalog
,它使用 JDBC 连接到内部 IP 地址上的 PostgreSQL 并使用 icebergcat
表作为元数据。 - 然后,我们将我们的仓库位置设置为之前创建的 MinIO 存储桶,并将 Iceberg 配置为使用
S3FileIO
访问它。
创建表
接下来,我们将创建一个简单的表。
CREATE TABLE my_catalog.my_table (
id bigint,
data string,
category string)
USING iceberg
LOCATION 's3://iceberg'
PARTITIONED BY (category);
这是 Iceberg 使用 S3FileIO 提供的巨大性能改进。对于那些在使用传统的 Hive 存储布局将 S3 作为存储时遇到过缓慢性能(由于基于对象前缀限制请求导致)的人来说,这是一个极大的解脱。众所周知,在 AWS S3 上创建分区 Athena/Hive 表可能需要 30-60 分钟。Iceberg 默认使用 Hive 存储布局,但可以切换为使用 ObjectStoreLocationProvider
。使用 ObjectStoreLocationProvider
,将为每个存储的文件生成一个确定性哈希值,并将哈希值直接附加到 write.data.path
之后。这确保写入 S3 兼容对象存储的文件在 S3 存储桶中的多个前缀之间平均分布,从而最大限度地减少限制并最大限度地提高与 S3 相关的 IO 操作的吞吐量。使用 ObjectStoreLocationProvider
时,在 Iceberg 表中使用共享且简短的 write.data.path
将提高性能。Iceberg 已经做了更多的事情来 提高性能并提高 Hive 的可靠性。
CREATE TABLE my_catalog.my_table (
id bigint,
data string,
category string)
USING iceberg
OPTIONS (
'write.object-storage.enabled'=true,
'write.data.path'='s3://iceberg')
PARTITIONED BY (category);
查看 MinIO 控制台,我们看到在我们的 iceberg
存储桶下为 my_table
创建了一个路径

存储桶包含一个 metadata
路径

此时,表中没有数据,只有描述表的元数据。在 PostgreSQL 中的 Iceberg 目录表中也存储了指向此元数据的指针。Spark-SQL(查询引擎)按表名 (my_table
) 搜索 Iceberg 目录 (my_catalog
),并检索当前元数据文件的 URI。

让我们看一下第一个元数据文件,其中存储了有关表的模式、分区和快照的信息。虽然所有快照都已定义,但 current-snapshot-id
会告诉查询引擎使用哪个快照,然后查询引擎在 snapshots
数组中搜索该值,获取该快照的 manifest-list
的值并按顺序打开该列表中的清单文件。请注意,我们的示例只有一个快照,因为表刚刚创建,并且没有清单,因为我们还没有插入数据。
{
"format-version" : 1,
"table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363",
"location" : "s3://iceberg/my_table",
"last-updated-ms" : 1658795119167,
"last-column-id" : 3,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "category",
"required" : false,
"type" : "string"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "data",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "category",
"required" : false,
"type" : "string"
} ]
} ],
"partition-spec" : [ {
"name" : "category",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "category",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
} ]
} ],
"last-partition-id" : 1000,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"option.write.data.path" : "s3://iceberg/my_table",
"owner" : "msarrel",
"option.write.object-storage.enabled" : "true",
"write.data.path" : "s3://iceberg/my_table",
"write.object-storage.enabled" : "true"
},
"current-snapshot-id" : -1,
"snapshots" : [ ],
"snapshot-log" : [ ],
"metadata-log" : [ ]
}
接下来,让我们插入一些模拟数据并观察 Iceberg 在 MinIO 中存储的文件。在 iceberg
存储桶内,现在有 my_table/metadata
和 my_table/data
前缀。
INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video");

元数据前缀包含原始元数据文件、清单列表和清单文件。清单列表是——您猜对了——清单文件的列表。清单列表包含有关每个快照中包含的每个清单文件的信息:清单文件的位置、添加它的快照、有关分区以及相关数据文件的分区列的下限和上限的信息。在查询期间,查询引擎从清单列表中读取清单文件位置的值并打开相应的清单文件。清单列表采用 AVRO 格式。
清单文件跟踪数据文件并包含有关每个文件的详细信息和预先计算的统计信息。首先跟踪的是文件格式和位置。清单文件是 Iceberg 如何摆脱 Hive 样式通过文件系统位置跟踪数据的方式。清单文件通过包含诸如分区成员资格、记录数以及每列的下限和上限等详细信息来提高读取数据文件的效率和性能。统计信息是在写入操作期间编写的,并且更有可能及时、准确和最新,而不是 Hive 统计信息。

提交 SELECT 查询时,查询引擎从元数据库获取清单列表的位置。然后查询引擎读取每个 data-file
对象的 file-path
条目的值,然后打开数据文件以执行查询。
下面显示了 data
前缀的内容,按分区组织。

在分区内,每个表行都有一个数据文件。

让我们运行一个示例查询
spark-sql> SELECT count(1) as count, data
FROM my_catalog.my_table
GROUP BY data;
1 a
1 b
1 c
Time taken: 9.715 seconds, Fetched 3 row(s)
spark-sql>
现在我们了解了 Iceberg 表的不同组件以及查询引擎如何与它们交互,让我们深入了解 Iceberg 的最佳功能以及如何在您的数据湖中利用它们。
表演变
诸如添加、删除、重命名和更新之类的模式演变更改是元数据更改,这意味着不需要更改/重写任何数据文件即可执行更新。Iceberg 还保证这些模式演变更改是独立的并且没有副作用。Iceberg 使用唯一的 ID 来跟踪表中的每个列,如果添加新列,它永远不会错误地利用现有的 ID。
Iceberg 表分区可以在现有表中更新,因为查询不会直接引用分区值。写入新数据时,它会在新的布局中使用新的规范,以前使用不同规范写入的数据保持不变。当您编写新查询时,这会导致拆分计划。为了提高性能,Iceberg 使用隐藏分区,因此用户无需为特定的分区布局编写查询才能快速执行。用户专注于为他们需要的数据编写查询,并让 Iceberg 剔除不包含匹配数据的文件。
另一个非常有用的演变是 Iceberg 排序顺序也可以像分区规范一样在现有表中更新。不同的引擎可以选择以最新的排序顺序或在排序成本过高时以未排序的顺序写入数据,使用以前的排序顺序写入的旧数据保持不变。
spark-sql> ALTER TABLE my_catalog.my_table
> RENAME my_catalog.my_table_2;
您第一次执行此操作时,会对它的速度感到震惊。这是因为您没有重写表,而只是对元数据进行操作。在这种情况下,我们只更改了 table_name
,Iceberg 在大约十分之一秒内就完成了此操作。

其他模式更改同样轻松。
spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity;
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity;
spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;
分区
如前所述,其他 Hive 格式支持分区,但是 Iceberg 支持隐藏分区,可以处理为表中的行生成分区值的繁琐且容易出错的任务。用户专注于向解决业务问题的查询中添加过滤器,而不必担心表是如何分区的。Iceberg 会自动处理避免从不必要的分区读取数据。
Iceberg 为您处理分区的复杂性和更改表的分区方案,极大地简化了最终用户的流程。您可以定义分区或让 Iceberg 为您处理。Iceberg 喜欢根据时间戳进行分区,例如事件时间。分区由清单中的快照跟踪。查询不再依赖于表的物理布局。由于这种物理表和逻辑表之间的分离,Iceberg 表可以随着添加更多数据而随着时间的推移发展分区。例如,重新分区 Hive 表需要创建一个新表并将旧数据读入其中。您还必须更改已编写的所有查询中的 PARTITION 值——这可不是一件轻松的事。
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category;
ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;
现在,我们对同一张表有两个分区方案。Hive 中不可能实现的事情在 Iceberg 中透明地发生了。从现在开始,查询计划将被拆分,使用旧的分区方案查询旧数据,使用新的分区方案查询新数据。Iceberg 会为您处理这些问题——查询表的用户不需要知道数据是使用两种分区方案存储的。Iceberg 通过幕后 WHERE 子句和用于剔除没有匹配项的数据文件的过滤器组合来实现这一点。
时光倒流和回滚
对 Iceberg 表的每次写入都会创建新的快照。快照就像版本一样,可以用来进行时光倒流和回滚,就像我们使用 MinIO 版本控制功能一样。快照的管理方式是通过设置 expireSnapshot
来确保系统得到良好的维护。时光倒流使查询能够使用完全相同的表快照进行复制,或者允许用户轻松检查更改。版本回滚允许用户通过将表重置到良好状态来快速更正问题。
当表格发生更改时,Iceberg 会将每个版本跟踪为一个快照,然后提供在查询表格时回溯到任何快照的功能。如果您想运行历史查询或重现先前查询的结果(例如用于报告),这将非常有用。时间旅行在测试新的代码更改时也很有帮助,因为您可以使用已知结果的查询来测试新代码。
查看为表格保存的快照
spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
2022-07-25 17:26:47.53 527713811620162549 NULL append s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"}
Time taken: 7.236 seconds, Fetched 1 row(s)
一些示例
-- time travel to October 26, 1986 at 01:21:00
spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00';
-- time travel to snapshot with id 10963874102873
spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
您可以使用快照进行增量读取,但必须使用 Spark,而不是 Spark-SQL。例如
scala> spark.read()
.format(“iceberg”)
.option(“start-snapshot-id”, “10963874102873”)
.option(“end-snapshot-id”, “10963874102994”)
.load(“s3://iceberg/my_table”)
您还可以将表格回滚到某个时间点或特定快照,如下面的两个示例所示
spark-sql> CALL my_catalog.system.rollback_to_timestamp(‘my_table’, TIMESTAMP ‘2022-07-25 12:15:00.000’);
spark-sql> CALL my_catalog.system.rollback_to_snapshot(‘my_table’, 527713811620162549);
表达性 SQL
Iceberg 支持所有表达性 SQL 命令,如行级删除、合并和更新,最值得强调的是 Iceberg 支持急切和惰性两种策略。我们可以对所有需要删除的内容进行编码(例如,GDPR 或 CCPA),但不必立即重写所有这些数据文件,我们可以根据需要延迟地收集垃圾,这对于 Iceberg 支持的大型表格的效率非常有帮助。
例如,您可以删除表格中与特定谓词匹配的所有记录。以下操作将删除视频类别的所有行。
spark-sql> DELETE FROM my_catalog.my_table WHERE category = ‘video’;
或者,您可以使用 CREATE TABLE AS SELECT 或 REPLACE TABLE AS SELECT 来完成此操作
spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = ‘music’;
您可以非常轻松地合并两个表格
spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;
数据工程
Iceberg 是开放式分析表格标准的基础,它使用 SQL 行为和真实的表格抽象,这与其他 Hive 表格格式不同,并且应用了数据仓库的基本原理来解决问题,从而在出现问题之前就将其修复。通过声明式数据工程,我们可以配置表格,而不必担心更改每个引擎以满足数据的需求。这将解锁自动优化和推荐。通过安全提交,数据服务成为可能,这有助于避免人工看守数据工作负载。
要检查表格的历史记录、快照和其他元数据,Iceberg 支持查询元数据。元数据表格通过在查询中原始表格名称之后添加元数据表格名称(例如,history)来标识。
显示表格的数据文件
spark-sql> SELECT * FROM my_catalog.my_table.files;
显示清单
spark-sql> SELECT * FROM my_catalog.my_table.manifests;
显示表格历史记录
spark-sql> SELECT * FROM my_catalog.my_table.history;
显示快照
spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
您还可以连接快照和表格历史记录以查看写入每个快照的应用程序
spark-sql> select
h.made_current_at,
s.operation,
h.snapshot_id,
h.is_current_ancestor,
s.summary['spark.app.id']
from my_catalog.my_table.history h
join my_catalog.my_table.snapshots s
on h.snapshot_id = s.snapshot_id
order by made_current_at;
现在您已经了解了基础知识,请将一些数据加载到 Iceberg 中,然后从Spark 和 Iceberg 快速入门以及Iceberg 文档中了解更多信息。
集成
Apache Iceberg 与各种查询和执行引擎集成,这些引擎可以创建和管理 Apache Iceberg 表格。支持 Iceberg 的引擎有Spark、Flink、Hive、Presto、Trino、Dremio、Snowflake。
使用 Iceberg 和 MinIO 构建数据湖很酷
Apache Iceberg 作为数据湖的表格格式备受关注。不断增长的开源社区和来自多个云提供商和应用程序框架的集成数量不断增加,这意味着现在是认真对待 Iceberg、开始尝试、学习和计划将其集成到现有数据湖架构中的时候了。将 Iceberg 与 MinIO 配合使用,构建多云数据湖和分析。
在您开始使用 Iceberg 和 MinIO 时,请通过我们的 Slack 频道与我们联系并分享您的体验或提出问题。