使用 MinIO 构建符合 S3 标准的股票市场数据湖

Building an S3 Compliant Stock Market Data Lake with MinIO

在我之前所有关于 MinIO 的文章中,如果需要编写代码,我都会使用 MinIO 的 Python SDK,其文档在此。我更喜欢这个 SDK,因为它易于使用,并且提供了对 MinIO 企业功能的编程访问,例如生命周期管理对象锁定Bucket 通知站点复制。(在我的文章AI/ML 对象管理中,我展示了如何使用代码设置生命周期管理和对象锁定。)如果您的应用程序正在以编程方式设置 Bucket 并且需要这些功能,那么 MinIO SDK 是一个不错的选择。但是,MinIO 兼容 S3,您可以使用任何实现 S3 的 SDK 连接到 MinIO。

另一个流行的 S3 访问 SDK 是亚马逊的S3 客户端,它是其botocore库的一部分——一个到 AWS 上许多服务的低级接口。此库也适用于 MinIO,并且提供了访问我之前提到的所有企业功能。如果您正在考虑迁移应用程序及其数据从 AWS 到您本地数据中心的 MinIO,那么您无需更改任何代码。只需更改配置中的 URL、访问密钥和密钥,即可完成。

还有一个选择是S3fs 库。它不提供对 MinIO SDK 和 S3 客户端等企业功能的访问,但易于使用,许多其他库和工具(如 PyArrow 和 MLFlow)使用其类来设置 S3 数据源。

在这篇文章中,我将使用 S3fs Python 库与 MinIO 交互。为了使事情更有趣,我将创建一个小型数据湖,用市场数据填充它,并为希望分析股票市场趋势的人创建股票走势图。

本文中显示的所有代码都可以在此处找到。

安装所需的库

除了 s3fs 库之外,我们还需要 Yahoo Finance 库来下载历史市场数据。我还要使用 Seaborn 和 Matplotlib 来绘制股票图表。可以使用以下命令安装这四个库。

pip install matplotlib

pip install s3fs

pip install seaborn
pip install yfinance

下载市场数据

在过去十年中,市场观察人士习惯于发明新术语来指代最热门的股票。2013 年,吉姆·克莱默创造了 FANG 这个词——一个指代 Facebook、亚马逊、Netflix 和谷歌的首字母缩略词。2017 年,投资者开始将苹果纳入该组,首字母缩略词变成了 FAANG。当 Facebook 将其名称更改为 Meta 并且 Google 重塑品牌为 Alphabet 时,首字母缩略词进一步发展为 MAMAA。

现在,在 2024 年,我们有一个新的标签:七巨头。七巨头由苹果、亚马逊、Alphabet、Meta、微软、英伟达和特斯拉组成。让我们使用 Yahoo Finance 库下载 2023 年全年七巨头的市场数据。

import datetime
import os

import yfinance as yf

start_date = datetime.datetime(2023, 1, 1)
end_date = datetime.datetime(2023, 12, 31)
download_dir = os.path.join(os.getcwd(), 'marketdata')

magnificient_seven = ['AAPL', 'AMZN', 'GOOGL', 'META', 'MSFT', 'NVDA', 'TSLA']
for ticker_string in magnificient_seven:
  ticker = yf.Ticker(ticker_string)
  historical_data = ticker.history(start=start_date, end=end_date)
  file_path = os.path.join(download_dir, f'{ticker_string}.parquet')
  historical_data.to_parquet(file_path)
  print(f'{ticker_string} 下载完成.')

如果成功,输出结果将为

AAPL 下载完成.
AMZN 下载完成.
GOOGL 下载完成.
META 下载完成.
MSFT 下载完成.
NVDA 下载完成.
TSLA 下载完成.

设置 S3 连接

首先,我们需要创建一个 S3FileSystem 对象。与 S3 数据源交互所需的所有方法都挂载在这个对象上。以下代码将从环境变量中获取 MinIO 的端点、访问密钥和密钥,并创建一个 S3FileSystem 对象。

from s3fs import S3FileSystem

key = os.environ['MINIO_ACCESS_KEY']
secret = os.environ['MINIO_SECRET_ACCESS_KEY']
endpoint_url = os.environ['MINIO_URL']

s3 = S3FileSystem(anon=False, endpoint_url=endpoint_url,
                key=key,
                secret=secret,
                use_ssl=False)

创建存储桶

一旦我们拥有了一个 S3FileSystem 对象,就可以使用它来创建存储桶。makedir() 函数可以完成此操作。下面的代码首先检查存储桶是否存在,然后再创建它。makedir() 函数有一个 exist_ok 参数,你可能会认为如果存储桶已存在,则此参数将允许 makedir 成功完成。不幸的是,我发现事实并非如此。如果存储桶已存在,无论此参数如何设置,此函数都会抛出异常。因此,最好显式检查存储桶是否存在,并且仅在存储桶不存在时才调用 makedir()。

bucket_name = 'market-data'
if not s3.exists(bucket_name):
  s3.makedir(f's3://{bucket_name}', exist_ok=True)

上传市场数据

要将对象上传到我们新创建的存储桶,我们将使用 s3fs 的 put_file() 函数。此函数会将单个文件上传到 S3 对象存储中的目标位置,在本例中为 MinIO。它以本地源文件和远程路径作为参数。

magnificient_seven = ['AAPL', 'AMZN', 'GOOGL', 'META', 'MSFT', 'NVDA', 'TSLA']
for ticker_string in magnificient_seven:
  local_file_path = os.path.join(download_dir, f'{ticker_string}.parquet')
  remote_object_path = f'{bucket_name}/{ticker_string}.parquet'
  s3.put_file(local_file_path, remote_object_path)
  print(f'{ticker_string} 已上传。)

S3fs 还提供了一个 put() 函数,该函数会将本地目录的全部内容上传到指定路径。如果您的本地目录包含子文件夹,它甚至还具有递归开关。

上述代码完成后,您的 MinIO 存储桶将如下面的屏幕截图所示。

列出对象

列出存储桶中的对象非常简单。s3fs 对象有一个 ls() 方法,它以远程位置作为参数。如下所示。

object_list = s3.ls(f's3://{bucket_name}', detail=False)
object_list

当您调用 detail 参数设置为 False 的 ls() 方法时,返回值将是一个简单的 Python 列表,其中包含路径中所有对象的完整路径引用。当您需要一个循环遍历的列表时,请使用此版本的 ls()。下面显示了 Magnificent Seven 对象的列表。

['market-data/AAPL.csv',
'market-data/AMZN.csv',
'market-data/GOOGL.csv',
'market-data/META.csv',
'market-data/MSFT.csv',
'market-data/NVDA.csv',
'market-data/TSLA.csv']

如果您需要此函数返回更多详细信息,则将 detail 参数设置为 True。下面显示了显示详细信息的示例输出。(为简洁起见已截断。)

[{'Key': 'market-data/AAPL.parquet',
  'LastModified': datetime.datetime(2024, 1, 12, 14, 39, 12, 124000, tzinfo=tzutc()),
  'ETag': '"03e6abc232e58b1d83aa0b175ae04cf3"',
  'Size': 28612,
  'StorageClass': 'STANDARD',
  'type': 'file',
  'size': 28612,
  'name': 'market-data/AAPL.csv'},

...

Pandas 集成

如前所述,许多第三方库使用 s3fs 的 S3FileSystem 类来与兼容 S3 的对象存储进行交互。以下代码展示了如何使用 Pandas 将 MinIO 中的对象读取到 Pandas DataFrame 中。

import pandas as pd

ticker_string = 'NVDA'
storage_options={
  'key': key,
  'secret': secret,
  'endpoint_url': endpoint_url,
}
historical_data = pd.read_csv(f's3://{bucket_name}/{ticker_string}.parquet', storage_options=storage_options)
historical_data.tail(10)

输出结果

以上代码根本不需要文件系统就能将 MinIO 中的数据获取到 Pandas DataFrame 中。对于需要从原始数据创建数据集的数据科学家来说,这是一个非凡的功能。对于量化分析师来说,这也是一个强大的功能,因为 Pandas 基于 NumPy,可以创建数学算法来识别交易机会。

PyArrow 是另一个流行的库,它利用 s3fs 访问对象存储中的 S3。如果您正在处理大型对象,请考虑使用 PyArrow 的 S3 接口

绘制市场数据

在结束之前,让我们再玩一点。我现在想做的是从 MinIO 中提取所有市场数据并将其加载到一个 DataFrame 中。这可以通过下面的代码使用 Panda 的 concat 函数来完成。请注意,我还需要添加一个股票代码列。

storage_options={
  'key': key,
  'secret': secret,
  'endpoint_url': endpoint_url,
}

df_list = []
for ticker_string in magnificient_seven:
  new_data = pd.read_parquet(f's3://{bucket_name}/{ticker_string}.parquet'

                              storage_options=storage_options)
  new_data['Ticker'] = ticker_string
  df_list.append(new_data[['Ticker', 'Close']])

historical_data = pd.concat(df_list, axis=0)
historical_data[historical_data['Ticker']=='AMZN'].head()

输出

接下来,我们需要根据股票代码值对数据进行透视。

historical_data = historical_data.reset_index()
pt = historical_data.pivot(columns='Ticker', index='Date', values='Close')
pt.head()

现在,我们透视后的数据看起来像下面的屏幕截图。

现在,我们可以使用下面的代码绘制“Magnificient Seven”。

import matplotlib.pyplot as plt
import seaborn as sns

# 使用 Seaborn 设置样式。
sns.set_style('ticks')

# 将所有收盘价绘制为累积收盘值。
((pt.pct_change()+1).cumprod()).plot(figsize=(10, 7))
plt.legend()
plt.title('七宗最')

# 定义标签
plt.ylabel('累积收盘价')
plt.xlabel('日期')

# 绘制网格线
plt.grid(True)
sns.despine()
plt.show()

摘要

在这篇文章中,我们使用 MinIO 和 s3fs 库进行了一些有趣的尝试。我们下载了一些市场数据,使用 s3fs 库将其上传到 MinIO,然后再次使用 MinIO 的 S3 接口将数据集成到 Pandas 中。最后,我们使用 Matplotlib 和 Seaborn 对数据进行了可视化。

后续步骤

如果您一直关注我们的博客,那么您就会知道,我们像喜欢数据湖一样喜欢现代数据湖(也称为数据湖仓)。现代数据湖是一种集数据仓库和数据湖于一体的解决方案。这得益于开放式表格式(OTF),例如 Iceberg、Hudi 和 Deltalake。现代数据湖建立在对象存储之上,并允许与数据湖直接集成,同时提供一个处理引擎,能够通过 SQL 进行高级数据操作。如果您喜欢这篇文章中的内容并希望更进一步,那么请考虑构建一个现代数据湖。几个月前,我在这篇博文中展示了如何做到这一点:使用 Apache Iceberg 和 MinIO 构建数据湖仓

立即下载MinIO,了解构建数据湖仓有多么容易。如果您有任何疑问,请随时在Slack上联系我们!