使用 Hudi 和 MinIO 构建流式数据湖

Building Streaming Data Lakes with Hudi and MinIO

Apache Hudi 是一个流式数据湖平台,它将核心仓库和数据库功能直接带到数据湖。Hudi 不满足于像 DeltaApache Iceberg 一样,自称是一个开放的文件格式,它提供了表、事务、upsert/删除、高级索引、流式数据摄入服务、数据聚类/压缩优化以及并发性。

Hudi 在 2016 年推出,它扎根于 Hadoop 生态系统,其名称背后的含义是:Hadoop Upserts anD Incrementals。它是为了管理 HDFS 上大型分析数据集的存储而开发的。Hudi 的主要目的是在摄入流式数据时减少延迟。

随着时间的推移,Hudi 已经发展到使用 云存储 和对象存储,包括 MinIO。Hudi 离开 HDFS 的趋势与世界抛弃传统 HDFS,转向高性能、可扩展和云原生对象存储的趋势相吻合。Hudi 为 Apache Spark、Flink、Presto、Trino 和其他工具提供优化的承诺,与 MinIO 在规模上提供云原生应用程序性能的承诺完美结合。    

在生产环境中使用 Hudi 的公司包括 UberAmazon字节跳动Robinhood。这些是世界上一些最大的 流式数据湖。在这个用例中,Hudi 的关键在于它提供了一个增量数据处理堆栈,可以在列式数据上进行低延迟处理。通常,系统会使用 Apache Parquet 或 ORC 等开放文件格式一次性写入数据,并将这些数据存储在高度可扩展的对象存储或分布式文件系统之上。Hudi 作为数据平面,用于摄入、转换和管理这些数据。Hudi 通过 Hadoop FileSystem API 与存储进行交互,该 API 与从 HDFS 到对象存储到内存文件系统的各种实现兼容(但不一定最优)。    

Hudi 文件格式

Hudi 使用一个基础文件和增量日志文件来存储对给定基础文件的更新/更改。基础文件可以是 Parquet(列式)或 HFile(索引)。增量日志以 Avro(行式)格式保存,因为将更改记录到基础文件中,以便在它们发生时进行处理。

Hudi 将对给定基础文件的所有更改编码为一系列块。块可以是数据块、删除块或回滚块。这些块按顺序合并,以生成新的基础文件。这种编码也创建了一个自包含日志。

来源:https://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform/

Hudi 表格式

表格式由表的布局、表的模式以及跟踪对表更改的元数据组成。Hudi 贯彻写入时模式检查,这与流处理的重点一致,以确保管道不会因不向后兼容的更改而中断。

Hudi 将给定表/分区的文件分组在一起,并在记录键和文件组之间映射。如上所述,所有更新都记录在特定文件组的增量日志文件中。这种设计比 Hive ACID 更有效,Hive ACID 必须将所有数据记录与所有基础文件合并才能处理查询。Hudi 的设计预见了快速的基于键的 upsert 和删除,因为它使用的是文件组的增量日志,而不是整个数据集的增量日志。

来源:https://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform/

时间线至关重要,因为它作为 Hudi 所有表元数据的事实来源事件日志。时间线存储在 .hoodie 文件夹中,或者在我们的例子中存储在桶中。时间线上的事件会保留下来,直到它们被删除。时间线适用于整个表,也适用于文件组,通过将增量日志应用于原始基础文件,可以重建文件组。为了优化频繁写入/提交操作,Hudi 的设计使元数据的大小相对于整个表的大小而言很小。

时间线上的新事件被保存到内部元数据表中,并实现为一系列合并读取表,从而提供低写入放大。因此,Hudi 可以快速吸收元数据的快速变化。此外,元数据表使用 HFile 基础文件格式,通过一组索引的键查找进一步优化性能,避免了读取整个元数据表的需要。表中所有物理文件路径都包含在元数据中,以避免昂贵且耗时的云文件列表操作。  

Hudi 写入器

Hudi 写入器简化了体系结构,在这些体系结构中,Hudi 充当高性能写入层,具有 ACID 事务支持,可以实现非常快速的增量更改,例如更新和删除。

典型的 Hudi 体系结构依赖于 Spark 或 Flink 管道将数据传递到 Hudi 表中。Hudi 写入路径经过优化,比简单地将 Parquet 或 Avro 文件写入磁盘更有效。Hudi 会分析写入操作,并将它们分类为增量操作(insertupsertdelete)或批处理操作(insert_overwriteinsert_overwrite_tabledelete_partitionbulk_insert),然后应用必要的 优化.

Hudi 写入器还负责维护元数据。对于每条记录,提交时间和该记录特有的序列号(类似于 Kafka 偏移量)都会被写入,从而可以推断出记录级别的更改。用户也可以在传入的数据流中指定事件时间字段,并使用元数据和 Hudi 时间线跟踪这些字段。这可以对流处理产生重大改进,因为 Hudi 包含每条记录的到达时间和事件时间,这使得为复杂的流处理管道构建强大的 水印 成为可能。

Hudi 读取器

写入器和读取器之间的快照隔离允许从所有主要的数据湖查询引擎(包括 Spark、Hive、Flink、Prest、Trino 和 Impala)一致地查询表快照。与 Parquet 和 Avro 一样,Hudi 表可以被 SnowflakeSQL Server 等工具作为外部表读取。

Hudi 读取器经过设计,使其轻量级。只要有可能,就会使用特定于引擎的矢量化读取器和缓存,例如 Presto 和 Spark 中的那些读取器和缓存。当 Hudi 必须合并基础文件和日志文件以进行查询时,Hudi 会使用可溢出映射和延迟读取等机制来提高合并性能,同时还提供针对读取优化的查询。

Hudi 包含了不止几个非常强大的增量查询功能。元数据是其中的核心,它允许将大型提交作为更小的块进行使用,并且完全解耦数据的写入和增量查询。通过有效地利用元数据,时间旅行只是另一个具有定义的开始和结束点的增量查询。Hudi 在任何给定时间点原子地将键映射到单个文件组,支持在 Hudi 表上进行完整的 CDC 功能。如上所述,在 Hudi 写入器部分,每个表都由文件组组成,每个文件组都有自己的自包含元数据。  

为 Hudi 欢呼!

Hudi 最大的优势是它摄入流式数据和批处理数据的速度。通过提供 upsert 功能,Hudi 执行的任务速度比重写整个表或分区快几个数量级。

为了利用 Hudi 的摄入速度,数据湖需要一个能够提供高 IOPS 和吞吐量的存储层。MinIO 的可扩展性和高性能的结合正是 Hudi 所需要的。MinIO 完全能够提供支持实时企业数据湖所需的性能——最近的一项基准测试 使用仅 32 个节点的现成 NVMe SSD,在 GET 操作上实现了 325 GiB/s(349 GB/s)的吞吐量,在 PUT 操作上实现了 165 GiB/s(177 GB/s)的吞吐量。    

一个活跃的企业 Hudi 数据湖存储了大量的较小的 Parquet 和 Avro 文件。MinIO 包含了许多 针对小文件优化的功能,这些功能可以使数据湖更快。小对象与元数据一起内联保存,减少了读取和写入像 Hudi 元数据和索引这样的小文件所需的 IOPS。

模式是每个 Hudi 表的重要组成部分。Hudi 可以强制执行模式,也可以允许模式演化,以便流式数据管道可以在不中断的情况下进行调整。此外,Hudi 贯彻写入时模式检查,以确保更改不会破坏管道。Hudi 依赖于 Avro 来存储、管理和演化表的模式。

Hudi 为数据湖提供 **ACID 事务保证**。Hudi 确保原子写入:提交以原子方式进行到时间线,并被赋予一个时间戳,该时间戳表示操作被认为发生的时间。Hudi 将写入器、表和读取器进程之间的快照隔离起来,因此每个进程都对表的某个一致性快照进行操作。Hudi 通过写入器之间的乐观并发控制 (OCC) 和表服务与写入器之间以及多个表服务之间的基于 MVCC 的非阻塞并发控制来完成这些工作。

Hudi 和 MinIO 教程

本教程将引导您完成设置 Spark、Hudi 和 MinIO 的步骤,并介绍一些基本的 Hudi 功能。本教程基于 Apache Hudi Spark 指南,并经过调整以适应云原生 MinIO 对象存储。

请注意,使用版本化存储桶会给 Hudi 增加一些维护开销。任何被删除的对象都会创建一个 删除标记。当 Hudi 使用 Cleaner 工具 清理文件时,删除标记的数量会随着时间的推移而增加。正确配置 生命周期管理 来清理这些删除标记非常重要,因为如果删除标记的数量达到 1000,List 操作可能会卡住。Hudi 项目维护人员建议使用生命周期规则在一天后清理删除标记。

先决条件

下载并安装 Apache Spark。

下载并安装 MinIO。记录 IP 地址、控制台的 TCP 端口、访问密钥和密钥。

下载并安装 MinIO 客户端。

下载 AWS 和 AWS Hadoop 库,并将它们添加到您的类路径中,以便使用 S3A 与对象存储进行交互。

  • AWS: aws-java-sdk:1.10.34(或更高版本)
  • Hadoop: hadoop-aws:2.7.3(或更高版本)

下载 Jar 文件,解压缩它们并将它们复制到 /opt/spark/jars

创建 MinIO 存储桶

使用 MinIO 客户端创建一个存储桶来存放 Hudi 数据。

mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key>  <your-MinIO-secret-key>
mc mb myminio/hudi

启动配置了 Hudi 的 Spark

启动 Spark shell,将 Hudi 配置为使用 MinIO 作为存储。确保配置 S3A 的条目,并使用您的 MinIO 设置。

spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \
--conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\
--conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--conf 'fs.s3a.signing-algorithm=S3SignerType'

然后,在 Spark 中初始化 Hudi。

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

请注意,创建一个 外部配置文件 将简化 Hudi 的重复使用。

创建表

尝试使用 Scala 创建一个简单的 Hudi 小表。Hudi DataGenerator 是一种快速简便的方法,可以根据 示例行程架构 生成示例插入和更新。

val tableName = "hudi_trips_cow"
val basePath = "s3a://hudi/hudi_trips_cow"
val dataGen = new DataGenerator

将数据插入 Hudi 并将表写入 MinIO

以下步骤将生成新的行程数据,将它们加载到 DataFrame 中,并将我们刚刚创建的 DataFrame 作为 Hudi 表写入 MinIO。mode(Overwrite) 会覆盖并重新创建表,如果表已存在。行程数据依赖于记录键(uuid)、分区字段(region/country/city)和逻辑(ts)来确保每个分区的行程记录是唯一的。我们将使用默认的写入操作 upsert。如果您没有更新工作负载,可以使用 insertbulk_insert,这可能更快。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

打开浏览器并使用您的访问密钥和密钥登录到 http://<您的 MinIO IP>:<端口> 上的 MinIO。您将在存储桶中看到 Hudi 表。


存储桶还包含一个 .hoodie 路径,其中包含元数据,以及 americasasia 路径,其中包含数据。


查看元数据。这是完成整个教程后我的 .hoodie 路径的样子。我们可以看到,我在 2022 年 9 月 13 日星期二的 9:02、10:37、10:48、10:52 和 10:56 修改了该表。

查询数据

让我们将 Hudi 数据加载到 DataFrame 中并运行一个示例查询。

// spark-shell
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

使用 Hudi 进行时间旅行

不,我们不是在谈论去观看 Hootie & the Blowfish 的 1988 年音乐会。

对 Hudi 表的每次写入都会创建新的快照。将快照视为表的版本,可以引用这些版本来执行时间旅行查询。

尝试一些时间旅行查询(您需要更改时间戳才能与您的情况相关)。

spark.read.
format("hudi").
option("as.of.instant", "2022-09-13 09:02:08.200").
load(basePath)

更新数据

此过程类似于我们之前插入新数据的方式。为了展示 Hudi 更新数据的能力,我们将生成对现有行程记录的更新,将它们加载到 DataFrame 中,然后将 DataFrame 写入已保存到 MinIO 中的 Hudi 表。

请注意,我们使用的是 append 保存模式。一个通用的准则是,除非您要创建新表,否则不要覆盖任何记录,就使用 append 模式。使用 Hudi 的一种典型方式是实时摄取流数据,将它们追加到表中,然后编写一些逻辑,根据刚刚追加的内容合并和更新现有记录。或者,可以使用 overwrite 模式进行写入,如果表已存在,则删除并重新创建该表。

// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

查询数据将显示更新后的行程记录。

增量查询

Hudi 可以使用增量查询提供自给定时间戳以来更改的记录流。我们只需要提供一个开始时间,从该时间开始将流式传输更改以查看更改直至当前提交,并且可以使用结束时间来限制流。

增量查询对于 Hudi 来说是一件大事,因为它允许您在批处理数据上构建流式管道。

// spark-shell
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in
// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

特定时间点查询

Hudi 可以查询截至特定时间和日期的数据。

// spark-shell
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in
//incrementally query data
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

使用软删除删除数据

Hudi 支持两种不同的方法来删除记录。软删除会保留记录键,并将所有其他字段的值置空。软删除会保存在 MinIO 中,并且只能使用硬删除从数据湖中删除。

// spark-shell
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
// fetch two records for soft deletes
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
// prepare the soft deletes by ensuring the appropriate fields are nullified
val nullifyColumns = softDeleteDs.schema.fields.
map(field => (field.name, field.dataType.typeName)).
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
&& !Array("ts", "uuid", "partitionpath").contains(pair._1)))
val softDeleteDf = nullifyColumns.
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
// simply upsert the table after setting these fields to null
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY, "upsert").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
// This should return the same total count as before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// This should return (total - 2) count as two records are updated with nulls
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

使用硬删除删除数据

相反,硬删除是我们认为的删除。记录键和相关字段将从表中删除。

// spark-shell
// fetch total records count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// fetch two records to be deleted
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// run the same read query as above.
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

插入覆盖

当数据湖获得更新现有数据的能力时,它就会成为数据湖仓。我们将生成一些新的行程数据,然后覆盖现有数据。此操作比 upsert 更快,在 upsert 中,Hudi 会为您计算整个目标分区。在这里,我们指定配置以绕过 upsert 会为您执行的自动索引、预合并和重新分区操作。

// spark-shell
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION.key(),"insert_overwrite").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(basePath)
// Should have different keys now for San Francisco alone, from query before.
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

演化表架构和分区

架构演化允许您更改 Hudi 表的架构,以适应数据随时间发生的变化。

以下是一些关于如何查询和演化架构和分区的示例。有关更深入的讨论,请参阅 架构演化 | Apache Hudi。请注意,如果您运行这些命令,它们将更改您的 Hudi 表架构,使其与本教程中的架构不同。

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName
-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
#Alter table examples
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

目前,SHOW partitions 仅在文件系统上有效,因为它基于文件系统表路径。

本教程使用 Spark 来展示 Hudi 的功能。但是,Hudi 可以支持多种表类型/查询类型,并且可以使用 Hive、Spark、Presto 等查询引擎查询 Hudi 表。Hudi 项目有一个 演示视频,展示了所有这些功能,该视频基于 Docker 设置,所有相关系统都在本地运行。

呼!呼!让我们在 MinIO 上构建 Hudi 数据湖!

Apache Hudi 是第一个用于数据湖的开放式表格式,非常适合在流式体系结构中使用。Hudi 社区和生态系统 生机勃勃,并积极发展,越来越强调用 Hudi/对象存储替换 Hadoop/HDFS,以构建云原生流式数据湖。将 MinIO 用于 Hudi 存储为构建多云数据湖和分析铺平了道路。MinIO 包含 主动-主动复制,用于在不同位置之间同步数据(本地、公有/私有云和边缘),从而实现企业所需的重要功能,例如地理负载均衡和快速热热故障转移。

立即在 MinIO 上试用 Hudi。如果您有任何疑问或想分享技巧,请通过 我们的 Slack 频道 与我们联系。