MinIO 企业存储上的 Iceberg ACID 事务

ACID Transactions with Iceberg on MinIO Enterprise Store

Apache Iceberg 数据湖存储格式允许对存储在MinIO 企业版存储中的表格进行 ACID 事务。ACID 事务允许多个用户和服务并发且可靠地原子地添加和删除记录。同时,查询被隔离以保持对正在被更改的表格的读一致性。您可以使用 MinIO 企业版存储和 Iceberg 以及 PostgreSQL 作为元数据数据库,利用 ACID 事务支持进行写入、删除、更新、时间旅行和模式修改。

好消息是,您可以继续使用您已知且喜爱的 SQL 和 DML。例如,您可以使用 UPDATE 语句更新行中的新信息,或使用 DELETE 语句删除行。即使时间旅行也是一个 SELECT 语句。

将 Iceberg 表格格式与 MinIO 企业版存储配对,可以创建一个强大、灵活且可扩展的湖仓平台。 Iceberg 表格规范声明了一种表格格式,旨在管理存储在分布式系统中的“大量、缓慢变化的集合”文件或对象。Iceberg 规范版本 1 定义了使用不可变文件格式(Parquet、Avro 和 ORC)管理大型分析表格。规范版本 2 为具有不可变文件的分析表格添加了行级更新和删除。

Iceberg 表格格式跟踪表格中的各个数据文件,而不是目录。数据文件是在本地创建的,并且只能显式地将文件添加到表格中。表格状态保存在元数据中。Iceberg 元数据和清单列表仅仅是存储在 MinIO 企业版存储中的对象,它也维护自己的元数据,这些元数据与对象一起存储。表格状态的每次更改都需要创建一个新的 Iceberg 元数据文件,该文件用原子交换替换旧元数据。原子性由 MinIO 企业版存储启用。

Iceberg 表格格式要求

  • 就地写入:文件/对象一旦写入就不能移动或更改
  • 可搜索读取:数据文件格式需要支持搜索
  • 删除:表格删除不再需要的文件/对象(或者,在 MinIO 企业版存储的情况下,对象被标记为已删除,但会被保留)

这些要求与 MinIO 企业版存储等对象存储兼容。数据和元数据文件一旦写入,在被删除之前就是不可变的。MinIO 企业版存储继续保存数据和元数据对象的过时版本,确保数据永远不会被删除,并扩展 Iceberg 的时间旅行功能。

在本博文中,我们将深入研究 Iceberg 的 ACID 事务,以创建 Iceberg 表格,更新和删除其中的记录,并演化其模式。这篇文章是对 使用 Iceberg 和 MinIO 的湖仓架构终极指南的后续文章,其中包括对湖仓架构的解释,对 Spark、Iceberg、PostgreSQL 和 MinIO 如何协同工作进行了深入讨论,并提供了一个教程,教读者如何安装它们、创建表格、演化表格模式以及如何使用时间旅行和回滚。

使用 Iceberg 和 MinIO 的 ACID 事务教程

我正在使用托管在 UCI 机器学习资料库的 在线零售数据集。这是一个事务数据集,包含了英国注册的非门店在线零售业务在 2010 年 1 月 12 日至 2011 年 9 月 12 日期间发生的全部交易。数据集包含约 550,000 条记录,具有 8 个属性

  • InvoiceNo:发票编号。名义,一个 6 位的整数,唯一分配给每个交易。如果此代码以字母“c”开头,则表示取消。
  • StockCode:产品(商品)代码。名义,一个 5 位的整数,唯一分配给每个不同的产品。
  • Description:产品(商品)名称。名义。
  • Quantity:每笔交易中每个产品(商品)的数量。数字。
  • InvoiceDate:发票日期和时间。数字,生成每笔交易的日期和时间。
  • UnitPrice:单价。数字,以英镑计的产品单价。
  • CustomerID:客户编号。名义,一个 5 位的整数,唯一分配给每个客户。
  • Country:国家名称。名义,每个客户居住的国家名称。

该文件采用 Microsoft Excel 格式,因此第一步是将其转换为 .CSV 格式,以便可以将其读取到 Spark 中作为数据帧,然后保存到 Iceberg 中。将文件转换为 .CSV 格式后,我将其保存到用于存储原始数据的 MinIO 存储桶中。

我正在使用先前教程中的 Iceberg 安装程序, 使用 Iceberg 和 MinIO 的湖仓架构终极指南。如果您没有阅读过该博客文章,请参考该文章或自行设置 Iceberg 和 MinIO 环境。

我们将使用 Spark 和 Spark-SQL 来处理 Iceberg。

读取数据并保存为 Iceberg 表格

我们要完成的第一个任务是将 .CSV 文件读取到 Spark 中作为数据帧,然后将其保存到 Iceberg 中。

仅仅 3 行代码就包含了很多内容,所以让我们先简要概述一下 Spark 如何处理数据。 Spark 是一种用于大规模数据处理的统一分析引擎,它在 Java、Scala、Python 和 R 中提供了一组高级 API。Spark 可以使用 HDFS 和 YARN 的传统库,也可以使用对象存储(例如 MinIO)通过 S3 API 运行。Spark-SQL 是一个用于结构化数据处理的 Spark 模块。它使用与 Spark 相同的执行引擎,Spark 和 Spark-SQL 的组合为分析提供了强大的工具包。

数据帧是按列组织的 dataset(数据集)。数据帧可以看作关系型数据库中的表格,它对 Spark 引擎有一些内置的优化。数据帧可以从各种来源构建,我们将从 .CSV 文件构建数据帧,将我们的零售数据组织成列和行。

Spark 将读取 .CSV 文件并将其组织成一个数据帧,作为一个临时视图。Spark 从 SELECT 语句创建临时视图。这些视图是会话范围的,并且是只读的。然后,我们将临时视图写入 Iceberg 表格,让 Spark 自动为我们创建模式。    

val df spark.read.csv("online-retail.csv")

df.createOrReplaceTempView("tempview");

spark.sql("CREATE or REPLACE TABLE retail USING iceberg AS SELECT * FROM tempview");

数据集不是很大,因此读取 .CSV 文件并写入 Iceberg Parquet 文件所花费的时间很少。  

在 SparkSQL 中快速查询可以验证数据是否已写入 Iceberg 表格

spark-sql> SELECT * from local.retail limit 10;

22/08/24 18:08:56 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

InvoiceNo       StockCode       Description     Quantity        InvoiceDate     UnitPrice       CustomerID      Country

536365  85123A  WHITE HANGING HEART T-LIGHT HOLDER      6       12/1/2010 8:26  2.55    17850   United Kingdom

536365  71053   WHITE METAL LANTERN     6       12/1/2010 8:26  3.39    17850   United Kingdom

536365  84406B  CREAM CUPID HEARTS COAT HANGER  8       12/1/2010 8:26  2.75    17850   United Kingdom

536365  84029G  KNITTED UNION FLAG HOT WATER BOTTLE     6       12/1/2010 8:26  3.39    17850   United Kingdom

536365  84029E  RED WOOLLY HOTTIE WHITE HEART.  6       12/1/2010 8:26  3.39    17850   United Kingdom

536365  22752   SET 7 BABUSHKA NESTING BOXES      2       12/1/2010 8:26  7.65    17850   United Kingdom

536365  21730   GLASS STAR FROSTED T-LIGHT HOLDER       6       12/1/2010 8:26  4.25    17850   United Kingdom

536366  22633   HAND WARMER UNION JACK  6       12/1/2010 8:28  1.85    17850   United Kingdom

536366  22632   HAND WARMER RED POLKA DOT       6       12/1/2010 8:28  1.85    17850   United Kingdom

耗时:7.944 秒,已提取 10 行

在 MinIO 中,我们也可以看到数据文件已被保存到 iceberg 存储桶中

更新 Iceberg 表格中的记录

我们将使用 SELECT 语句,然后是 UPDATE 语句来更新表格。更新和删除是在 OLTP 数据库(如在线零售数据集)中常见的操作,而 Iceberg 表格格式将此功能引入数据湖。Iceberg 能够执行行级更新并维护数据一致性。

我们将更新零售表以更改特定发票的购买数量。想象一下,在现实世界中发生这种变化很容易,例如,客户可以返回网站并购买更多商品以包含在原始订单中。我们将为发票号 (retail._c0) 559340 和库存代码 (retail._c1) 22413 (METAL SIGN TAKE IT OR LEAVE IT) 的数量 (retail._c3) 添加一个单位。

首先,查询发票和商品

SELECT retail.c0,retail.c1,retail.c3 FROM retail where retail._c0='559340' and retail._c1='22413';

然后使用 UPDATE 语句进行更新

UPDATE retail SET retail._c3='7' WHERE retail._c0='559340' and retail._c1='22413';

最后,验证 UPDATE 是否已执行

SELECT retail._c0, retail._c1, retail._c2, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';

您应该看到以下输出,数量 (retail._c3) 已更改为 7

559340  22413   METAL SIGN TAKE IT OR LEAVE IT  7

耗时:0.464 秒,获取 1 行

从 Iceberg 表中删除记录

ACID 属性使数据湖中的 Iceberg 表格删除操作成为可能。

想象一下,客户请求从我们的零售数据湖中删除其用户数据。为了遵守 GDPR,我们必须立即找到并删除这些数据。已经收到删除与客户 13269 相关的所有记录的请求。

首先,让我们计算与该客户相关的记录数量

SELECT count(*) FROM retail WHERE retail._c6=’13269’; 

320

耗时:0.242 秒,获取 1 行

我们将从表中删除这 320 条记录

DELETE FROM retail WHERE retail._c6='13269';

验证记录是否已删除。以下查询应报告 0 条记录

SELECT count(*) FROM retail WHERE retail._c6='13269';

演化 Iceberg 表的模式

我们让 Iceberg 在我们最初将 Spark 数据帧保存为 Iceberg 表格时自动创建表格模式,导致了一些不太直观的列名,_c0 到 _c7。我们将更改这些名称,使其更易于阅读,例如,_c0 将重命名为 InvoiceNo。

Iceberg 模式更新仅更改元数据。可以添加、删除、重命名、更新和重新排序列,而无需完全重写表格,这是一个昂贵的提议。

让我们看看当前的模式

SHOW CREATE TABLE retail;

CREATE TABLE iceberg.retail (

_c0 STRING,

_c1 STRING,

_c2 STRING,

_c3 STRING,

_c4 STRING,

_c5 STRING,

_c6 STRING,

_c7 STRING)

USING iceberg

LOCATION 'S3://iceberg/retail'

TBLPROPERTIES (

'current-snapshot-id' = '6565073876818863127',

'format' = 'iceberg/parquet',

'format-version' = '1')

耗时:0.082 秒,获取 1 行

让我们将列重命名为更易于阅读的名称。当您执行每个命令时,请注意 Iceberg 以多快的速度执行元数据更改 - 我们谈论的是更改元数据的毫秒数,而写入整个表格需要大约一分钟。

```ALTER TABLE retail RENAME COLUMN _c1 TO StockCode;

ALTER TABLE retail RENAME COLUMN _c2 TO Description;

ALTER TABLE retail RENAME COLUMN _c3 TO Quantity;

ALTER TABLE retail RENAME COLUMN _c4 TO InvoiceDate;

ALTER TABLE retail RENAME COLUMN _c5 TO UnitPrice;

ALTER TABLE retail RENAME COLUMN _c6 TO CustomerID;

ALTER TABLE retail RENAME COLUMN _c7 TO Country;

让我们验证表格模式是否已更改

SHOW CREATE TABLE retail;

CREATE TABLE iceberg.retail (

InvoiceNo STRING,

StockCode STRING,

Description STRING,

Quantity STRING,

InvoiceDate STRING,

UnitPrice STRING,

CustomerID STRING,

Country STRING)

USING iceberg

LOCATION 'S3://iceberg/retail'

TBLPROPERTIES (

'current-snapshot-id' = '6565073876818863127',

'format' = 'iceberg/parquet',

'format-version' = '1')

耗时:0.034 秒,获取 1 行

MinIO 和 Iceberg 用于多云数据湖

Iceberg 和 MinIO 是构建企业数据湖的强大技术。两者都是高性能、高度可扩展且可靠的开源组件,拥有大量用户在各种硬件、软件和云实例上运行分析工作负载。

基于 Iceberg、Delta 和 HUDI 开放表格格式构建的数据湖,将数据湖分析提升到了新的水平。使用这些开放表格格式、MinIO 和您选择的分析或 ML 包,您可以构建无限的东西。所有内容都是开放的,MinIO 是与 S3 API 兼容的层,将所有内容连接在一起,并将数据湖扩展到多云,从边缘到数据中心到公共/私有云。

下载 MinIO 企业版存储,将世界上最快的对象存储投入您的数据湖中。通过 我们的 Slack 频道分享您的经验或提出问题。我们很想了解您正在构建什么!