Apache Airflow 是一个开源平台,用于以编程方式创建、调度和监控工作流。它最初由 Airbnb 的工程团队开发,后来捐赠给了 Apache 软件基金会,并在 Apache 2.0 许可证下发布。
Airflow 通常用于数据工程和数据科学管道中,以自动化任务执行,例如数据转换、加载和分析。它也应用于其他行业,例如金融、医疗保健和电子商务,以自动化业务流程。
Airflow 在连接方面非常灵活。这包括数据湖、数据仓库、数据库、API 以及对象存储。它在以下使用数据管道即代码 (data-pipelines-as-code) 获益的用例中表现出色:
- 自动化驱动的数流
- 机器学习模型训练和再训练
- 备份和快照
Airflow 使用 Python 编写,并使用 有向无环图 (DAG) 来表示工作流。DAG 中的每个节点代表一个任务,节点之间的边代表任务之间的依赖关系。DAG 不关心任务本身,只关心顺序、重试次数等。一个复杂的 DAG 可能会变得脆弱且难以排查,尤其是在架构师需要管理数十个任务的情况下。
Airflow 提供了一个 Web 界面来管理和监控工作流,以及一个 API 来创建、更新和删除工作流。它还拥有丰富的功能,包括对调度、警报、测试和版本控制的支持。
MinIO 是 Airflow 的完美伴侣,因为它拥有业界领先的性能和可扩展性,让所有数据密集型工作负载触手可及。通过将 PB 级别的数据存储在 MinIO 存储桶中,您可以在 Airflow 中创建数据管道来处理海量数据,这对于 DAG 能够尽可能快地运行至关重要。处理完成后,您甚至可以将最终结果存储回 MinIO 存储桶中,供其他工具使用。MinIO 能够提供极高的性能 - 最近的基准测试在仅使用 32 个标准 NVMe SSD 节点的条件下,GET 操作达到了 325 GiB/s (349 GB/s),PUT 操作达到了 165 GiB/s (177 GB/s)。
此外,Apache Airflow 还可以将其日志存储在 MinIO 存储桶中。这在云或容器编排环境中很有帮助,在这些环境中,本地文件系统是短暂的,如果机器终止或容器停止,日志可能会丢失。
为了最大程度地降低这些环境中数据丢失的风险,建议使用更持久且更原生于云的存储解决方案(如 MinIO)来存储 PB 级的数据和日志。这可以确保即使机器或容器终止,数据也能持久保存。此外,只要网络和安全许可,它们就可以从任何位置访问。
使用 MinIO 与 Apache Airflow 结合有多个原因:
- MinIO 通过业界领先的 性能 使 Airflow DAG 运行速度更快,使其处理速度快于任何其他数据存储,并且成本更低。通过将 DAG 所需的所有数据存储在 MinIO 中,您可以大幅降低数据存储成本,同时获得最佳的性能与成本比。
- MinIO 依靠 擦除编码 为您的数据提供高可用性和持久性,并且还具有 可配置的数据保护策略,以确保即使在服务器故障或其他灾难的情况下也不会丢失数据。
- MinIO 可以将数据存储在多个区域,并在它们之间进行 复制,允许您将数据存储在靠近用户的位置或符合数据法规的区域。这可以降低延迟并提高工作流的性能。
- MinIO 与 Apache Airflow 无缝集成,允许您使用 S3 API 存储和检索数据和其他日志。这使得 MinIO 易于与 Airflow 设置和使用,无需任何额外的配置。MinIO 的 云原生集成 意味着它可以与最广泛使用的软件和框架顺利协作。
Apache Airflow 和 MinIO 教程
在本教程中,我们将向您展示 Airflow 与 MinIO 的多个用例。
- 首先,我们将向您展示如何将日志从 MinIO DAG 运行发送到 MinIO 存储桶。
- 接下来,我们将创建一个自定义 DAG,用于在后处理后将对象从 API 发送到 MinIO 存储桶。
安装 Airflow
使用 Pip 安装 Airflow。如果未安装 pip
,则需要安装它;如果 python
不可用,则还需要对其进行符号链接。
apt-get install python3-pip
ln -s /usr/bin/python /usr/bin/python3
AIRFLOW_VERSION=2.5.0 PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2) pip install "apache-airflow==${AIRFLOW_VERSION}"
|
安装 Amazon 提供商以连接到 MinIO
pip install apache-airflow-providers-amazon
|
以独立模式启动 Airflow
配置日志
使用 mc make bucket
在 MinIO 中创建一个存储桶。
mc admin alias minio http://<IP>:9000 minioadmin minioadmin mc mb minio/airflow-logs |
打开 /root/airflow/airflow.cfg
,并在 [logging]
下添加以下设置:
[logging] remote_logging = True remote_base_log_folder = s3://airflow-logs remote_log_conn_id = my_s3_conn encrypt_s3_logs = False |
对于 remote_base_log_folder
,请使用您在上一步中在 MinIO 中创建的存储桶名称。
remote_log_conn_id
应与我们在下一步中创建的连接 ID 的名称匹配。
在 Airflow UI 中,转到“Admin” -> “Connections”。

创建一个名为 my_s3_conn
的新连接。
为“Access Key”和“Secret Key”输入 minioadmin
。
在“Extras”中,让我们使用以下语法将 URL 设置为本地 MinIO 部署:
{ "endpoint_url": "http://<ip>:9000" } |
现在,要测试并确认此配置是否有效,请转到“DAGs” -> “example_sensor_decorator”,并启用此 DAG。
在右侧使用“播放”按钮触发 DAG。

几秒钟后,DAG 运行完成后,运行以下命令查看日志。对于每个 DAG 运行,都会创建一个单独的日志文件夹。
使用 MinIO 存储桶来存储 Airflow DAG 运行的日志,这只是我们正在探索的用例之一。在下一阶段,我们将创建一个自定义 DAG 来演示更多用例。
创建自定义 DAG
在此示例中,我们将创建一个自定义 DAG。此 DAG 将执行什么操作?
- 我们将连接到 Ghost 博客 API。
- 根据某些参数获取博客。
- 将它们备份到 MinIO 中的存储桶。
让我们为 DAG 设置框架。
导入 DAG 框架所需的 Python 库。
from airflow.decorators import dag, task |
创建 DAG 的运行频率调度
@dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) |
接下来,让我们创建一个任务,从 Ghost API 获取博客并将其放入 MinIO 存储桶中。
让我们导入几个 Python 包,以连接到 Ghost API 和 MinIO 存储桶
import json import requests
from minio import Minio import urllib3 |
使用 `requests` 模块连接到 Ghost API 并获取一些博客
api_token = "<token>"
page = 1 total_pages = 1
while page <= total_pages: api_url = ("https://minio.ghost.io/ghost/api/content/posts/?limit=1&page=%s&key=%s" % (page, api_token)) response_str = requests.get(api_url) response_json = requests.get(api_url).json()
print(response_json["meta"]) print(response_json["posts"][0]["url"])
total_pages = response_json["meta"]["pagination"]["pages"]
page = page + 1 |
将博客放入 MinIO 存储桶中
config = { "dest_bucket": "processed", # This will be auto created "minio_endpoint": "http://<ip>:9000", "minio_username": "minioadmin", "minio_password": "minioadmin", }
# Since we are using self-signed certs we need to disable TLS verification http_client = urllib3.PoolManager(cert_reqs='CERT_NONE') urllib3.disable_warnings()
# Initialize MinIO client minio_client = Minio(config["minio_endpoint"], secure=True, access_key=config["minio_username"], secret_key=config["minio_password"], http_client = http_client )
# Create destination bucket if it does not exist if not minio_client.bucket_exists(config["dest_bucket"]): minio_client.make_bucket(config["dest_bucket"]) print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))
minio_client.fget_object(bucket_name, object_path, object_path) print("- Doing some pseudo image resizing or ML processing on %s" % object_path) minio_client.fput_object(config["dest_bucket"], object_path, object_path) print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"])) minio_client.fput_object(config["dest_bucket"], object_path, object_path) print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"])) |
请注意,以上代码并非“开箱即用”,而是为了提供思路并展示创建您自己的 DAG 的路径,使用您首选的输入源;目标始终是 MinIO。
使用 Airflow 和 MinIO 构建云原生数据管道
当您将 MinIO 等云原生高性能存储与 Airflow 等云原生工具集成时,可能性是无限的。在本例中,我们向您展示了一些基本内容,例如在 MinIO 存储桶中保存 DAG 日志,以及编写一个可以与任何 API 交互并在其上执行操作的自定义 DAG,在本例中是将整个博客备份到 MinIO 存储桶中。
但这仅仅是开始,当您转向云原生时,您可以利用无数集成的框架。使用 Airflow,您可以创建任意数量的多云管道。例如,您可以将数千 TB 的非结构化数据 ETL 到存储在 MinIO 中的结构化数据,然后其他流程可以读取和分析这些数据。您甚至可以将此管道用作图像调整器(类似于 使用 Apache Kafka 和 MinIO 编排复杂工作流),通过获取各种尺寸的图像,将它们调整为您的业务所需的尺寸,然后将它们放入另一个 MinIO 存储桶中,以便 Web 服务使用。
自己尝试一下 Airflow 和 MinIO,如果您有任何疑问,请随时在 Slack 上联系我们!