MinIO 企业存储上的 Iceberg ACID 事务

Apache Iceberg 数据湖存储格式允许对存储在MinIO 企业版存储中的表格进行 ACID 事务。ACID 事务允许多个用户和服务并发且可靠地原子地添加和删除记录。同时,查询被隔离以保持对正在被更改的表格的读一致性。您可以使用 MinIO 企业版存储和 Iceberg 以及 PostgreSQL 作为元数据数据库,利用 ACID 事务支持进行写入、删除、更新、时间旅行和模式修改。
好消息是,您可以继续使用您已知且喜爱的 SQL 和 DML。例如,您可以使用 UPDATE 语句更新行中的新信息,或使用 DELETE 语句删除行。即使时间旅行也是一个 SELECT 语句。
将 Iceberg 表格格式与 MinIO 企业版存储配对,可以创建一个强大、灵活且可扩展的湖仓平台。 Iceberg 表格规范声明了一种表格格式,旨在管理存储在分布式系统中的“大量、缓慢变化的集合”文件或对象。Iceberg 规范版本 1 定义了使用不可变文件格式(Parquet、Avro 和 ORC)管理大型分析表格。规范版本 2 为具有不可变文件的分析表格添加了行级更新和删除。
Iceberg 表格格式跟踪表格中的各个数据文件,而不是目录。数据文件是在本地创建的,并且只能显式地将文件添加到表格中。表格状态保存在元数据中。Iceberg 元数据和清单列表仅仅是存储在 MinIO 企业版存储中的对象,它也维护自己的元数据,这些元数据与对象一起存储。表格状态的每次更改都需要创建一个新的 Iceberg 元数据文件,该文件用原子交换替换旧元数据。原子性由 MinIO 企业版存储启用。
Iceberg 表格格式要求
- 就地写入:文件/对象一旦写入就不能移动或更改
- 可搜索读取:数据文件格式需要支持搜索
- 删除:表格删除不再需要的文件/对象(或者,在 MinIO 企业版存储的情况下,对象被标记为已删除,但会被保留)
这些要求与 MinIO 企业版存储等对象存储兼容。数据和元数据文件一旦写入,在被删除之前就是不可变的。MinIO 企业版存储继续保存数据和元数据对象的过时版本,确保数据永远不会被删除,并扩展 Iceberg 的时间旅行功能。
在本博文中,我们将深入研究 Iceberg 的 ACID 事务,以创建 Iceberg 表格,更新和删除其中的记录,并演化其模式。这篇文章是对 使用 Iceberg 和 MinIO 的湖仓架构终极指南的后续文章,其中包括对湖仓架构的解释,对 Spark、Iceberg、PostgreSQL 和 MinIO 如何协同工作进行了深入讨论,并提供了一个教程,教读者如何安装它们、创建表格、演化表格模式以及如何使用时间旅行和回滚。
使用 Iceberg 和 MinIO 的 ACID 事务教程
我正在使用托管在 UCI 机器学习资料库的 在线零售数据集。这是一个事务数据集,包含了英国注册的非门店在线零售业务在 2010 年 1 月 12 日至 2011 年 9 月 12 日期间发生的全部交易。数据集包含约 550,000 条记录,具有 8 个属性
- InvoiceNo:发票编号。名义,一个 6 位的整数,唯一分配给每个交易。如果此代码以字母“c”开头,则表示取消。
- StockCode:产品(商品)代码。名义,一个 5 位的整数,唯一分配给每个不同的产品。
- Description:产品(商品)名称。名义。
- Quantity:每笔交易中每个产品(商品)的数量。数字。
- InvoiceDate:发票日期和时间。数字,生成每笔交易的日期和时间。
- UnitPrice:单价。数字,以英镑计的产品单价。
- CustomerID:客户编号。名义,一个 5 位的整数,唯一分配给每个客户。
- Country:国家名称。名义,每个客户居住的国家名称。
该文件采用 Microsoft Excel 格式,因此第一步是将其转换为 .CSV 格式,以便可以将其读取到 Spark 中作为数据帧,然后保存到 Iceberg 中。将文件转换为 .CSV 格式后,我将其保存到用于存储原始数据的 MinIO 存储桶中。
我正在使用先前教程中的 Iceberg 安装程序, 使用 Iceberg 和 MinIO 的湖仓架构终极指南。如果您没有阅读过该博客文章,请参考该文章或自行设置 Iceberg 和 MinIO 环境。
我们将使用 Spark 和 Spark-SQL 来处理 Iceberg。
读取数据并保存为 Iceberg 表格
我们要完成的第一个任务是将 .CSV 文件读取到 Spark 中作为数据帧,然后将其保存到 Iceberg 中。
仅仅 3 行代码就包含了很多内容,所以让我们先简要概述一下 Spark 如何处理数据。 Spark 是一种用于大规模数据处理的统一分析引擎,它在 Java、Scala、Python 和 R 中提供了一组高级 API。Spark 可以使用 HDFS 和 YARN 的传统库,也可以使用对象存储(例如 MinIO)通过 S3 API 运行。Spark-SQL 是一个用于结构化数据处理的 Spark 模块。它使用与 Spark 相同的执行引擎,Spark 和 Spark-SQL 的组合为分析提供了强大的工具包。
数据帧是按列组织的 dataset(数据集)。数据帧可以看作关系型数据库中的表格,它对 Spark 引擎有一些内置的优化。数据帧可以从各种来源构建,我们将从 .CSV 文件构建数据帧,将我们的零售数据组织成列和行。
Spark 将读取 .CSV 文件并将其组织成一个数据帧,作为一个临时视图。Spark 从 SELECT 语句创建临时视图。这些视图是会话范围的,并且是只读的。然后,我们将临时视图写入 Iceberg 表格,让 Spark 自动为我们创建模式。
val df spark.read.csv("online-retail.csv")
df.createOrReplaceTempView("tempview");
spark.sql("CREATE or REPLACE TABLE retail USING iceberg AS SELECT * FROM tempview");
数据集不是很大,因此读取 .CSV 文件并写入 Iceberg Parquet 文件所花费的时间很少。
在 SparkSQL 中快速查询可以验证数据是否已写入 Iceberg 表格
spark-sql> SELECT * from local.retail limit 10;
22/08/24 18:08:56 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country
536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 12/1/2010 8:26 2.55 17850 United Kingdom
536365 71053 WHITE METAL LANTERN 6 12/1/2010 8:26 3.39 17850 United Kingdom
536365 84406B CREAM CUPID HEARTS COAT HANGER 8 12/1/2010 8:26 2.75 17850 United Kingdom
536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 12/1/2010 8:26 3.39 17850 United Kingdom
536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 12/1/2010 8:26 3.39 17850 United Kingdom
536365 22752 SET 7 BABUSHKA NESTING BOXES 2 12/1/2010 8:26 7.65 17850 United Kingdom
536365 21730 GLASS STAR FROSTED T-LIGHT HOLDER 6 12/1/2010 8:26 4.25 17850 United Kingdom
536366 22633 HAND WARMER UNION JACK 6 12/1/2010 8:28 1.85 17850 United Kingdom
536366 22632 HAND WARMER RED POLKA DOT 6 12/1/2010 8:28 1.85 17850 United Kingdom
耗时:7.944 秒,已提取 10 行
在 MinIO 中,我们也可以看到数据文件已被保存到 iceberg 存储桶中
更新 Iceberg 表格中的记录
我们将使用 SELECT 语句,然后是 UPDATE 语句来更新表格。更新和删除是在 OLTP 数据库(如在线零售数据集)中常见的操作,而 Iceberg 表格格式将此功能引入数据湖。Iceberg 能够执行行级更新并维护数据一致性。
我们将更新零售表以更改特定发票的购买数量。想象一下,在现实世界中发生这种变化很容易,例如,客户可以返回网站并购买更多商品以包含在原始订单中。我们将为发票号 (retail._c0) 559340 和库存代码 (retail._c1) 22413 (METAL SIGN TAKE IT OR LEAVE IT) 的数量 (retail._c3) 添加一个单位。
首先,查询发票和商品
SELECT retail.c0,retail.c1,retail.c3 FROM retail where retail._c0='559340' and retail._c1='22413';
然后使用 UPDATE 语句进行更新
UPDATE retail SET retail._c3='7' WHERE retail._c0='559340' and retail._c1='22413';
最后,验证 UPDATE 是否已执行
SELECT retail._c0, retail._c1, retail._c2, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';
您应该看到以下输出,数量 (retail._c3) 已更改为 7
559340 22413 METAL SIGN TAKE IT OR LEAVE IT 7
耗时:0.464 秒,获取 1 行
从 Iceberg 表中删除记录
ACID 属性使数据湖中的 Iceberg 表格删除操作成为可能。
想象一下,客户请求从我们的零售数据湖中删除其用户数据。为了遵守 GDPR,我们必须立即找到并删除这些数据。已经收到删除与客户 13269 相关的所有记录的请求。
首先,让我们计算与该客户相关的记录数量
SELECT count(*) FROM retail WHERE retail._c6=’13269’;
320
耗时:0.242 秒,获取 1 行
我们将从表中删除这 320 条记录
DELETE FROM retail WHERE retail._c6='13269';
验证记录是否已删除。以下查询应报告 0 条记录
SELECT count(*) FROM retail WHERE retail._c6='13269';
演化 Iceberg 表的模式
我们让 Iceberg 在我们最初将 Spark 数据帧保存为 Iceberg 表格时自动创建表格模式,导致了一些不太直观的列名,_c0 到 _c7。我们将更改这些名称,使其更易于阅读,例如,_c0 将重命名为 InvoiceNo。
Iceberg 模式更新仅更改元数据。可以添加、删除、重命名、更新和重新排序列,而无需完全重写表格,这是一个昂贵的提议。
让我们看看当前的模式
SHOW CREATE TABLE retail;
CREATE TABLE iceberg.retail (
_c0 STRING,
_c1 STRING,
_c2 STRING,
_c3 STRING,
_c4 STRING,
_c5 STRING,
_c6 STRING,
_c7 STRING)
USING iceberg
LOCATION 'S3://iceberg/retail'
TBLPROPERTIES (
'current-snapshot-id' = '6565073876818863127',
'format' = 'iceberg/parquet',
'format-version' = '1')
耗时:0.082 秒,获取 1 行
让我们将列重命名为更易于阅读的名称。当您执行每个命令时,请注意 Iceberg 以多快的速度执行元数据更改 - 我们谈论的是更改元数据的毫秒数,而写入整个表格需要大约一分钟。
```ALTER TABLE retail RENAME COLUMN _c1 TO StockCode;
ALTER TABLE retail RENAME COLUMN _c2 TO Description;
ALTER TABLE retail RENAME COLUMN _c3 TO Quantity;
ALTER TABLE retail RENAME COLUMN _c4 TO InvoiceDate;
ALTER TABLE retail RENAME COLUMN _c5 TO UnitPrice;
ALTER TABLE retail RENAME COLUMN _c6 TO CustomerID;
ALTER TABLE retail RENAME COLUMN _c7 TO Country;
让我们验证表格模式是否已更改
SHOW CREATE TABLE retail;
CREATE TABLE iceberg.retail (
InvoiceNo STRING,
StockCode STRING,
Description STRING,
Quantity STRING,
InvoiceDate STRING,
UnitPrice STRING,
CustomerID STRING,
Country STRING)
USING iceberg
LOCATION 'S3://iceberg/retail'
TBLPROPERTIES (
'current-snapshot-id' = '6565073876818863127',
'format' = 'iceberg/parquet',
'format-version' = '1')
耗时:0.034 秒,获取 1 行
MinIO 和 Iceberg 用于多云数据湖
Iceberg 和 MinIO 是构建企业数据湖的强大技术。两者都是高性能、高度可扩展且可靠的开源组件,拥有大量用户在各种硬件、软件和云实例上运行分析工作负载。
基于 Iceberg、Delta 和 HUDI 开放表格格式构建的数据湖,将数据湖分析提升到了新的水平。使用这些开放表格格式、MinIO 和您选择的分析或 ML 包,您可以构建无限的东西。所有内容都是开放的,MinIO 是与 S3 API 兼容的层,将所有内容连接在一起,并将数据湖扩展到多云,从边缘到数据中心到公共/私有云。
下载 MinIO 企业版存储,将世界上最快的对象存储投入您的数据湖中。通过 我们的 Slack 频道分享您的经验或提出问题。我们很想了解您正在构建什么!