GSP965

概览
流水线有助于自动重复执行机器学习工作流。Vertex AI 全面集成了 Google Cloud 中的机器学习产品,可打造无缝的开发体验。以前,使用 AutoML 训练的模型和自定义模型需要通过不同的服务来访问。现在,Vertex AI 将这两种模型连同其他新产品一起合并到了同一个 API 中。Vertex AI 中还包含 Vertex Pipelines 等各种 MLOps 产品。在本实验中,您将学习如何使用 Vertex Pipelines 创建和运行机器学习流水线。
机器学习流水线有哪些优势?
在深入探讨之前,先来了解一下为什么要使用流水线。假设您正在构建一个机器学习工作流,该工作流中包含数据处理、模型训练、超参数调优、评估和模型部署步骤。每个步骤可能有不同的依赖项,如果将整个工作流作为一个单体式应用来处理,可能会变得难以管理。在开始扩展机器学习流程时,您可能需要与团队中的其他成员共享您的机器学习工作流,以便其运行该工作流并贡献代码。但如果没有可靠且可重复的流程,将很难做到这一点。有了流水线,机器学习流程中的每个步骤都是其各自的容器。这样您就能独立开发各个步骤,并以可重复的方式跟踪每个步骤的输入和输出。您还可以基于云环境中的其他事件(比如有新训练数据可用时)来安排或触发流水线的运行。
目标
在本实验中,您将学习如何完成以下操作:
- 使用 Kubeflow Pipelines SDK 构建可扩缩的机器学习流水线
- 创建并运行一个三步式入门级流水线来接收文本输入
- 创建并运行一个流水线来训练、评估和部署 AutoML 分类模型
- 使用通过 google_cloud_pipeline_components 库提供的预构建组件与 Vertex AI 服务交互
- 使用 Cloud Scheduler 安排流水线作业
设置和要求
点击“开始实验”按钮前的注意事项
请阅读以下说明。实验是计时的,并且您无法暂停实验。计时器在您点击开始实验后即开始计时,显示 Google Cloud 资源可供您使用多长时间。
此实操实验可让您在真实的云环境中开展实验活动,免受模拟或演示环境的局限。我们会为您提供新的临时凭据,让您可以在实验规定的时间内用来登录和访问 Google Cloud。
为完成此实验,您需要:
- 能够使用标准的互联网浏览器(建议使用 Chrome 浏览器)。
注意:请使用无痕模式或无痕浏览器窗口运行此实验。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。
注意:如果您已有自己的个人 Google Cloud 账号或项目,请不要在此实验中使用,以避免您的账号产生额外的费用。
如何开始实验并登录 Google Cloud 控制台
-
点击开始实验按钮。如果该实验需要付费,系统会打开一个弹出式窗口供您选择付款方式。左侧是实验详细信息面板,其中包含以下各项:
-
打开 Google Cloud 控制台按钮
- 剩余时间
- 进行该实验时必须使用的临时凭据
- 帮助您逐步完成本实验所需的其他信息(如果需要)
-
点击打开 Google Cloud 控制台(如果您使用的是 Chrome 浏览器,请右键点击并选择在无痕式窗口中打开链接)。
该实验会启动资源并打开另一个标签页,显示登录页面。
提示:请将这些标签页安排在不同的窗口中,并将它们并排显示。
注意:如果您看见选择账号对话框,请点击使用其他账号。
-
如有必要,请复制下方的用户名,然后将其粘贴到登录对话框中。
{{{user_0.username | "<用户名>"}}}
您也可以在实验详细信息面板中找到用户名。
-
点击下一步。
-
复制下面的密码,然后将其粘贴到欢迎对话框中。
{{{user_0.password | "<密码>"}}}
您也可以在实验详细信息面板中找到密码。
-
点击下一步。
重要提示:您必须使用实验提供的凭据。请勿使用您的 Google Cloud 账号凭据。
注意:在本次实验中使用您自己的 Google Cloud 账号可能会产生额外费用。
-
继续在后续页面中点击以完成相应操作:
- 接受条款及条件。
- 由于该账号为临时账号,请勿添加账号恢复选项或双重验证。
- 请勿注册免费试用。
片刻之后,系统会在此标签页中打开 Google Cloud 控制台。
注意:如需查看列有 Google Cloud 产品和服务的菜单,请点击左上角的导航菜单。
任务 1. 创建 Vertex 笔记本实例
-
点击导航菜单。
-
前往 Vertex AI,然后选择 Workbench。
-
在笔记本实例页面中,前往用户管理的笔记本标签页,然后等待 ai-notebook
创建完成。

注意:系统可能需要几分钟时间才能完成笔记本的创建。
- 创建实例后,选择打开 JupyterLab:

检查是否已创建笔记本
任务 2. Vertex Pipelines 设置
若要使用 Vertex Pipelines,您还需要额外安装几个库:
-
Kubeflow Pipelines:此 SDK 用于构建流水线。Vertex Pipelines 支持运行使用 Kubeflow Pipelines 或 TFX 构建的流水线。
-
Google Cloud 流水线组件:此库可提供预构建的组件,可让您更轻松地在流水线步骤中与 Vertex AI 服务进行交互。
第 1 步:创建 Python 笔记本并安装库
- 在笔记本实例页面中,从“启动器”菜单中选择 Python 3 以创建笔记本:

-
若要访问“启动器”菜单,请点击笔记本实例页面左上角的加号 (+)。
-
如需安装本实验中要使用的两项服务,请先在笔记本单元中设置用户标志:
USER_FLAG = "--user"
- 然后,从笔记本中运行以下命令:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.59.0
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components==0.1.1 --upgrade
!pip3 uninstall -y shapely pygeos geopandas
!pip3 install shapely==1.8.5.post1 pygeos==0.12.0 geopandas>=0.12.2
!pip3 install google-cloud-pipeline-components
注意:您可能会看到有关版本方面的警告和错误提示,可以不必理会。
- 安装这些软件包后,需要重启内核:
import os
if not os.getenv("IS_TESTING"):
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
- 最后,检查您是否正确安装了软件包。KFP SDK 应为 1.6 或更高版本:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
第 2 步:设置项目 ID 和存储桶
在本实验中,您将全程引用之前创建的 Cloud 项目 ID 和存储桶。接下来,需要为它们创建变量。
- 如果不知道项目 ID,可运行以下命令来获取:
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
print("Project ID: ", PROJECT_ID)
- 然后,创建一个变量来存储您的存储桶名称。
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"
第 3 步:导入库
from typing import NamedTuple
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
第 4 步:定义常量
- 构建流水线之前,您要做的最后一件事是定义几个常量变量。
PIPELINE_ROOT
是一个 Cloud Storage 路径,流水线创建的制品将写入该路径。这里您使用的区域是“”,如果您在创建存储桶时使用了其他区域
,请运行以下代码来更新 REGION 变量:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="{{{ project_0.default_region | Placeholder value. }}}"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT
运行上述代码后,您应该会看到输出的流水线根目录。来自流水线的制品将写入此 Cloud Storage 位置。其格式为 gs://<bucket_name>/pipeline_root/
。
任务 3. 创建第一个流水线
您将创建一个流水线来生成由两个输出(产品名称和表情符号说明)组成的句子。此流水线由三个组件组成:
-
product_name
:此组件将接收产品名称作为输入,并返回该字符串作为输出。
-
emoji
:此组件将接收表情符号的文本说明,并将其转换为表情符号。例如,✨ 的文本代码为“sparkles”。此组件使用表情符号库来告诉您如何管理流水线中的外部依赖项。
-
build_sentence
:这是最后一个组件,它将使用前两个组件的输出生成一个使用表情符号的句子。例如,可能会生成“Vertex Pipelines is ✨”之类的输出。
第 1 步:创建基于 Python 函数的组件
利用 KFP SDK,您可以根据 Python 函数来创建组件。首先构建 product_name
组件,该组件仅接收字符串作为输入,并返回该字符串。
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
return text
我们来剖析一下这段代码的语法:
- 流水线运行时,
@component
修饰器会将此函数编译为一个组件。您每次编写自定义组件时都会用到它。
-
base_image
参数用于指定此组件将使用的容器映像。
-
output_component_file
是可选参数,用于指定要在其中写入已编译组件的 yaml 文件。运行此单元后,您应该会看到该文件已写入您的笔记本实例中。如果您想与其他人共享此组件,可以将生成的 yaml 文件发送给相关人员,并让他们运行以下命令来加载该文件:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
函数定义后面的 -> str
用于指定此组件的输出类型。
第 2 步:创建两个附加组件
- 若要构建完整的流水线,您还需要创建两个组件。第一个组件接收字符串作为输入,并将该字符串转换为对应的表情符号(如有)。它将返回一个元组,其中包含所传递的输入文本以及生成的表情符号:
@component(base_image="python:3.9", output_component_file="second-component.yaml", packages_to_install=["emoji"])
def emoji(
text: str,
) -> NamedTuple(
"Outputs",
[
("emoji_text", str), # Return parameters
("emoji", str),
],
):
import emoji
emoji_text = text
emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
return (emoji_text, emoji_str)
此组件比上一个组件复杂一些。它新增了以下变化:
-
packages_to_install
参数用于向组件指明此容器的所有外部库依赖项。在本例中,您使用的是名为“emoji”的库。
- 此组件返回名为
Outputs
的 NamedTuple
。请注意,此元组中的每个字符串都有自己的键:emoji_text
和 emoji
。您在下一个组件中将会使用它们来获取输出。
- 此流水线中的最后一个组件将使用前两个组件的输出,并将它们合并后返回一个字符串:
@component(base_image="python:3.9", output_component_file="third-component.yaml")
def build_sentence(
product: str,
emoji: str,
emojitext: str
) -> str:
print("We completed the pipeline, hooray!")
end_str = product + " is "
if len(emoji) > 0:
end_str += emoji
else:
end_str += emojitext
return(end_str)
您可能会问:此组件如何知道要使用前几个步骤定义的输出呢?
这个问题问得好!下一步就是将所有这些组件相互关联。
第 3 步:将组件合并成一个流水线
前面指定的组件定义创建了出厂函数,这些函数可用于在流水线定义中创建步骤。
-
如需设置流水线,请使用 @dsl.pipeline
修饰器,为流水线命名并提供相关的说明,同时提供应向其中写入流水线制品的根路径。制品是指流水线生成的任何输出文件。此入门级流水线不会生成任何制品,但下一个流水线会生成制品。
-
在下一个代码块中,需要定义一个 intro_pipeline
函数。您将在此处指定初始流水线步骤的输入,以及如何关联各个步骤:
-
product_task
接收产品名称作为输入。此处您传递的是“Vertex Pipelines”,您也可以将其更改为任何其他产品名称。
-
emoji_task
接收表情符号的文本代码作为输入。您也可以将其更改为其他值。例如,“party_face”是指 🥳 表情符号。请注意,此组件和 product_task
组件都没有任何可提供输入的步骤,因此在定义流水线时,您需要手动为它们指定输入。
- 此流水线的最后一个步骤是
consumer_task
,它有三个输入参数:
-
product_task
的输出。该步骤仅生成一个输出,因此您可以通过 product_task.output
来引用它。
-
emoji_task
步骤的 emoji
输出。请参阅前面定义的 emoji
组件,您在其中命名了输出参数。
- 此外,还有
emoji
组件的名为 emoji_text
的输出。如果传递给流水线的文本没有对应的表情符号,流水线将使用此文本来构造句子。
@dsl.pipeline(
name="hello-world",
description="An intro pipeline",
pipeline_root=PIPELINE_ROOT,
)
# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
product_task = product_name(text)
emoji_task = emoji(emoji_str)
consumer_task = build_sentence(
product_task.output,
emoji_task.outputs["emoji"],
emoji_task.outputs["emoji_text"],
)
第 4 步:编译并运行流水线
- 流水线已经定义完毕,您可以开始编译了。以下命令将生成一个用于运行流水线的 JSON 文件:
compiler.Compiler().compile(
pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)
- 接下来,实例化 API 客户端:
api_client = AIPlatformClient(
project_id=PROJECT_ID,
region=REGION,
)
- 最后,运行该流水线:
response = api_client.create_run_from_job_spec(
job_spec_path="intro_pipeline_job.json",
# pipeline_root=PIPELINE_ROOT # this argument is necessary if you did not specify PIPELINE_ROOT as part of the pipeline definition.
)
运行结束后,系统应该会生成一个链接,供您在控制台中查看流水线运行作业。完成的流水线应如下所示:

- 此流水线需要 5-6 分钟时间才能运行完毕。完成后,您可以点击
build-sentence
组件查看最终输出:

至此,您已经熟悉了 KFP SDK 以及 Vertex Pipelines 的运作方式,接下来要构建一个流水线来使用其他 Vertex AI 服务创建和部署机器学习模型。
检查您的表情符号流水线是否完成
任务 4. 创建端到端机器学习流水线
现在来构建您的第一个机器学习流水线。此流水线将会用到“UCI Machine Learning Dry Beans”数据集,该数据集来自 KOKLU, M. 和 OZKAN, I.A.于 2020 年在《Computers and Electronics in Agriculture》上发表的“Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques.”(利用计算机视觉和机器学习技术对干豆进行多类别分类)一文 (174, 105507. DOI)。
注意:此流水线需要 2 个多小时才能运行完毕。所以,您可以先行完成本实验,不必等待整个流水线运行完毕。在流水线作业启动前,您可以先执行以下步骤。
这是一个表格式数据集。在此流水线中,您将使用该数据集来训练、评估和部署一个 AutoML 模型。该模型会根据豆子特征将不同豆子分为 7 种类型之一。
此流水线将执行以下步骤:
- 在 Vertex AI 中创建数据集
- 使用 AutoML 训练表格式分类模型
- 获取此模型的评估指标
- 根据评估指标,决定是否在 Vertex Pipelines 中使用条件逻辑来部署此模型
- 使用 Vertex Prediction 将此模型部署到端点
上述每个步骤都是一个组件。大部分流水线步骤都将使用为 Vertex AI 服务预构建的组件,这些组件通过您之前在本实验中导入的 google_cloud_pipeline_components
库提供。
在本部分,我们先来定义一个自定义组件。然后再使用预构建的组件定义其余的流水线步骤。借助预构建的组件,您可以更轻松地访问模型训练和部署等 Vertex AI 服务。
在此步骤中,大部分时间都花在流水线的 AutoML 训练方面,所需时间大约为 1 小时。
第 1 步:用于评估模型的自定义组件
您定义的自定义组件将在流水线的最后阶段,也就是模型训练完成之后使用。此组件将执行以下几个步骤:
- 获取经过训练的 AutoML 分类模型的评估指标
- 解析指标并将它们呈现在 Vertex Pipelines 界面中
- 将指标与阈值进行比较,以确定是否应该部署该模型
定义此组件前,先了解一下它的输入和输出参数。此流水线将接收以下内容作为输入:您的 Cloud 项目的一些元数据、生成的训练模型(稍后定义这一组件)、模型的评估指标以及 thresholds_dict_str
。
thresholds_dict_str
是您将在运行流水线时指定的参数。在此分类模型中,它代表的是 ROC 曲线下面积的值,您将根据此值来部署模型。例如,若您输入 0.95,即表示您希望流水线仅在该指标高于 95% 时才部署模型。
评估组件将返回一个字符串,以指明是否部署该模型。
- 将以下代码添加到笔记本单元中,以创建此自定义组件:
@component(
base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
project: str,
location: str, # "region",
api_endpoint: str, # "region-aiplatform.googleapis.com",
thresholds_dict_str: str,
model: Input[Model],
metrics: Output[Metrics],
metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]): # Return parameter.
"""This function renders evaluation metrics for an AutoML Tabular classification model.
It retrieves the classification model evaluation generated by the AutoML Tabular training
process, does some parsing, and uses that info to render the ROC curve and confusion matrix
for the model. It also uses given metrics threshold information and compares that to the
evaluation results to determine whether the model is sufficiently accurate to deploy.
"""
import json
import logging
from google.cloud import aiplatform
# Fetch model eval info
def get_eval_info(client, model_name):
from google.protobuf.json_format import MessageToDict
response = client.list_model_evaluations(parent=model_name)
metrics_list = []
metrics_string_list = []
for evaluation in response:
print("model_evaluation")
print(" name:", evaluation.name)
print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
metrics = MessageToDict(evaluation._pb.metrics)
for metric in metrics.keys():
logging.info("metric: %s, value: %s", metric, metrics[metric])
metrics_str = json.dumps(metrics)
metrics_list.append(metrics)
metrics_string_list.append(metrics_str)
return (
evaluation.name,
metrics_list,
metrics_string_list,
)
# Use the given metrics threshold(s) to determine whether the model is
# accurate enough to deploy.
def classification_thresholds_check(metrics_dict, thresholds_dict):
for k, v in thresholds_dict.items():
logging.info("k {}, v {}".format(k, v))
if k in ["auRoc", "auPrc"]: # higher is better
if metrics_dict[k] < v: # if under threshold, don't deploy
logging.info(
"{} < {}; returning False".format(metrics_dict[k], v)
)
return False
logging.info("threshold checks passed.")
return True
def log_metrics(metrics_list, metricsc):
test_confusion_matrix = metrics_list[0]["confusionMatrix"]
logging.info("rows: %s", test_confusion_matrix["rows"])
# log the ROC curve
fpr = []
tpr = []
thresholds = []
for item in metrics_list[0]["confidenceMetrics"]:
fpr.append(item.get("falsePositiveRate", 0.0))
tpr.append(item.get("recall", 0.0))
thresholds.append(item.get("confidenceThreshold", 0.0))
print(f"fpr: {fpr}")
print(f"tpr: {tpr}")
print(f"thresholds: {thresholds}")
metricsc.log_roc_curve(fpr, tpr, thresholds)
# log the confusion matrix
annotations = []
for item in test_confusion_matrix["annotationSpecs"]:
annotations.append(item["displayName"])
logging.info("confusion matrix annotations: %s", annotations)
metricsc.log_confusion_matrix(
annotations,
test_confusion_matrix["rows"],
)
# log textual metrics info as well
for metric in metrics_list[0].keys():
if metric != "confidenceMetrics":
val_string = json.dumps(metrics_list[0][metric])
metrics.log_metric(metric, val_string)
# metrics.metadata["model_type"] = "AutoML Tabular classification"
logging.getLogger().setLevel(logging.INFO)
aiplatform.init(project=project)
# extract the model resource name from the input Model Artifact
model_resource_path = model.uri.replace("aiplatform://v1/", "")
logging.info("model path: %s", model_resource_path)
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
eval_name, metrics_list, metrics_str_list = get_eval_info(
client, model_resource_path
)
logging.info("got evaluation name: %s", eval_name)
logging.info("got metrics list: %s", metrics_list)
log_metrics(metrics_list, metricsc)
thresholds_dict = json.loads(thresholds_dict_str)
deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
if deploy:
dep_decision = "true"
else:
dep_decision = "false"
logging.info("deployment decision is %s", dep_decision)
return (dep_decision,)
第 2 步:添加 Google Cloud 预构建组件
在此步骤中,您将定义其余的流水线组件并查看它们搭配使用的情况。
- 首先,使用时间戳来定义流水线运行作业的显示名称:
import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)
- 然后,将以下代码复制到新的笔记本单元中:
@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
pipeline_root=PIPELINE_ROOT)
def pipeline(
bq_source: str = "bq://aju-dev-demos.beans.beans1",
display_name: str = DISPLAY_NAME,
project: str = PROJECT_ID,
gcp_region: str = "{{{ project_0.default_region | Placeholder value. }}}",
api_endpoint: str = "{{{ project_0.default_region | Placeholder value. }}}-aiplatform.googleapis.com",
thresholds_dict_str: str = '{"auRoc": 0.95}',
):
dataset_create_op = gcc_aip.TabularDatasetCreateOp(
project=project, display_name=display_name, bq_source=bq_source
)
training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
project=project,
display_name=display_name,
optimization_prediction_type="classification",
budget_milli_node_hours=1000,
column_transformations=[
{"numeric": {"column_name": "Area"}},
{"numeric": {"column_name": "Perimeter"}},
{"numeric": {"column_name": "MajorAxisLength"}},
{"numeric": {"column_name": "MinorAxisLength"}},
{"numeric": {"column_name": "AspectRation"}},
{"numeric": {"column_name": "Eccentricity"}},
{"numeric": {"column_name": "ConvexArea"}},
{"numeric": {"column_name": "EquivDiameter"}},
{"numeric": {"column_name": "Extent"}},
{"numeric": {"column_name": "Solidity"}},
{"numeric": {"column_name": "roundness"}},
{"numeric": {"column_name": "Compactness"}},
{"numeric": {"column_name": "ShapeFactor1"}},
{"numeric": {"column_name": "ShapeFactor2"}},
{"numeric": {"column_name": "ShapeFactor3"}},
{"numeric": {"column_name": "ShapeFactor4"}},
{"categorical": {"column_name": "Class"}},
],
dataset=dataset_create_op.outputs["dataset"],
target_column="Class",
)
model_eval_task = classif_model_eval_metrics(
project,
gcp_region,
api_endpoint,
thresholds_dict_str,
training_op.outputs["model"],
)
with dsl.Condition(
model_eval_task.outputs["dep_decision"] == "true",
name="deploy_decision",
):
deploy_op = gcc_aip.ModelDeployOp( # noqa: F841
model=training_op.outputs["model"],
project=project,
machine_type="e2-standard-4",
)
这段代码做了什么:
- 首先,与上一个流水线一样,您定义了此流水线接收的输入参数。这些输入参数不依赖于流水线中其他步骤的输出,因此需要您手动设置。
- 此流水线的剩余部分将使用几个预构建的组件来与 Vertex AI 服务交互:
-
TabularDatasetCreateOp
会根据 Cloud Storage 或 BigQuery 中的数据集来源,在 Vertex AI 中创建表格式数据集。在此流水线中,您通过 BigQuery 表的网址来传递数据。
-
AutoMLTabularTrainingJobRunOp
会为表格式数据集启动 AutoML 训练作业。您向此组件传递了几个配置参数,包括模型类型(本例中为分类)、一些列数据、您希望训练运行的时长,以及指向数据集的指针。请注意,为了将数据集传递给此组件,您通过 dataset_create_op.outputs["dataset"]
提供了上一个组件的输出。
-
ModelDeployOp
会将给定模型部署到 Vertex AI 中的端点上。该产品还有一些其他配置选项可用,不过您在本例中提供了端点机器类型、项目以及要部署的模型。您通过访问流水线中训练步骤的输出传入了模型。
- 此流水线还使用了条件逻辑。这是 Vertex Pipelines 的一个功能,可用于定义条件以及基于该条件的结果的不同分支。请注意,在定义流水线时,您传递了一个
thresholds_dict_str
参数。此参数是准确率阈值,将用于确定是否将您的模型部署到端点。要实现此流水线,需要使用 KFP SDK 中的 Condition
类。传入的条件是您之前在本实验中定义的自定义评估组件的输出。如果此条件为 true,则流水线将会继续执行 deploy_op
组件。如果准确率未达到预定义的阈值,则流水线将在此处停止运行,并且不会部署模型。
第 3 步:编译并运行端到端机器学习流水线
- 至此整个流水线定义完成,接下来对它进行编译:
compiler.Compiler().compile(
pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)
- 然后,启动流水线运行作业:
response = api_client.create_run_from_job_spec(
"tab_classif_pipeline.json", pipeline_root=PIPELINE_ROOT,
parameter_values={"project": PROJECT_ID,"display_name": DISPLAY_NAME}
)
- 点击在运行上面的流水线单元后显示的链接,在控制台中查看您的流水线。运行此流水线所需的时间为 1 个小时多一点,其中大部分时间都花在了 AutoML 训练步骤上。完成的流水线如下所示:

- 切换顶部的“展开制品”按钮,即可查看流水线中创建的各个制品的详细信息。例如,点击
dataset
制品,将会看到所创建的 Vertex AI 数据集的详细信息。您可以点击此处的链接,以跳转到该数据集的页面:

- 同理,若想查看自定义评估组件生成的指标可视化图表,只需点击名为 metricsc 的制品。在信息中心右侧,您将看到此模型的混淆矩阵:

- 如需查看此流水线运行后创建的模型和端点,请前往模型部分,并点击名为
automl-beans
的模型。之后,您应该会看到此模型已部署到端点上:

-
您也可以点击流水线图表中的 endpoint 制品来访问此页面。
-
除了在控制台中查看流水线图表外,您还可以使用 Vertex Pipelines 来执行沿袭跟踪。
-
沿袭跟踪是指跟踪在整个流水线中创建的制品。这有助于您了解制品的创建位置,以及它们在整个机器学习工作流中的使用情况。例如,若想查看在此流水线中创建的数据集的沿袭跟踪,只需点击 dataset 制品,然后点击查看沿袭:

下图显示了使用此制品的所有位置:

注意:请等到流水线中的训练作业启动后,再在下方检查进度。训练作业的运行时间比为本实验分配的时间更长,但如果您成功提交了训练作业,将会得到所有积分。
检查您的端到端机器学习流水线的训练作业是否已启动
第 4 步:比较各次流水线运行的指标(可选)
- 如果您运行了此流水线多次,可能想要比较一下这几次运行的指标。您可以使用
aiplatform.get_pipeline_df()
方法来获取运行元数据。这里,我们将获取此流水线每次运行的元数据,并将其加载到 Pandas DataFrame 中:
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df
现在,您已经了解了如何在 Vertex Pipelines 中构建、运行端到端机器学习流水线,以及如何获取其元数据。
恭喜!
在本实验中,您创建并运行了表情符号流水线。您还学习了如何在 Vertex Pipelines 中构建、运行端到端机器学习流水线,以及如何获取其元数据。
后续步骤/了解详情
您可以试着使用开发者关系部门的 Codelab,在自己的 Google Cloud 项目中实现同样的场景。
Google Cloud 培训和认证
…可帮助您充分利用 Google Cloud 技术。我们的课程会讲解各项技能与最佳实践,可帮助您迅速上手使用并继续学习更深入的知识。我们提供从基础到高级的全方位培训,并有点播、直播和虚拟三种方式选择,让您可以按照自己的日程安排学习时间。各项认证可以帮助您核实并证明您在 Google Cloud 技术方面的技能与专业知识。
本手册的最后更新时间:2024 年 10 月 7 日
本实验的最后测试时间:2024 年 10 月 7 日
版权所有 2025 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名和产品名可能是其各自相关公司的商标。