使用 Spark 管理 Iceberg 表

Manage Iceberg Tables with Spark

Apache Iceberg 是一种开源表格式,具有多引擎兼容性,旨在适应大规模分析数据集。多引擎意味着 Spark、Trino、Presto、Hive 和 Impala 都可以同时独立地操作相同的数据。之前,我们发布了 使用 Iceberg 和 MinIO 的湖仓架构权威指南 作为该主题的介绍,包括设计目标和关键特性,以及 深入探讨 Iceberg:表上的 ACID 事务,讨论了如何在 Iceberg 格式中处理数据。

在这篇博文中,我们将构建一个 Notebook,它使用 MinIO 作为 Spark 作业的对象存储来管理 Iceberg 表。如果您尚未在 Kubernetes 环境中设置 spark-operator,请参阅 Spark、MinIO 和 Kubernetes

Apache Iceberg

Apache Iceberg 是一种开源表格式,允许在云存储系统(如 Amazon S3、Azure Blob Storage、Google Cloud Storage 和 MinIO)中高效存储大型、缓慢变化的数据集。Iceberg 最初由 Netflix 开发,解决了 Apache Parquet 和 Apache ORC 等现有表格式的一些限制。

Iceberg 旨在提供比传统表格式更多的优势,并成为数据湖的首选表格式,以下是一些优势

  • 模式演变:数据湖通常以其灵活性和存储各种数据格式的能力为特征。但是,这种灵活性可能使随着时间的推移管理模式更改变得具有挑战性。Iceberg 提供了一种添加、删除或修改表列的方法,而无需对数据进行完全重写,从而更容易随着时间的推移发展模式
  • 事务写入:在数据湖中,确保数据准确性和一致性非常重要,尤其是在数据用于业务关键目的时。Iceberg 为写入操作提供对 ACID 事务的支持,确保数据始终处于一致状态
  • 查询隔离:数据湖通常由许多用户或应用程序同时使用。Iceberg 允许多个查询同时运行而不会相互干扰,从而可以扩展数据湖的使用而不会牺牲性能
  • 时间旅行:在数据湖中,通常需要能够查询数据在特定时间点的样子。Iceberg 提供了一个时间旅行 API,使用户能够查询数据在特定版本或时间戳时的状态,从而更容易分析历史趋势或跟踪随时间推移的变化
  • 分区剪枝:数据湖通常包含大量数据,这可能导致查询速度缓慢且资源密集。Iceberg 支持按一列或多列对数据进行分区,这可以通过减少查询期间需要读取的数据量来显著提高查询性能。

Iceberg 可与各种处理引擎和框架一起使用,包括 Apache Spark、Dremio 和 Presto。它也与 Apache Arrow(一种跨语言内存数据格式)集成,后者支持跨不同处理引擎的高效数据序列化和反序列化。

Iceberg 目录

Apache Iceberg 目录是一个元数据存储,其中包含有关表的信息,包括其模式、位置和分区方案。它负责管理表的生命周期,包括创建、更新和删除它们,并提供用于查询元数据和访问数据的 API。

以下是 Apache Iceberg 支持的一些目录

  • JDBC 目录
  • Hive 目录
  • Nessie 目录
  • Hadoop 目录
  • Glue 目录
  • DynamoDB 目录
  • REST 目录

为了使本教程更简单,我们将为 Iceberg 表使用 Hadoop 目录。

MinIO 对象存储

MinIO 保证 Iceberg 表的持久性和对这些表上的 Spark 操作的高性能。MinIO 使用加密来保护 Iceberg 表,并根据基于策略的访问控制来限制对它们的访问。MinIO 使用 TLS 加密传输中的数据,并使用现代的行业标准加密算法(如 AES-256-GCM、ChaCha20-Poly1305 和 AES-CBC)对驱动器上的数据进行细粒度的对象级加密。MinIO 与 外部身份提供商(如 ActiveDirectory/LDAP、Okta 和 Keycloak)集成以进行 IAM。然后,用户和组在尝试访问 Iceberg 表时会受到 AWS IAM 兼容的 PBAC 的约束。

Iceberg 表在写入 MinIO 存储桶后会受到以下保护

  • 擦除编码 将数据文件拆分为 数据和奇偶校验块 并对其进行编码,以便即使编码数据的一部分不可用,也可以恢复主数据。横向扩展的分布式存储系统跨多个驱动器和节点保存编码数据。如果驱动器或节点发生故障或数据损坏,则可以从保存在其他驱动器和节点上的奇偶校验和数据块中重建原始数据。
  • 位腐烂保护 在后台捕获和修复损坏的对象,以消除对数据持久性的这种隐藏威胁
  • 存储桶和对象不变性 使用对象锁定、保留和其他治理机制保护保存到 MinIO 的数据免遭删除或修改。写入 MinIO 的对象永远不会被覆盖。
  • 存储桶和对象版本控制 提供进一步的对象保护。MinIO 存储 每个对象的每个版本,即使它被删除,除非特定版本被删除。MinIO 的 数据生命周期管理 允许管理员在层之间移动存储桶,例如使用 NVMe 处理性能密集型工作负载,并为版本设置过期日期,以便将其从系统中清除以提高存储效率。

将演示数据导入 MinIO

我们将使用 MinIO 上可用的 NYC Taxi 数据集。您可以从 此处 下载数据集,该数据集大约有 1.12 亿行,大小约为 10GB。您可以使用您选择的任何其他数据集,并使用以下命令将其上传到 MinIO

!mc mb play/openlake
!mc mb play/openlake/spark
!mc mb play/openlake/spark/sample-data
!mc cp nyc-taxi-data.csv play/openlake/spark/sample-data/nyc-taxi-data.csv

管理 Iceberg 表的示例 PySpark 应用程序

这基于 Iceberg 入门 Notebook

%%writefile sample-code/src/main-iceberg.py
import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")


# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://openlake/warehouse/")
    .set("spark.sql.catalog.demo.s3.endpoint", "https://play.min.io:50000")
    .set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (2 now) since previous operation will create a new snapshot

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
                    FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")

构建 Docker 镜像

我们现在将构建包含上述 Python 应用程序的 Docker 镜像。您可以使用以下 Dockerfile 构建镜像

%%writefile sample-code/Dockerfile
FROM openlake/spark-py:3.3.2

USER root

WORKDIR /app

RUN pip3 install pyspark==3.3.2

COPY src/*.py .

您可以构建自己的 Docker 镜像,或使用 Docker Hub 上提供的预构建镜像 openlake/sparkjob-demo:3.3.2。

部署 Spark Iceberg 应用

我们将构建 Spark 作业 YAML 文件来定义规范,然后将其部署到 Kubernetes 集群中。

%%writefile sample-code/spark-job/sparkjob-iceberg.yml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-iceberg
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "openlake/sparkjob-demo:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main-iceberg.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
        env:
            -   name: AWS_REGION
                value: us-east-1
            -   name: AWS_ACCESS_KEY_ID
                value: openlakeuser
            -   name: AWS_SECRET_ACCESS_KEY
                value: openlakeuser
    executor:
        cores: 1
        instances: 3
        memory: "2048m"
        labels:
            version: 3.3.2
        env:
            -   name: INPUT_PATH
                value: "s3a://openlake/spark/sample-data/taxi-data.csv"
            -   name: AWS_REGION
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_REGION
            -   name: AWS_ACCESS_KEY_ID
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_ACCESS_KEY_ID
            -   name: AWS_SECRET_ACCESS_KEY
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_SECRET_ACCESS_KEY
            -   name: ENDPOINT
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: ENDPOINT


您可以使用以下命令部署上述 sparkjob-iceberg.yml 文件

!kubectl apply -f sample-code/spark-job/sparkjob-iceberg.yml

应用部署后,您可以使用以下命令检查应用的状态

!kubectl get sparkapplications -n spark-operator

您还可以使用以下命令检查应用的日志(由于我们在 Spark 应用中禁用了 INFO 日志,因此在我们的应用日志开始显示之前,您可能看不到太多活动)

!kubectl logs -f spark-iceberg-driver -n spark-operator # 完成后停止此 Shell

应用完成后,您可以使用以下命令删除应用

!kubectl delete sparkapplications spark-iceberg -n spark-operator

代码演练

现在我们已经看到了端到端的代码,让我们详细了解一下代码片段。

设置 Iceberg 属性

# 添加 Iceberg 配置
conf = (
    SparkConf()
    .set("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # 使用 Spark 集成 Iceberg
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://warehouse/")
    .set("spark.sql.catalog.demo.s3.endpoint", "https://play.min.io:50000")
    .set("spark.sql.defaultCatalog", "demo") # Iceberg 目录名称
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg 目录类型
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)


以上代码片段指示 Spark 使用 Iceberg Spark 会话扩展,并定义名为 demo 的目录作为默认目录,该目录类型为 hadoop,使用 S3FileIO 作为 IO 实现,S3 端点为 https://play.min.io:50000。

创建 Iceberg 表

# 从 MinIO 读取 CSV 文件
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# 从 RDD 创建 Iceberg 表 "nyc.taxis_large"
df.write.saveAsTable("nyc.taxis_large")

# 查询表行数
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"纽约出租车数据总行数: {total_rows_count}")


在以上代码片段中,我们从 Minio https://play.min.io:50000 端点读取 taxi-data.csv 文件,并将其保存为 Iceberg 表 nyc.taxis_large。保存 Iceberg 表后,我们使用 Spark SQL 查询 nyc.taxis_large 以获取存在的记录总数。

模式演变

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")


以上代码演示了通过重命名、更改列类型、添加新列 fare_per_distance 以及基于 fare 和 distance 列的值填充新列来实现模式演变。

从表中删除数据

# 从 "fare_per_distance" 表中基于条件删除行
logger.info("正在从 fare_per_distance 列删除行...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# 检查可用的快照
logger.info("正在检查快照...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # 打印所有可用的快照(4 个,目前),因为之前的操作将创建 2 个新的快照

# 查询表行数
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"删除操作后 NYC 出租车数据的总行数: {total_rows_count}")


在上面的代码片段中,我们删除了新字段 fare_per_distance 中为 null 或大于 4.0 的记录,以及 distance 字段大于 2.0 的记录。操作完成后,我们查询快照表,发现生成了 2 个新的快照。我们还获取了总记录数,它比我们开始时的数量少得多(397014 对比 112234626)。

对表进行分区

# 基于 "VendorID" 列 对表进行分区
logger.info("基于 VendorID 列对表进行分区...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

这段代码使用 VendorID 列创建了一个新的分区。此分区将适用于将来插入的新行,不会影响旧数据。我们还可以在创建 Iceberg 表时添加分区,例如使用以下代码:

CREATE TABLE IF NOT EXISTS nyc.taxis_large (VendorID BIGINT, tpep_pickup_datetime STRING, tpep_dropoff_datetime STRING, passenger_count DOUBLE, trip_distance DOUBLE, RatecodeID DOUBLE, store_and_fwd_flag STRING, PULocationID BIGINT, DOLocationID BIGINT, payment_type BIGINT, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE) PARTITIONED BY VendorID USING iceberg;

元数据表

# 查询元数据表,例如快照、文件、历史记录
logger.info("正在查询快照表...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # 显示所有快照,按 committed_at 列升序排列

logger.info("正在查询文件表...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"纽约出租车数据的总数据文件数:{total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
                    FROM nyc.taxis_large.files LIMIT 1""").show()

# 查询历史记录表
logger.info("正在查询历史记录表...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

Iceberg 有元数据表,如快照、文件、历史记录,我们可以查询这些表来了解后台发生的情况。例如,通过查询快照表,我们可以看到创建新快照时执行的操作。文件表提供了存储在 Minio 中的数据文件的信息,例如每个文件中的记录数、文件格式等。在历史记录表中,我们可以获取有关何时使快照成为当前快照以及父快照是谁等的所有信息。

使用快照进行时间旅行

# 时间旅行到初始快照
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# 查询表以查看结果
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# 查询历史表
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 新增 1 行

# 查询表行数
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")


使用捕获特定时间点交易的快照,可以在 Iceberg 中进行时间旅行。在上面的代码中,我们查询历史表以获取曾经创建的第一个快照,并对该 snapshot_id 执行 roll_back_to_snapshot 系统调用。如果在回滚完成后查询该表,我们可以清楚地看到 fare_per_distance 字段为空,并且记录数恢复到 112234626。最后,历史表中有一条新记录,其中包含我们使用的 snapshot_id。

这是我们可以使用 Apache Iceberg 执行的操作的高级概述。它还支持稍后我们可以探索的表审计和维护。Apache Iceberg 还在快照的基础上添加了对标签和分支的支持,这具有巨大的潜力。一旦功能完善,我们将对其进行探索。

Spark 和 Iceberg 在 MinIO 上运行得非常好,这太酷了

Iceberg、Spark 和 MinIO 结合在一起,形成了构建可扩展高性能数据湖的强大技术组合。Iceberg 的开放式表格式和对多种引擎的支持使其成为企业数据湖的绝佳选择。将 MinIO 用于 Iceberg 存储为多云数据湖和分析奠定了坚实的基础。MinIO 包括 主动-主动复制,用于在本地、公共/私有云和边缘位置之间同步数据,从而实现企业所需的功能,例如地理负载均衡和快速热-热故障转移。

立即在 MinIO 上试用 Iceberg。如果您有任何疑问或想分享技巧,请通过我们的 Slack 频道 联系我们,或向 hello@min.io 发送邮件。