箭头在这里,箭头在那里,到处都是箭头。似乎现在你不能挥动一只死猫而不碰到一篇关于 Apache Arrow 的文章或博客文章。大多数文章似乎都是针对开发人员的,并且基于 Python 和 Spark 风格的开发平台。今天我将介绍如何在 R 编程语言中使用 Apache Arrow 与 MinIO。
如果您不熟悉 R(来自 维基百科)
“R 是一种用于统计计算和图形的编程语言,由 R 核心团队和 R 统计计算基金会支持。由统计学家罗斯·伊哈卡和罗伯特·詹特尔曼创建,R 被数据挖掘人员和统计学家用于数据分析和开发统计软件。用户创建了包来增强 R 语言的功能。”
当我处理数据、数据分析和机器学习时,我发现 R 对我来说是最直观的。
MinIO 是高性能软件定义的 S3 兼容对象存储,使其成为 Amazon S3 的强大而灵活的替代品。
最后,如果您不熟悉 Apache Arrow(来自网站)
“Apache Arrow 定义了一种语言无关的列式内存格式,用于平面和层次化数据,并针对现代硬件(如 CPU 和 GPU)上的高效分析操作进行组织。”
Arrow、R 和 MinIO 的组合形成了一个非常强大的平台,用于对大型数据集进行数据分析。我将使用 “E for Excel” 的 500 万行随机生成的销售数据 csv 文件。
请注意,此数据是虚构的。E for Excel 突出说明:“免责声明 - 数据集是通过 VBA 中的随机逻辑生成的。这些不是真实的销售数据,不应用于除测试之外的任何其他目的。”
我选择此数据集是因为它的结构相当简单,并且足够大,可以捕获分析处理中的一些计时指标。
R 语言有一个名为 RStudio 的配套 IDE,我将在本开发中使用它。如果您想继续,请安装 R 和 RStudio,并访问 MinIO 集群。如果您尚未运行 MinIO 和 mc
(MinIO 客户端),请 下载并安装它们。
我首先将销售数据下载到我的机器上的本地目录,然后将其复制到 MinIO 上的存储桶中。使用 MinIO 命令行工具 mc
可以将文件复制到 MinIO,也可以从 MinIO 控制台中上传文件。我创建了一个名为 large-csv
的存储桶,并将 5m Sales Records.csv
文件从控制台中上传到该存储桶。这是我们将要进行的工作的起点。

网络上有很多资源可以帮助您使用 R 与 MinIO 一起使用,以及在 R 中使用 Apache Arrow。以下是我发现对将这些东西整合在一起有用的两个链接
现在进入编码。正如我提到的,我正在从 RStudio 中的 R 中工作。RStudio 是用于处理 R 的常见 IDE,看起来像这样

我将把本文中的代码分段粘贴。
这是开头。这是设置环境并将必需的库加载到 R 中的代码。请注意使用环境来传递凭据和端点。有很多方法可以使这些值对库可用,包括与大多数 IAM 系统的集成。为了演示目的,这似乎是最简单的
# set the credentials and endpoint this r instances uses to access my minio cluster
Sys.setenv("AWS_ACCESS_KEY_ID" = "minioadmin", # enter your credentials
"AWS_SECRET_ACCESS_KEY" = "minioadmin", # enter your credentials
"AWS_S3_ENDPOINT" = "HP-Z230:9000") # change it to your specific minio IP and port to override default aws s3
# load the library
library(minio.s3)
library(arrow)
library(dplyr)
#library(aws.s3) This works as well
接下来,我已经包含了从 MinIO 中的存储桶读取大型 csv 文件所需的代码。我下面所做的事情要求整个 csv 能够容纳在本客户端的内存中。如果您的 csv 文件太大而无法容纳在内存中,则有几种方法可以在循环中以更小的块处理文件。下面的代码获取存储桶的句柄并读取对象。
# load the large csv file
b <- get_bucket(bucket = 'large-csv', use_https = F)
ob <- aws.s3::s3read_using(FUN = read.csv, object = "5m Sales Records.csv", bucket = b, opts = list(use_https = FALSE, region = ""))
一旦我们在一个对象中获得了 csv 的表格数据,我们就会将其写入另一个存储桶中的 Arrow 格式。Arrow 库喜欢 URL,因此我已经创建了几个函数来使构建 URL 更容易
# these functions like URLs
# get minio config, with expected defaults
minio_key <- Sys.getenv("MINIO_ACCESS_KEY", "minioadmin")
minio_secret <- Sys.getenv("MINIO_SECRET_KEY", "minioadmin")
minio_host <- Sys.getenv("MINIO_HOST", "hp-z230")
minio_port <- Sys.getenv("MINIO_PORT", "9000")
minio_arrow_bucket <- Sys.getenv("MINIO_ARROW_BUCKET", "arrow-bucket")
# helper function for minio URIs
minio_path <- function(...) paste(minio_arrow_bucket, ..., sep = "/")
minio_uri <- function(...) {
template <- "s3://%s:%s@%s?scheme=http&endpoint_override=%s%s%s"
sprintf(template, minio_key, minio_secret, minio_path(...), minio_host, ":", minio_port)
}
Arrow 格式的优势在于,数据可以针对随后将执行的预期查询进行优化。Dplyr group_by
用于设置分区方案。分区类似于在数据库上设置有用的索引,需要了解数据的查询或使用方式。分区既带来了成本,也带来了收益。需要权衡利弊,如果一个人了解数据最常被查询的方式,那么权衡可能是双赢的。对于这个例子,我将假设销售数据是针对特定国家进行分析的。另一方面,如果数据最常按国家/地区的商品类型进行分析,那么这个分区方案可能不会有帮助。尝试按每个列进行分区(以防万一)会消除分区的优势。以下是对来自使用 Arrow 和 R 的文章中分区的指南的解释 (https://arrow.apache.ac.cn/docs/r/articles/dataset.html)
分区性能注意事项
分区数据集有两个影响性能的方面:它增加了文件的数量,并在文件周围创建了一个目录结构。这两者都有利有弊。根据配置和数据集的大小,成本可能会超过收益。
由于分区将数据集拆分为多个文件,因此可以使用并行性读取和写入分区数据集。但是,每个额外的文件都会在文件系统交互处理中增加一些开销。它还会增加整个数据集的大小,因为每个文件都包含一些共享的元数据。例如,每个 parquet 文件都包含模式和组级统计信息。分区数是文件数的下限。如果您按日期对一年数据的数据集进行分区,您将至少有 365 个文件。如果您通过另一个具有 1,000 个唯一值的维度进一步分区,您将最多有 365,000 个文件。这种分区通常会导致小型文件,这些文件主要包含元数据。
分区数据集创建嵌套文件夹结构,这些结构允许我们修剪在扫描中加载的文件。但是,这会增加发现数据集中的文件的开销,因为我们需要递归地“列出目录”才能找到数据文件。
分区过细会导致问题:按日期对一年数据的 数据集进行分区将需要 365 次列表调用才能找到所有文件;添加另一个基数为 1,000 的列将使其变为 365,365 次调用。
最优的分区布局将取决于您的数据、访问模式以及读取数据的系统。大多数系统(包括 Arrow)应该能够跨范围的文件大小和分区布局工作,但有一些极端情况应该避免。这些指南可以帮助避免一些已知的最坏情况
避免文件小于 20MB 或大于 2GB。
避免分区布局具有超过 10,000 个不同分区的布局。
对于具有文件内组概念的文件格式(例如 Parquet),类似的指南适用。行组可以在读取时提供并行性,并允许根据统计信息跳过数据,但非常小的组会导致元数据成为文件大小的很大一部分。在大多数情况下,Arrow 的文件写入器提供了合理的组大小默认值。”
鉴于我假设数据将在国家/地区内进行分析,这是将 Arrow 格式数据集写入 MinIO 中不同存储桶的代码。
# partition by the "Country" column - we typically analyze by country
ob %>%
group_by(Country) %>%
write_dataset(minio_uri("sales-data"), format = "arrow" )
如果我们查看结果,我们会看到创建了多个文件来支持分区。

如果我们查看 arrow-bucket 的内容,我们会发现数据已按预期按国家/地区进行分区

这就是读取 .csv 文件并将数据集存储在 MinIO 上的 Arrow 格式中的所有代码。接下来,我们将打开此数据集并使用一些基本的数据选择和聚合来查询数据。
我们之所以要经历这些折腾,是因为通过在我们将最常在查询中使用的列上对数据集进行分区,我们可以减少每次查询从 MinIO 读取的数据,从而加快查询速度。
此外,Arrow 在 R 和 Dplyr 中的集成通过在执行之前先对处理进行预先准备来提高性能,从而避免在每个步骤之间创建中间存储。这样可以节省时间并减少内存使用量,从而有可能在处理过程中避免内存不足的情况。这种方法还尽可能地将处理下推(或完全避免)。根据数据集被分区的属性选择数据子集会显着减少必须处理的数据。如果数据按“国家/地区”进行分区,并且我们选择了一个特定的国家/地区,则所有其他文件及其包含的数据将被忽略。
此外,Arrow 文件可以包含元数据,这些元数据允许仅检索文件的一部分,如果查询将处理限制在该文件中的数据范围内。
查询步骤 mutate()/transmute(),select()/rename()/relocate(),filter(),group_by() 和 arrange() 记录了它们的行动,但在您运行 collect() 之前不会对数据进行评估。延迟这些步骤允许查询确定数据的一个小子集,而无需创建中间数据集。
为了处理或查询数据集,我们首先必须打开它。在下面的注释中,我们展示了内存数据结构“ds”包含的内容。“Ds”实际上是对元数据的引用 - 还没有将任何数据加载到内存中。这种处理元数据并将数据检索延迟或下推的方法允许处理比能够容纳在内存中的数据集更大的数据集。
##################
# Querying the sales order data
# open the dataset
ds <- open_dataset(minio_uri("sales-data"), format = "arrow" )
# here is what it contains:
# > ds
# FileSystemDataset with 186 Feather files
# Region: string
# Item.Type: string
# Sales.Channel: string
# Order.Priority: string
# Order.Date: string
# Order.ID: int32
# Ship.Date: string
# Units.Sold: int32
# Unit.Price: double
# Unit.Cost: double
# Total.Revenue: double
# Total.Cost: double
# Total.Profit: double
# Country: string
接下来,我们能够对该数据集执行有用的查询。鉴于我加载的数据是销售数据,我将查询在特定国家/地区销售的不同商品类型的平均利润,作为查询处理的示例。我还将计时在我的旧的、缓慢的 mac 笔记本电脑上执行此查询所需的时间,该笔记本电脑通过我旧的、缓慢的千兆网络访问我旧的、缓慢的 MinIO 集群 :-)
# what is the median margin on the differing item types sold in Cyprus?
# what's the highest? how long does it take to query 5m rows?
system.time(ds %>%
filter(Country == "Cyprus") %>%
select(Unit.Price, Unit.Cost, Item.Type) %>%
mutate(margin = 100 * ((Unit.Price - Unit.Cost)/Unit.Cost)) %>%
group_by(Item.Type) %>%
collect() %>%
summarise(
median_margin = median(margin),
n = n()
) %>% arrange(desc(median_margin)) %>%
print()
)
以下是查询的结果和时间。还不错。
Item.Type median_margin n
<chr> <dbl> <int>
1 Clothes 205. 2263
2 Cereal 75.6 2268
3 Vegetables 69.4 2234
4 Cosmetics 66.0 2235
5 Baby Food 60.1 2271
6 Snacks 56.6 2266
7 Beverages 49.3 2261
8 Personal Care 44.2 2265
9 Fruits 34.8 2234
10 Household 33.0 2227
11 Office Supplies 24.0 2253
12 Meat 15.7 2233
user system elapsed
0.057 0.037 0.303
没有针对数据集的“关闭”调用,因此我们完成了此处理。
总结
总而言之,在 R 中使用 Arrow 格式的数据集允许一个人根据列以有利于处理的方式对数据进行智能分区。它还允许一个人根据需要加载数据的子集,因此能够处理比能够容纳在内存中的数据集更大的数据集。这些函数非常直观,正如您所看到的,执行相当复杂的查询不需要太多代码。享受!