Kubeflow Pipelines (KFP) 是 Kubeflow 最受欢迎的功能。Python 工程师可以使用 KFP 装饰器将用普通 Python 编写的函数转换为在 Kubernetes 中运行的组件。如果您使用过 KFP v1,请注意 - KFP v2 中的编程模型非常不同 - 但这是一个很大的改进。将普通的 Python 转换为可重用的组件并将这些组件编排成管道变得容易得多。
在这篇文章中,我想超越强制性的“Hello World”演示,并展示一些我希望您会发现直接可用或至少是用于插入您自己的逻辑的框架的东西。
我将展示如何构建一个 KFP 管道,该管道下载美国人口普查局数据(这是一个可免费访问的公共数据集),并将这些数据保存到 MinIO。MinIO 是存储 ML 数据和模型的好方法。使用 MinIO,您可以保存训练集、验证集、测试集和模型,而无需担心规模或性能。此外,有一天人工智能将受到监管;当这一天到来时,您将需要 MinIO 的企业功能(对象锁定、版本控制、加密和法律锁定)来确保您的数据在静止状态下的安全,并确保您不会意外删除监管机构可能要求的任何东西。
您可以了解有关我们将使用的数据的更多信息 这里。要获取人口普查 API 的 API 密钥,请访问人口普查局的网站以获取 开发人员。这非常简单。您只需指定一个电子邮件地址。
我们将构建什么
在这篇文章中,我将构建一个管道,该管道将表格代码(人口普查局数据集中的标识符)和年份作为参数。然后,它将通过 API 下载表格,如果我们以前没有下载过它。
我们只会在以前没有下载表格的情况下调用人口普查 API。当我们调用 ACS API 时,我们将在为存储原始数据而设置的 MinIO 实例中保存数据。这与 KFP 在内部使用的 MinIO 实例不同。我们可以尝试使用 KFP 的 MinIO 实例 - 但是,这对于 ML 数据管道来说不是最好的设计。出于我之前描述的原因,您需要一个完全受您控制的存储解决方案。以下是我们的 Kubeflow 和 MinIO 部署图,说明了每个 MinIO 实例的目的。

在我们开始编写代码之前,让我们创建管道的逻辑设计。
逻辑管道设计
您在 KFP 中运行的管道被称为有向无环图 (DAG)。它们单向移动并且不会回溯 - 没有闭环。这是您对数据管道的期望。以下是我们将构建并在 KFP 中运行的 DAG 的逻辑设计。它是自解释的。从概念工作流开始是帮助您将逻辑转换为充分利用 KFP 的函数的好方法。

现在我们有了逻辑设计,让我们开始编码。我将假设您已经安装了 KFP,并且您还设置了自己的 MinIO 实例。如果您没有安装 KFP 2.0 和 MinIO,请查看 使用 Kubeflow Pipeline 2.0 和 MinIO 设置开发机器.
从逻辑设计创建 Python 函数
上面逻辑设计中的每个任务都将成为一个 Python 函数。以下函数签名显示了如果我们编写一个没有 KFP 的 Python 脚本或独立服务,参数和返回值将如何设计。如果要将现有代码迁移到 KFP,我想讨论一下。
def survey_data_exists(survey_code: str, year: int) -> bool: '''检查 MinIO 以查看调查数据是否存在。''' pass
def download_survey_data(table_code: str, year: int) -> pd.DataFrame: '''使用 CB API 下载调查数据并返回 Pandas 数据帧。''' pass
def save_survey_data(bucket: str, object_name: str, survey_df: pd.DataFrame) -> None: '''将调查数据(一个 Pandas 数据帧)保存到 MinIO 存储桶。''' pass
def get_survey_data(bucket: str, object_name: str) -> pd.DataFrame: pass |
|
关于以上函数的一些说明。它们使用类型提示。如果您编写的是普通的 Python 代码,您可以选择不使用类型提示,因为它们是可选的。在 Kubeflow Pipelines 中,它们是必需的 - 您必须使用类型提示,以便 KFP 在将函数组装到管道时,能够告诉您参数和返回值是否匹配。这是一件好事。KFP 在您编译管道时会找到类型不匹配错误。这些相同的错误在运行时在集群中很难跟踪。
您可能会想要将函数合并,以便您有更少的函数需要管理。例如,最后三个函数可以通过使用一个简单的“if else”语句合并成一个函数,然后第一个函数就不需要了。在使用像 KFP 这样的工具时,这不是最佳实践。正如我们将会看到,KFP 有用于条件和循环的构造。通过使用 KFP 的构造,您将在 KFP UI 中获得更好的管道可视化效果。并行性也是可能的,这将提高管道性能。最后,如果我们保持函数的简单性,我们将获得更好的重用性。
现在,我们可以使用我们的 Python 函数创建 Kubeflow Pipeline 组件。
从 Python 函数创建 KFP 组件
下面的代码是我们的 Pipeline 组件的完整实现。当您使用像 KFP 和 MinIO 这样的工具时,您实际上不需要编写很多管道代码。
@dsl.component(packages_to_install=['minio==7.1.14']) def table_data_exists(bucket: str, table_code: str, year: int) -> bool: ''' Check for the existence of Census table data in MinIO. ''' from minio import Minio from minio.error import S3Error import logging
object_name=f'{table_code}-{year}.csv'
logger = logging.getLogger('kfp_logger') logger.setLevel(logging.INFO) logger.info(bucket) logger.info(table_code) logger.info(year) logger.info(object_name) try: # Create client with access and secret key. client = Minio('host.docker.internal:9000', 'Access key here.', 'Secret key here.', secure=False)
bucket_found = client.bucket_exists(bucket) if not bucket_found: return False
objects = client.list_objects(bucket) found = False for obj in objects: logger.info(obj.object_name) if object_name == obj.object_name: found = True
except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') except Error as err: logger.error(f'Error occurred: {err}.')
return found
@dsl.component(packages_to_install=['pandas==1.3.5', 'requests']) def download_table_data(dataset: str, table_code: str, year: int, table_df: Output[Dataset]): ''' Returns all fields for the specified table. The output is a DataFrame saved to csv. ''' import logging import pandas as pd import requests
logger = logging.getLogger('kfp_logger') logger.setLevel(logging.INFO)
census_endpoint = f'https://api.census.gov/data/{year}/{dataset}' census_key = 'Census key here.' # Setup a simple dictionary for the requests parameters. get_token = f'group({table_code})' params = {'key': census_key, 'get': get_token, 'for': 'county:*' }
# sending get request and saving the response as response object response = requests.get(url=census_endpoint, params=params) # Extract the data in json format. # The first row of our matrix contains the column names. The remaining rows # are the data. survey_data = response.json() df = pd.DataFrame(survey_data[1:], columns = survey_data[0]) df.to_csv(table_df.path, index=False) logger.info(f'Table {table_code} for {year} has been downloaded.')
@dsl.component(packages_to_install=['pandas==1.3.5', 'minio==7.1.14']) def save_table_data(bucket: str, table_code: str, year: int, table_df: Input[Dataset]): import io import logging from minio import Minio from minio.error import S3Error import pandas as pd
object_name=f'{table_code}-{year}.csv'
logger = logging.getLogger('kfp_logger') logger.setLevel(logging.INFO) logger.info(bucket) logger.info(table_code) logger.info(year) logger.info(object_name)
df = pd.read_csv(table_df.path)
try: # Create client with access and secret key client = Minio('host.docker.internal:9000', 'Access key here.', 'Secret key here.', secure=False)
# Make the bucket if it does not exist. found = client.bucket_exists(bucket) if not found: logger.info(f'Creating bucket: {bucket}.') client.make_bucket(bucket)
# Upload the dataframe as an object. encoded_df = df.to_csv(index=False).encode('utf-8') client.put_object(bucket, object_name, data=io.BytesIO(encoded_df), length=len(encoded_df), content_type='application/csv') logger.info(f'{object_name} successfully uploaded to bucket {bucket}.') logger.info(f'Object length: {len(df)}.')
except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') except Error as err: logger.error(f'Error occurred: {err}.')
@dsl.component(packages_to_install=['pandas==1.3.5', 'minio==7.1.14']) def get_table_data(bucket: str, table_code: str, year: int, table_df: Output[Dataset]): import io import logging from minio import Minio from minio.error import S3Error import pandas as pd
object_name=f'{table_code}-{year}.csv'
logger = logging.getLogger('kfp_logger') logger.setLevel(logging.INFO) logger.info(bucket) logger.info(table_code) logger.info(year) logger.info(object_name)
# Get data of an object. try: # Create client with access and secret key client = Minio('host.docker.internal:9000', 'Access key here.', 'Secret key here.', secure=False)
response = client.get_object(bucket, object_name) df = pd.read_csv(io.BytesIO(response.data)) df.to_csv(table_df.path, index=False) logger.info(f'Object: {object_name} has been retrieved from bucket: {bucket} in MinIO object storage.') logger.info(f'Object length: {len(df)}.')
except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') except Error as err: logger.error(f'Error occurred: {err}.')
finally: response.close() response.release_conn() |
在您实现和调试这些函数时,最重要的是要记住,在运行时,它们根本不是函数。它们将是组件。换句话说,KFP 将获取每个函数并将其部署到自己的容器中。此示例使用轻量级 Python 组件。您还可以使用容器化的 Python 组件,这些组件让您能够更好地控制容器中包含的内容。对于非 Python 代码,也存在容器化组件选项。
KFP 引入了几个构造来帮助您无缝地创建能够作为运行在容器中的独立组件运行的函数。它们是组件装饰器、参数和工件。让我们逐步了解这些工具,以便您了解 KFP 如何在运行时部署函数并在它们之间传递数据。
组件
组件装饰器告诉 KFP 函数应该作为组件部署。仔细观察上面的代码中如何使用这个装饰器。由于函数将被单独部署到容器中,您需要告诉 KFP 它的依赖项。这是通过使用装饰器的 `packages_to_install` 参数完成的。这只是确保依赖项被安装(通过 pip)。它不会为您导入它们。您需要在函数定义中自己完成此操作。这可能看起来有点不规范,因为我们大多数人习惯在模块级别导入依赖项,但在使用像 KFP 这样的将函数转换为服务的工具时是可以的。
在组件之间传递数据必须谨慎进行。KFP v2 区分参数和工件。参数用于在函数调用之间传递简单数据(int、bool、str、float、list、dict)。另一方面,工件表示函数从外部来源检索或创建的数据,例如数据集、模型和描述模型精度的指标。您甚至可以使用工件创建 HTML 和 Markdown,如果您想对输出进行样式化,使其在 Kubeflow UI 中更具可读性。由于工件可能很大,因此 KFP 使用其自身的 MinIO 实例来存储它们。
参数(和返回值)
KFP 利用 Python 类型提示来指定简单的输入参数和简单的返回值。您只能使用 str、int、float、bool、list 和 dict。上面的 `table_data_exists` 函数展示了如何在函数签名中指定参数。在语法上,您以与标准 Python 相同的方式指定它们。请记住,使用类型提示是必需的。在运行时,KFP 会负责在组件之间(运行在不同容器中)编组这些值。
如果函数需要更复杂的数据类型作为输入,或者它返回更复杂的数据类型,则使用工件。
工件
工件与输入参数和输出值不同,因为它们可能变得很大。工件的示例包括:数据集、模型、指标(机器学习训练结果)、HTML 和 Markdown。在底层,KFP 使用自己的 MinIO 实例来存储工件。当你将工件从一个组件传递到另一个组件时,KFP 不会直接传递工件,而是将工件存储在 MinIO 中并传递对 MinIO 中工件(对象)的引用。这非常巧妙。这意味着,如果你有一个需要被多个组件访问的大型工件,那么这些组件可以高效地访问该工件,因为 MinIO 是专门为高效的对象存储和访问而设计的。
让我们看看当你将工件传递给组件时会发生什么。在上面的代码示例中,save_table_data 展示了如何实现这一点。在你的函数被调用之前,KFP 会将工件从其 MinIO 实例复制到运行你的组件的容器的本地文件系统。你的代码需要读取这个文件。这可以通过你声明为 Input[Dataset] 类型的参数的 path 属性来实现。在 save_table_data 函数中,我将这个文件读取到 Pandas DataFrame 中。
输出工件被指定为函数参数,不能是函数的返回值。在上面的代码中,get_table_data 展示了如何使用输出工件。请注意,table_df 参数的数据类型为 Output[Dataset]。为了成功地从函数返回数据,你必须将数据写入参数的 path 属性中指定的 location。同样,这是一个指向你容器中本地文件系统的引用 - KFP 会在你的函数完成时将这个文件移动到其 MinIO 实例中。
现在,我们已经准备好将我们的组件组装成一个管道了。
从组件创建管道
下面的代码从我们在上一节中实现的组件创建我们的管道(或 DAG)。
@dsl.pipeline( name='census-pipeline', description='下载人口普查数据并保存到 MinIO 的管道.' ) def census_pipeline(bucket: str, dataset: str, table_code: str, year: int) -> Dataset: # 不允许位置参数. # 当我在 DAG 中设置条件任务的 name 参数时,任务失败.
exists = table_data_exists(bucket=bucket, table_code=table_code, year=year)
with dsl.Condition(exists.output == False): table_data = download_table_data(dataset=dataset, table_code=table_code, year=year) save_table_data(bucket=bucket, table_code=table_code, year=year, table_df=table_data.outputs['table_df'])
with dsl.Condition(exists.output == True): table_data = get_table_data(bucket=bucket, table_code=table_code, year=year)
return table_data.outputs['table_df'] |
在这个函数中,有一些需要注意的地方。首先,pipeline 装饰器告诉 KFP 此函数包含我们的管道定义。您在此处指定的名称和描述将在 KFP UI 中显示。
接下来,此管道函数的返回值是 Dataset。事实证明,管道可以像组件一样使用。当管道具有返回值时,它可以在另一个管道中使用。这是重用组件的好方法。
最后,我们使用 dsl.Condition(这是一个 Python 上下文管理器)来仅在我们需要的数据不在 MinIO 实例中时才调用我们的下载组件。我们本可以在此处使用传统的 if 语句。但是,如果我们这样做,KFP 将无法知道我们的逻辑中有一个分支。通过使用 dsl.Condition 结构,我们告诉 KFP 我们管道中的一个分支。这将允许 KFP UI 为我们提供更好的视觉表示。
运行管道
一旦您实现了组件和管道,您就只需两行代码就可以运行您的管道。
client = Client()
run = client.create_run_from_pipeline_func(census_pipeline, experiment_name='Implementing functions', enable_caching=False, arguments={ 'bucket': 'census-data', 'table_code': 'B01001', 'year': 2020 } ) |
选择一个有意义的实验名称。KFP UI 有一个实验选项卡,用于将具有相同实验名称的运行分组在一起。上面的代码“编译”您的管道和组件——这仅仅是将所有内容(包括您的源代码)放入 YAML 文件中的操作。如果您有前面描述的任何类型不匹配,那么您将在创建运行时发现这些问题。此代码还将您的管道发送到 KFP 并运行它。下面是一个屏幕截图,显示了我们管道的一些成功运行。

摘要
在这篇文章中,我们创建了一个数据管道,它使用 KFP 和 MinIO 下载和保存美国人口普查数据。为此,我们为存储原始数据设置了自己的 MinIO 实例。这是机器学习管道的重要组成部分——将来 AI 将受到监管,并且拥有您控制下的存储解决方案可以让您对用于训练和模型本身的数据进行版本控制、锁定和加密。
我们还讨论了 KFP 如何使用其自己的 MinIO 实例在管道运行期间有效地保存和访问工件。
在我的下一篇文章中,我将展示如何将此数据管道用作另一个使用人口普查数据来训练模型的管道的输入。如果您有任何问题,请在 hello@min.io 联系我们,或加入我们 general Slack channel 的讨论。