使用 R、H2O 和 MinIO 进行金融数据(外汇)机器学习

Machine Learning with Finance Data (Forex) in R, H2O and MinIO

外汇(外汇交易)提供了大量数据和机会来应用机器学习。

在本教程中,我将使用 R、H2O 和 MinIO 来构建一个使用外汇(Forex)数据的非常简单的统计套利模型。我使用的是 TraderMade 作为 外汇数据 的来源。你可以在 如何在 R 中导入外汇数据 中了解更多关于 TraderMade 的外汇数据流的信息。我首先注册了一个免费账户并获取了 api_key。免费账户有一些限制,但对于本教程来说足够了。

如果你不熟悉 H2O,它是一个开源的分布式内存机器学习环境。我在之前的一篇文章中更详细地介绍了 H2O,使用 H20、R 和 MinIO 进行机器学习

用于统计计算的 R 编程语言 被统计学家和数据挖掘人员用于数据分析。我在之前的一篇文章中更详细地介绍了 R,MinIO 和 Apache Arrow 使用 R。R 在处理数据、数据分析和机器学习方面非常直观。

R 语言有一个名为 RStudio 的配套 IDE,我将在本次开发中使用它。

MinIO 是高性能软件定义的 S3 兼容对象存储,使其成为 Amazon S3 的强大而灵活的替代品。MinIO 与云原生分析和 ML/AI 框架无缝衔接。

请安装 R 和 RStudio,访问 H2O 集群,如果你还没有运行 MinIO,请 下载并安装它。此外,请下载并安装 aws.s3 R 库

统计套利

统计套利的概念是,2 种工具或资产的价格走势之间存在某种程度的可信赖关系。在本例中,我们将使用货币对作为工具。如果货币对之间的关系相当稳定,那么任何偏离这种关系的现象都可能伴随着回归到稳定关系的趋势。这意味着,当出现偏差时,如果我们相信相对价格将回归到之前观察到的关系,我们有机会采取仓位并可能获利。当然,潜在关系的偏差可能是由于一个或两个公司的商业模式的故意改变造成的,因此偏差并不保证获利机会。

一个经典的例子是假设存在稳定关系的两种资产,即同一行业中规模相似的公司的股价。以科技行业为例,观察两家规模相似的科技公司的股价走势,假设它们将同样受到行业整体变化的影响,例如同样训练的劳动力成本、利率、供应短缺等。我们应该能够辨别出这些公司股价之间的关系,如果我们看到偏差,如果我们相信相对股价将回归到之前观察到的关系,这可能是一个获利的机会。当然,潜在关系的偏差可能是由于一个或两个公司的商业模式的故意改变造成的,因此偏差并不保证获利机会。

另一个常见的例子是汽车公司,如 Investopedia 关于统计套利的文章 或维基百科关于 统计套利 的文章中所述。

评估工具价格时间序列之间的潜在关系是统计学或计量经济学中深入研究的领域。这是一个简化的教程,提供了 2 种简单的方法。第一种方法只是计算工具之间的差值,并使用 10 天滚动平均值和标准差来确定偏离关系的点。第二种方法使用一些历史数据使用 H2O AutoML 构建 ML 模型,然后应用该模型来预测工具之间未来应该存在的关系。将工具在某一天的实际差值与预测值进行比较,实际值与预测值之间的差值用于确定偏离关系的点。

教程

在本教程中,我从两个货币对开始 - eurjpy 和 gbpusd

货币对使用两个货币来指定 - eurjpy 是 1 欧元的日元汇率。欧元是“基础货币”,日元是“报价货币”。汇率 - 这种工具的价格 - 是购买 1 欧元所需的日元数量。目前,谷歌表示,购买 1 欧元需要 146.57 日元。

类似地,gbpusd 货币对的基础货币是英镑,报价货币是美元。汇率或这种工具的价格是购买 1 欧元所需的美元数量。同样,根据谷歌,汇率为 1.13 - 目前,购买 1 欧元需要 1.13 美元。

汇率与市场上交易的所有工具一样,都有一些卖家和一些买家,卖家“报价”希望有人支付的价格,买家“出价”一个价格。当卖家和买家之间的报价相同的时候,就会发生“交易”。在为实际应用构建类似统计套利的东西时,了解市场运作的结构以及典型“价差”非常重要,因为交易算法将受到这种价差的影响。不用说,本教程仅供信息参考,不应作为交易的依据。

我们将探索两种方法来确定这些货币之间的潜在关系。**实验 1** 使用它们之间的近期历史关系来预测关系应该是什么。任何偏离近期历史预测的现象都将是一个潜在的交易机会。我选择(相当随机地)使用 10 天的历史来计算货币对之间差值的平均值和标准差。**实验 2** 使用 H2O AutoML 构建模型来预测关系,该模型基于上半年训练数据。这些当然不是了解关系的唯一方法。大多数专业方法要复杂得多,需要在统计学或计量经济学方面有相当深的造诣。这种方法超出了本教程所能涵盖的范围。

数据

数据来自 TraderMade,如前所述,我使用的是每日时间序列。这些数据提供了每一天的市场开盘、高点、低点和收盘。在 Forex 中,“市场”的概念有点用词不当,因为外汇的兑换没有一个单一的清算所市场,而是多个市场,而且价格可能在这些市场之间有所不同。此外,外汇是一个连续市场,因此市场开盘和收盘的概念是人为的。由于我想要一个代表每一天的单一数据点,我选择将每一天的开盘价、高点和低点取平均值,并将该平均值用作某一天的单一汇率。这样做要说明的是,我用来代表“一天”的“每日价格”数据点实际上是本教程的平均值。这会降低实验的潜在性能,但会使解释更容易。

与其将数据存储为 CSV 文件或存储在 MySQL、MariaDB、PostgreSQL 或其他数据库中,我发现利用更新的“开放表格标准”文件格式并将数据存储在对象存储中要容易得多。MinIO 是这种用例的绝佳选择。

在本教程中,我将数据存储为 Parquet 格式,但是有几种新的格式提供了使用文件和表格结构功能的便利性。在本例中,我从 TraderMade 下载原始数据并按日期对其进行分区。按日期分区类似于在数据库术语中对日期创建索引。

保存后的数据如下所示。例如,在“eurjpy”的前缀中,数据按指定字段写入多个文件中,按日期进行分区。

这意味着,选择检索(例如检索日期范围)只读取相关文件。这种方法比将数据存储在 CSV 文件中要高效得多,因为 CSV 文件通常需要加载整个文件,然后在内存中进行选择。它也比建立一个带有所有必需配置、驱动程序和维护的独立数据库要方便得多。

以下是存储此数据的 R 代码

 # get the data from TraderMade and store as Parquet partitioned by date
  ccy1_dataframe_tick<-GetCCYTimeseries(ccy1,start_date,end_date)
  ccy1_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy1), format = "parquet" )

第一行使用一个函数从 TraderMade 检索数据来隐藏 URL 的创建等。第二行是一个管道,将数据框通过 Dplyr 的“group_by”谓词进行管道化,然后将其管道化到一个函数,该函数将数据以 Parquet 格式写入我的 MinIO 集群。如前所述,“group_by”指令会导致数据按“quotes.date”进行分区。这再简单不过了。

数据检索同样简单。以下是检索所有这些货币数据到数据框中的 R 代码

df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% collect()

上面的代码生成了一个包含 132 行的数据框 - 所有我存储的数据。选择一行子集同样简单。在这里,我只加载“2022-04-01”之后的数据

df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% filter(quotes.date > "2022-04-01") %>% collect()

注意使用过滤器来指定加载数据的“where 子句”。这非常简单,不需要外部数据库的维护。由于只加载了请求的数据,因此 MinIO 对象存储的性能也很出色,假设分区是正确的。

回到我们手头的这个问题

为了创建一个表示这两种工具之间关系的单一时间序列,我首先取它们之间的汇率差值——也就是说,我从另一个汇率中减去一个汇率的每日价格。为了查看该值相对于这两种工具之间“现有关系”的位置,我用前 10 天差值的平均值来比较它的标准差。

什么?是的,计算每天汇率的差值。对于每一天,计算前 10 天差值的平均值。然后确定今天的汇率与该平均值的距离是多少标准差。

以下是一个示例。下面是 2022 年 3 月 1 日至 2022 年 8 月 31 日的图表。此图表取 EURJPY 并减去 GBPUSD 以获得差值。然后,它将每个点绘制为前 10 天差值的平均值的标准差。因此,在其峰值处,这大约高于前 10 天差值的平均值的 2.3 个标准差。在最低点,这大约低于前 10 天差值的平均值的 2.7 个标准差。红色散列线位于高于和低于 1.5 个标准差的位置,以便直观参考。

这种数据表示的优点是,它在零点之上和之下变化相当均匀。我们建议零线(前 10 天差值的平均值)代表潜在关系,当数据偏离该线时,我们相信它随后会恢复。

我们看到两种货币之间的标准化差值多次突破上限和下限。请记住,我选择的阈值是任意的,并且不需要它们在平均值周围对称。此外,Y 轴上的单位是标准差。由于数据不太可能呈正态分布或高斯分布,因此标准差的概念并不真正成立。但是,它提供了一种简单的方法来讨论任何单个测量的距离与平均值的距离,因此我们将使用它。

解释

漂亮的线;它意味着什么?当蓝线较高时,这意味着差值(EURJPY 减去 GBPUSD)相对于前 10 天较大——这意味着 EURJPY 按照 GBPUSD 的价格来说很贵。当这种情况发生时,我们相信 EURJPY 会下跌,GBPUSD 会上涨(因此该线会向“零”移动——前 10 天差值的平均值)。当蓝线较低时,这意味着差值很小,我们相信 EURJPY 在不久的将来会相对于 GBPUSD 上涨,以便向“零”移动。

当蓝线较高时,我们希望卖出 EURJPY(卖出 EUR 并以日元获得收益)以利用预期恢复。同时,我们希望买入 GBPUSD(买入英镑并以美元支付)。我们会这样做,因为我们认为 EURJPY 汇率相对于 GBPUSD 汇率较高,这是根据我们正在考虑的历史数据得出的。当蓝线较低时,我们希望买入 EURJPY 并卖出 GBPUSD,原因相似——我们相信汇率的走势很快就会改变,我们相信我们可以获利。

卖出 EUR 意味着什么?好吧,我住在美国,通常持有美元 (USD),因此为了卖出 EUR,我必须首先用我的美元购买一些欧元。为了运行本教程,我需要从一开始就持有这些货币中的每一种来开始交易,因为我可能需要放弃其中的任何一种作为交易的一部分,而我不能放弃我不拥有的东西。

那么我们该如何做到这一点?首先,我们在给定货币中设置一个“起始余额”的概念——对我来说,它将是美元。然后,我们购买我们将可能交易的每种货币中的一些。因此,我购买了 EURUSD、JPYUSD、GBPUSD、USDUSD——我购买了我可能需要的货币,并用我的美元支付了这些货币。请注意最后一个实际上并没有购买任何东西。在每个实验结束时,我将把这些货币兑回美元,以评估其表现。

一旦我们拥有一些我们将需要的货币,我们就会根据信号进行交易。当我们完成并已将我们的货币兑回美元后,我们可以查看我们是否赚了钱或赔了钱。简单但有效。

将 MinIO 和 Parquet 用于金融数据的 AI/ML 和数据分析

我正在使用 MinIO,它是一个 100% 与 S3 兼容的对象存储。数据被下载并存储为 Parquet 文件。Parquet 是较新的表格文件格式之一,它是 CSV 文件的高效替代品。Parquet 和其他“开放标准”文件正在迅速取代 CSV 数据存储并取代专有数据库存储格式——大多数数据库产品现在能够使用外部 Parquet 文件作为“外部表”。这意味着数据库产品提供的功能(索引和查询等)与其底层存储格式之间存在分离。这对数据用户来说是一个巨大的进步。存储在 MinIO S3 存储上的 Parquet 文件(或 ORC、Hudi 或 Iceberg——现在有很多开放标准)为 AI/ML 和分析工作负载提供了一种高效且经济高效的数据存储解决方案。

我还使用 R 和 H2O。如果您阅读过我之前的博客,那么这两种产品是我最喜欢的产品,适合这种工作。

实验一

首先,我创建了一个 R 脚本文件来加载名为 packages.R 的所需包。

#load necessary libraries

if (!require("httr")) {
  install.packages("httr")
  library (httr)
}

if (!require("jsonlite")) {
  install.packages("jsonlite")
library (jsonlite)
}

if (!require("ggplot2")) {
  install.packages("ggplot2")
  library(ggplot2)
}

if (!require("arrow")) {
  install.packages("arrow")
  library(arrow)
}

if (!require("dplyr")) {
  install.packages("dplyr")
  library(dplyr)
}

if (!require("zoo")) {
  install.packages("zoo")
  library(zoo)
}

if (!require("aws.s3")) {
  install.packages("aws.s3")
  library(aws.s3)
}

if (!require("lubridate")) {
  install.packages("lubridate")
  library(lubridate)
}

if (!require("quantmod")) {
  install.packages("quantmod")
  library(quantmod)
}
  
if (!require("zoo")) {
  install.packages("zoo")
  library(zoo)
}

if (!require("h2o")) {
  install.packages("h2o")
  library(h2o)
}


GetMinIOURI <- function(prefix) {
  # get minio config, with expected defaults
  minio_key <- Sys.getenv("MINIO_ACCESS_KEY", "minioadmin")
  minio_secret <- Sys.getenv("MINIO_SECRET_KEY", "minioadmin")
  minio_host <- Sys.getenv("MINIO_HOST", "Your MinIO Host IP address")
  minio_port <- Sys.getenv("MINIO_PORT", "Your MinIO Port")
  minio_arrow_bucket <- Sys.getenv("MINIO_ARROW_BUCKET", "arrow-bucket")
  # helper function for minio URIs
  minio_path <- function(...) paste(minio_arrow_bucket, ..., sep = "/")
  minio_uri <- function(...) {
    template <- "s3://%s:%s@%s?scheme=http&endpoint_override=%s%s%s"
    sprintf(template, minio_key, minio_secret, minio_path(...), minio_host, ":", minio_port)
  }
  return(minio_uri(prefix))
}

接下来,我创建了一些数据检索函数来从 Tradermade 中获取数据,并将其放在一个名为 DataFunctions.R 的 R 脚本中。

source("packages.R")

# using TraderMade
# https://marketdata.tradermade.com
# uses an api_key
api_key <- "Your Free TraderMade API Key"

GetCCYList <- function(...) {
# get list of ccys
req <- paste0("https://marketdata.tradermade.com/api/v1/historical_currencies_list?api_key=",api_key)
data_raw <- GET(url = req)
data_text <- content(data_raw, "text", encoding = "UTF-8")

data_json <- fromJSON(data_text, flatten=TRUE)
dataframe <- as.data.frame(data_json)
# this is like 9700 rows
return(dataframe)
}

GetCCYRates <- function(ccy1,ccy2) {
#get current rates
req <- paste0("https://marketdata.tradermade.com/api/v1/live?currency=",toupper(ccy1),",",ccy2,"&api_key=",api_key)
data_raw <- GET(url = req)
data_text <- content(data_raw, "text", encoding = "UTF-8")

data_json <- fromJSON(data_text, flatten=TRUE)
dataframe <- as.data.frame(data_json)
return(dataframe)
}

GetCCYTimeseries <- function(ccy,start_date,end_date) {
# get timeseries historical data for ccy
tick_req <- paste0("https://marketdata.tradermade.com/api/v1/timeseries?currency=",
                   toupper(ccy),"&api_key=",api_key,"&start_date=",
                   start_date,"&end_date=",end_date,"&format=records",collapse="")
data_tick_raw <- GET(url = tick_req)
data_tick_text <- content(data_tick_raw, "text", encoding = "UTF-8")
data_tick_json <- fromJSON(data_tick_text, flatten=TRUE)
dataframe_tick <- as.data.frame(data_tick_json)
dataframe_tick$quotes.avg <- 
  (dataframe_tick$quotes.open + dataframe_tick$quotes.high + dataframe_tick$quotes.low)/3 
dataframe_tick$quotes.date <- ymd(dataframe_tick$quotes.date)

#sort by date
dataframe_tick <- dataframe_tick[order(dataframe_tick$quotes.date),]

return(dataframe_tick)
}

由于数据框是 R 处理数据的方式,因此这些数据函数被以下数据框函数 DFFunctions.R 使用。

source("packages.R")
source("DataFunctions.R")


DownloadCCYDF <- function(ccy1, start_date, end_date) {
  # get the data
  ccy1_dataframe_tick<-GetCCYTimeseries(ccy1,start_date,end_date)
  ccy1_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy1), format = "parquet" )

  return (ccy1_dataframe_tick)
}


DownloadCCYDFs <- function(ccy1, ccy2, start_date, end_date) {
  # get the data
  ccy1_dataframe_tick<-GetCCYTimeseries(ccy1,start_date,end_date)
  ccy1_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy1), format = "parquet" )
  
  ccy2_dataframe_tick<-GetCCYTimeseries(ccy2,start_date,end_date)
  ccy2_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy2), format = "parquet" )
  
  return (list(ccy1_dataframe_tick, ccy2_dataframe_tick))
}

LoadCCYDF <-function(ccy1) {
  df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% collect()
  return(df1)
}


LoadCCYDFs <-function(ccy1, ccy2) {
  df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% collect()
  df2 <- open_dataset(GetMinIOURI(ccy2), format = "parquet") %>% collect()
  return(list(df1,df2))
}


CreateJointDF<-function(ccy1_df,ccy2_df,rolling_period) {
  
  # we believe this relationship should be stable
  joint_df <- inner_join(ccy1_df,ccy2_df, by = 'quotes.date')
  
  # order by date
  joint_df<- joint_df[order(joint_df$quotes.date),]
  
  # ok, simple case, the relationship between ccy1 and ccy2 is absolute, so just subtract
  joint_df$quotes.diff <- joint_df$quotes.avg.x - joint_df$quotes.avg.y
  
  # add a rolling mean and sd at 10 days
  joint_df <- joint_df %>%
    mutate(rolling.mean = rollmean(quotes.diff, k=rolling_period, fill=NA, align='right')) %>%
    mutate(rolling.sd = rollapplyr(quotes.diff, rolling_period, sd, fill = NA))
  
  # remove the NAs in the beginning of the rolling mean
  joint_df <- joint_df[complete.cases(joint_df), ]
  
  joint_df$quotes.diff.normalized <- 
    (joint_df$quotes.diff - joint_df$rolling.mean) / joint_df$rolling.sd
  
  #   # write this as parquet files
  # joint_df %>%
  #   write_dataset(GetMinIOURI(paste0(ccy1,"-",ccy2)), format = "parquet" )
  
  return (joint_df)
  
}


CreateLaggedDF<-function(ccy1_df,ccy2_df,lag) {
  
  # we believe this relationship should be stable
  tmp_df <- inner_join(ccy1_df,ccy2_df, by = 'quotes.date')
  
  # sort the result
  tmp_df<-tmp_df[order(tmp_df$quotes.date),]
  
  # we are going to predict this difference
  tmp_df$quotes.diff <- tmp_df$quotes.avg.x - tmp_df$quotes.avg.y
  
  #save off date vector
  df.date<-tmp_df$quotes.date
  
  #save off the target
  df.target<-tmp_df$quotes.diff
  
  #save off the data that we are going to lag
  df.data<-tmp_df$quotes.diff
  df.data<-zoo(df.data,df.date)
  
  #create lagged df
  df.data.lag<-Lag(df.data,k=1:lag)
  df.data.lag<-as.data.frame(df.data.lag)
  
  # cbind the date and target on the front
  lagged_df<-cbind(df.date,df.target,df.data.lag)
  lagged_df<-lagged_df[complete.cases(lagged_df),]
  lagged_df$df.date <- ymd(lagged_df$df.date)
  
  #   # write this as parquet files
  # lagged_df %>%
  # write_dataset(GetMinIOURI(paste0(ccy1,"-",ccy2,"-lagged-",lag)), format = "parquet" )

  # this is a file that can be used for training a model
  return (lagged_df)
  
  
}

为了评估这些实验的结果,我们将需要一些回测函数。以下是该文件 TestFunctions.R

source("packages.R")
source("DFFunctions.R")




TestActionsDF<-function(ccy1,ccy2,actions_df,startingUSD,start_date,end_date) {
  
  # start with an equal amount (in USD) of each)
  USDBal <- startingUSD
  x = floor(USDBal/4)
  
  #load the data - get each in terms of USD
  c1<-toupper(paste0(substring(ccy1,1,3),"USD"))
  c2<-toupper(paste0(substring(ccy1,4,6),"USD"))
  c3<-toupper(paste0(substring(ccy2,1,3),"USD"))
  c4<-toupper(paste0(substring(ccy2,4,6),"USD"))
  
  # get the individual currency / USD rates
  if (c1 != "USDUSD") {
    #c1_df<- DownloadCCYDF(c1,start_date,end_date)
    c1_df <- LoadCCYDF(c1)
    p_c1 <- (c1_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c1_bal <- floor(x/p_c1)
    USDBal <- USDBal - (c1_bal*p_c1)
    c1_trade_amt <- floor(c1_bal/20)
  } else {
    c1_bal = x
    USDBal = USDBal - x
  }
  
  if (c2 != "USDUSD") {
    #c2_df<- DownloadCCYDF(c2,start_date,end_date)
    c2_df <- LoadCCYDF(c2)
    p_c2 <- (c2_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c2_bal <- floor(x/p_c2)
    USDBal <- USDBal - (c2_bal*p_c2)
    c2_trade_amt <- floor(c2_bal/20)
  } else {
    c2_bal = x
    USDBal = USDBal - x
  }
  
  
  if (c3 != "USDUSD") {
    #c3_df<- DownloadCCYDF(c3,start_date,end_date)
    c3_df <- LoadCCYDF(c3)
    p_c3 <- (c3_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c3_bal <- floor(x/p_c3)
    USDBal <- USDBal - (c3_bal*p_c3)
    c3_trade_amt <- floor(c3_bal/20)
  } else {
    c3_bal = x
    USDBal = USDBal - x
  }
  
  
  if (c4 != "USDUSD") {
    #c4_df<- DownloadCCYDF(c4,start_date,end_date)
    c4_df <- LoadCCYDF(c4)
    p_c4 <- (c4_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c4_bal <- floor(x/p_c4)
    USDBal <- USDBal - (c4_bal*p_c4)
    c4_trade_amt <- floor(c4_bal/20)
  } else {
    c4_bal = x
    USDBal = USDBal - x
  }
  
  # OK, we have a balance in each of the desired currencies, and we have a notion of the trade amount
  # apply the trades and see what happens
  
  actions <- actions_df %>% filter (quotes.date > ymd(start_date) & quotes.date <= ymd(end_date))
  
  # apply the trades
  for (row in 1:nrow(actions)) {
    # figure out what we are buying and selling and get the rates on the specified dates
    d = actions[row,1]
    buy.ccy = actions[row,2]
    sell.ccy = actions[row,3]
    
    # handle the buy
    if (buy.ccy == ccy1) {
      # get he exchange rate on this date
      ex_rate = (ccy1_df %>% filter(quotes.date == ymd(d)))$quotes.avg
      if (c2_bal >= c1_trade_amt / ex_rate) {
        c1_bal = c1_bal + c1_trade_amt
        c2_bal = c2_bal - (c1_trade_amt * ex_rate)
      }
    } else {
      # get he exchange rate on this date
      ex_rate = (ccy2_df %>% filter(quotes.date == ymd(d)))$quotes.avg
      if (c4_bal >= c3_trade_amt / ex_rate) {
        c3_bal = c3_bal + c3_trade_amt
        c4_bal = c4_bal - (c3_trade_amt * ex_rate)
      }
      
    }
    
    #handle the sell
    if (sell.ccy == ccy1) {
      # get he exchange rate on this date
      ex_rate = (ccy1_df %>% filter(quotes.date == ymd(d)))$quotes.avg
      if (c1_bal >= c1_trade_amt) {
        c1_bal = c1_bal - c1_trade_amt
        c2_bal = c2_bal + (c1_trade_amt * ex_rate)
      }
    } else {
      # get he exchange rate on this date
      ex_rate = (ccy2_df %>% filter(quotes.date == ymd(d)))$quotes.avg

      if (c3_bal >= c3_trade_amt) {
        c3_bal = c3_bal - c3_trade_amt
        c4_bal = c4_bal + (c3_trade_amt * ex_rate)
      }
    }
  }
  
  # unwind and accumulate the balance in USD
  # get the end quotes for individual currency / USD at the end date
  if (c1 != "USDUSD") {
    p_c1 <-c1_df[c1_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c1_bal * p_c1)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  if (c2 != "USDUSD") {
    p_c2 <- c2_df[c2_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c2_bal * p_c2)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  if (c3 != "USDUSD") {
    p_c3 <- c3_df[c3_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c3_bal * p_c3)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  if (c4 != "USDUSD") {
    p_c4 <- c4_df[c4_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c4_bal * p_c4)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  
  return ((USDBal-startingUSD)/startingUSD)
  
}

对于第一个实验,以下是主文件 MainAbsolute.R,它读取数据、整理数据、确定一些买入和卖出点,最后回测结果。

source("packages.R")
source ("DFFunctions.R")
source ("TestFunctions.R")


ccy1 = "eurjpy"
ccy2 = "gbpusd"

# these are used to get the data
start_date="2022-06-01"
end_date="2022-08-31"

# Adding buy/sell indicator
sd_threshold <- 1.5


# load the data
#df_list <- DownloadCCYDFs(ccy1,ccy2, start_date, end_date)
df_list <- LoadCCYDFs(ccy1,ccy2)
ccy1_df<-df_list[[1]]
ccy2_df<-df_list[[2]]

#sort these
ccy1_df<-ccy1_df[order(ccy1_df$quotes.date),]
ccy2_df<-ccy2_df[order(ccy2_df$quotes.date),]

# Test 1 - straight stat-arb using subtraction
joint_df<-CreateJointDF(ccy1_df,ccy2_df,10)


# graph these
ggplot(aes(x = quotes.date, y = quotes.diff.normalized, group=1), data = joint_df) +
  geom_point(color = "blue")  +
  geom_line(color = "blue") +
  geom_hline(yintercept=sd_threshold, linetype="dashed", 
             color = "red") +
  geom_hline(yintercept=-sd_threshold, linetype="dashed", 
             color = "red") +
  theme(axis.text.x = element_text(angle = 90))

joint_df <- joint_df %>%
  mutate(sell = case_when(
    quotes.diff.normalized > sd_threshold ~ ccy1,
    quotes.diff.normalized < (-1*sd_threshold) ~ ccy2)) %>%
  mutate(buy = case_when(
      quotes.diff.normalized > sd_threshold ~ ccy2,
      quotes.diff.normalized < (-1*sd_threshold) ~ ccy1))

# get rid of some unnecessary cols
joint_df <- joint_df %>% select(quotes.date,quotes.diff.normalized,buy,sell)

actions_df <- joint_df[,c('quotes.date','buy','sell')]
actions_df <- actions_df[complete.cases(actions_df), ]


startingUSD=10000

ret <- TestActionsDF(ccy1,ccy2,actions_df,startingUSD,start_date,end_date)
ret

让我们讨论一下实验一的结果。有几点需要注意。

  1. 我们测试这段时间的范围是 **2022 年 6 月 1 日至 2022 年 8 月 31 日**。
  2. 简单地将一些美元在期初转移到这些货币,然后在期末转移回美元,其表现为 -0.0675 或 **-6.75%**。我们将以此作为我们的“零假设”。如果我们不进行任何交易,那么这就是收益——我们必须用我们的指标系统来克服这一收益。
  3. 如果我们使用减法来定义关系,并使用 10 天滚动平均值作为基准,那么表现为 -0.06616 或大约 **-6.62%**。这比根本不交易要好,所以我们正在朝着正确的方向前进。
  4. 当然,我们忽略了大量现实世界因素,例如与这些交易相关的任何费用,这些费用会影响收益。

实验二

我们可以通过使用机器学习来定义/预测这些工具之间的关系,并创造更有利可图的交易来做得更好吗?我正在使用 H2O 的 AutoML 功能来构建一个模型,该模型可以预测两种货币之间的差值应该是什么(根据它所训练的数据)。然后,我将实际差值与预测值进行比较,如果实际值和预测值不一致,那么这似乎是一个交易机会。

我们需要另外几个文件来测试它,第一个是 MLFunctions.R

source("Packages.R")

# initialize the h2o server
h2o.init(ip="Your H2O host IP address", port=54321, startH2O=FALSE, 
         jvm_custom_args = "-Dsys.ai.h2o.persist.s3.endPoint=”http://Your MinIO Host”:9000 -Dsys.ai.h2o.persist.s3.enable.path.style=true")
h2o.set_s3_credentials("Your MinIO Access Key", "Your MinIO Secret Key")


TrainModel<-function(training_df) {
  training_df.hex<-as.h2o(training_df, destination_frame= "training_df.hex")
  
  splits <- h2o.splitFrame(data = training_df.hex, 
                           ratios = c(0.6),  #partition data into 60%, 40%
                           seed = 1)  #setting a seed will guarantee reproducibility
  train_hex <- splits[[1]]
  test_hex <- splits[[2]]
  
  y_value <- 1
  predictors <- c(2:ncol(train_hex))
  
  
  # train
  aml = h2o.automl(y=y_value,x=predictors,
             training_frame=train_hex,
             leaderboard_frame = test_hex,
             max_runtime_secs = 60,
             seed = 1)
  model<-aml@leader
  
  # train model
  # model <- h2o.deeplearning(y=y_value, x=predictors,
  #                              training_frame=train_hex,
  #                              activation="Tanh",
  #                              autoencoder=FALSE,
  #                              hidden=c(50),
  #                              l1=1e-5,
  #                              ignore_const_cols=FALSE,
  #                              epochs=1)
  
  
  
  # save the leader model as bin
  model_path <- h2o.saveModel(model, path = "s3://bin-models/stat_arb_model_bin")

  return(model)

}


RunModel<-function(model,pred_df) {
  pred_df.hex<-as.h2o(pred_df)
  pred.hex <- h2o.predict(model, pred_df.hex)  # predict(aml, test) and h2o.predict(aml@leader, test) also work
  pred<-as.data.frame(pred.hex)
  return(pred)
}

以及一个新的 MainModel.R 来练习模型方法。

source("Packages.R")
source ("DFFunctions.R")
source("MLFunctions.R")
source ("TestFunctions.R")

# Example of multiple approaches to stat arb in forex

ccy1 = "eurjpy"
ccy2 = "gbpusd"

# these are used to get the data
start_date="2022-03-01"
end_date="2022-08-31"
start_training_date = "2022-03-01"
end_training_date = "2022-05-31"
start_pred_date = "2022-06-01"
end_pred_date = "2022-08-31"



# load the data
#df_list <- DownloadCCYDFs(ccy1,ccy2, start_date, end_date)
df_list <- LoadCCYDFs(ccy1,ccy2)
ccy1_df<-df_list[[1]][,c("quotes.date","quotes.avg")]
ccy2_df<-df_list[[2]][,c("quotes.date","quotes.avg")]

#sort these
ccy1_df<-ccy1_df[order(ccy1_df$quotes.date),]
ccy2_df<-ccy2_df[order(ccy2_df$quotes.date),]

# Test 2 - arb using a model to predict the difference
lagged_df<-CreateLaggedDF(ccy1_df,ccy2_df,10)

# create the training df
training_df <- lagged_df %>% filter(df.date > ymd(start_training_date), df.date <= ymd(end_training_date))
#get rid of the date training_df
training_df<-training_df[-1]
# create the pred df
pred_df <- lagged_df %>% filter(df.date > ymd(start_pred_date), df.date <= ymd(end_pred_date))


#write the files out
training_df %>% write_dataset(GetMinIOURI("training_df"), format = "parquet" )
pred_df %>% write_dataset(GetMinIOURI("pred_df"), format = "parquet" )

#train the model on this data
model <- TrainModel(training_df)
model

# okay, run the model on the data from 6/1 to 8/31
# it's ok that date and target are in here, the model selects the X values by col name
pred<-RunModel(model,pred_df)

# wrangle the return
predictions<-cbind(quotes.date=pred_df$df.date,actual=pred_df$df.target,pred=pred)
predictions$quotes.date<-as.Date(predictions$quotes.date)
predictions$diff<-predictions$actual-predictions$pred


# Adding buy/sell indicator
upper_threshold <- 5.0
lower_threshold <- 0.5

ggplot(aes(x = quotes.date, y = diff, group=1), data = predictions) +
  geom_point(color = "blue")  +
  geom_line(color = "blue") +
  geom_hline(yintercept=upper_threshold, linetype="dashed", 
             color = "red") +
  geom_hline(yintercept=lower_threshold, linetype="dashed", 
             color = "red") +
  theme(axis.text.x = element_text(angle = 90))


predictions <- predictions %>%
  mutate(sell = case_when(
    diff > upper_threshold ~ ccy1,
    diff < (lower_threshold) ~ ccy2)) %>%
  mutate(buy = case_when(
    diff > upper_threshold ~ ccy2,
    diff < (lower_threshold) ~ ccy1))



actions_df <- predictions %>% select(quotes.date,buy,sell) %>% filter(! is.na(buy))

# Let's test it for 6/1 - 8/31
startingUSD=10000

ret <- TestActionsDF(ccy1,ccy2,actions_df,startingUSD,start_date,end_date)
ret

请注意,在建模函数中,我们运行 H2O AutoML 并允许它从其尝试的模型中选择最佳模型。我们用于 AutoML 执行的参数只是为了这个演示。一旦模型被选中,我们将其以二进制格式保存回 MinIO,这使得它可以在没有 R 的情况下从生产工作流程中执行。该模型可以存储在模型存储库中,但老实说,我发现这些存储库通常有点过时。直接存储模型以及有关模型的一些元数据通常就足够了。

使用建模方法来识别差异会更加复杂。我使用 2022 年 3 月 1 日至 2022 年 5 月 31 日的数据创建了一个 10 天的“滞后”训练集。这是有效的,但这是在时间序列数据上训练模型的最基本方法,它存在一些问题和缺点。我使用滞后训练数据与 H2O AutoML 训练一个模型,使其能够预测差值。我们将此预测用作货币之间的预期差值,而不是像在第一个实验中那样使用滚动平均值。如果您有兴趣处理时间序列数据以用于 AI/ML,这里有一篇关于 H2O 的好文章:时间序列预测博客。此外,本文末尾有一个关于作者 Marcos Lopez De Prado 的书籍的推荐,这本书对这个复杂主题进行了更深入的处理:金融机器学习进展

以下是 2022 年 6 月 1 日至 2022 年 8 月 31 日期间模型预测值与实际值之间差值的图表。

让我们看一下 **实验二** 的结果。

我们做得怎么样?这组实验的 **零假设** 是 **-6.75%**。第一个实验——使用 **滚动平均静态关系** 并按照指示进行交易,其表现为 **-6.62%。这稍微好一些,至少正在朝着正确的方向前进。**

使用在滞后历史关系上训练的 **AutoML 模型**,然后使用这种方法来预测差值,使我们能够略微提高收益。使用这种方法,其表现为 -0.05997 或大约 **-6.0%,比之前的实验略好。**

我们能做得更好吗——当然可以?这两个实验非常有限,仅用于教学目的。每个外汇交易员都有自己的方法,因此我试图提供一个易于定制的过程。还有许多金融途径可以用来改进此处概述的方法。

无论我们的财务策略如何,我们的技术途径都很明确——云原生 MinIO 对象存储具有性能和弹性,可以支持各种统计和 AI/ML 工作负载。软件定义、与 S3 兼容且性能极佳的 MinIO 是此类工作负载的标准。这就是为什么 MinIO 是 Kubeflow 的标准配置,并且可以与所有主要 AI/ML 框架无缝协作,包括 H2OTensorFlow

结论

本教程探讨了使用一对汇率之间的统计套利——EURJPY 和 GBPUSD。还有许多其他值得探索的汇率。它只研究了“日线”(开盘价、最高价、最低价、收盘价数据)。还有其他时间框架或粒度值得探索——“小时线”、“分钟线”、“秒线”,以及实际的逐笔数据——它们在时间域中不是均匀的,需要进行一些额外的处理。

如果你对金融算法和处理金融数据感兴趣,我推荐 高频金融入门。 作者讨论了高频(逐笔)数据在理解市场微观结构方面的价值和用途,包括对处理大量数据所需最佳数学模型和工具的讨论。这些示例以及许多主题都与外汇相关。

如果你对将机器学习应用于金融数据感兴趣,我推荐 金融机器学习的进展。作者提供了关于使用高级机器学习解决方案来克服现实世界投资问题的实用见解。

数据库和分析市场正在发生翻天覆地的变化,这为金融(和其他)数据打开了更大的协作和洞察力。近年来,许多数据库和分析软件包采用了开放表格标准,例如 Parquet、ArrowIcebergHudi。毫无疑问,数据库和分析产品局限于专有文件格式的时代即将结束。这些开放表格数据标准允许在不损失存储效率的情况下提高处理灵活性。它们还允许数据在产品之间移动 - 例如,存储在开放文件格式中的 Big Query 数据可以从 Snowflake 作为外部表进行访问。开放文件格式针对对象存储进行了优化。例如,本教程中创建的 Parquet 文件可以从能够使用开放表格格式合并外部表的数据管理和分析程序中访问。它们非常适合 MinIO 对象存储,因为它具有云原生和无处不在的特性。同样,对于内部部署,在商品硬件上运行的对象存储为客户创造了更大的价值。硬件供应商之间对商品硬件的竞争保证了低成本和高价值。

未来是开放的、云原生的和分布式的。