检查点
Create Cloud Composer environment.
/ 25
Create two Cloud Storage buckets.
/ 25
Create a dataset.
/ 25
Uploading the DAG and dependencies to Cloud Storage
/ 25
Cloud Composer:跨不同位置复制 BigQuery 表
GSP283
概览
假设您的数据集位于全球不同位置,且数据存储在 Google Cloud Storage 存储桶或 BigQuery 表中。为了进行整合和分析,从而为您的业务提供分析洞见,您应该如何整理这些数据?
Cloud Composer 提供直观的图形视图,可帮助您构建工作流,在不同区域和存储系统之间移动数据。此产品有众多优势,其中包括提供了模板,让您能够在 BigQuery 与 Cloud Storage 之间轻松可靠地完成双向数据传输。
在本实验中,您将在 Cloud Composer 中创建并运行完成以下任务的 Apache Airflow 工作流:
- 读取列出了要复制的表的配置文件
- 将位于美国 BigQuery 数据集的表的列表导出到 Cloud Storage
- 将导出的表从位于美国的 Cloud Storage 存储桶复制到位于欧盟的 Cloud Storage 存储桶
- 将表的列表导入位于欧盟的目标 BigQuery 数据集
您将执行的操作
在本实验中,您将学习如何完成以下操作:
- 创建一个 Cloud Composer 环境
- 创建 Cloud Storage 存储桶
- 创建 BigQuery 数据集
- 在 Cloud Composer 中创建和运行 Apache Airflow 工作流,目的是在 Cloud Storage 和 BigQuery 之间移动数据
设置和要求
点击“开始实验”按钮前的注意事项
请阅读以下说明。实验是计时的,并且您无法暂停实验。计时器在您点击开始实验后即开始计时,显示 Google Cloud 资源可供您使用多长时间。
此实操实验可让您在真实的云环境中开展实验活动,免受模拟或演示环境的局限。我们会为您提供新的临时凭据,让您可以在实验规定的时间内用来登录和访问 Google Cloud。
为完成此实验,您需要:
- 能够使用标准的互联网浏览器(建议使用 Chrome 浏览器)。
- 完成实验的时间 - 请注意,实验开始后无法暂停。
如何开始实验并登录 Google Cloud 控制台
-
点击开始实验按钮。如果该实验需要付费,系统会打开一个弹出式窗口供您选择付款方式。左侧是实验详细信息面板,其中包含以下各项:
- 打开 Google Cloud 控制台按钮
- 剩余时间
- 进行该实验时必须使用的临时凭据
- 帮助您逐步完成本实验所需的其他信息(如果需要)
-
点击打开 Google Cloud 控制台(如果您使用的是 Chrome 浏览器,请右键点击并选择在无痕式窗口中打开链接)。
该实验会启动资源并打开另一个标签页,显示登录页面。
提示:请将这些标签页安排在不同的窗口中,并将它们并排显示。
注意:如果您看见选择账号对话框,请点击使用其他账号。 -
如有必要,请复制下方的用户名,然后将其粘贴到登录对话框中。
{{{user_0.username | "<用户名>"}}} 您也可以在实验详细信息面板中找到用户名。
-
点击下一步。
-
复制下面的密码,然后将其粘贴到欢迎对话框中。
{{{user_0.password | "<密码>"}}} 您也可以在实验详细信息面板中找到密码。
-
点击下一步。
重要提示:您必须使用实验提供的凭据。请勿使用您的 Google Cloud 账号凭据。 注意:在本次实验中使用您自己的 Google Cloud 账号可能会产生额外费用。 -
继续在后续页面中点击以完成相应操作:
- 接受条款及条件。
- 由于该账号为临时账号,请勿添加账号恢复选项或双重验证。
- 请勿注册免费试用。
片刻之后,系统会在此标签页中打开 Google Cloud 控制台。
激活 Cloud Shell
Cloud Shell 是一种装有开发者工具的虚拟机。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行。Cloud Shell 提供可用于访问您的 Google Cloud 资源的命令行工具。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell 。
如果您连接成功,即表示您已通过身份验证,且当前项目会被设为您的 PROJECT_ID 环境变量所指的项目。输出内容中有一行说明了此会话的 PROJECT_ID:
gcloud
是 Google Cloud 的命令行工具。它已预先安装在 Cloud Shell 上,且支持 Tab 自动补全功能。
- (可选)您可以通过此命令列出活跃账号名称:
-
点击授权。
-
现在,输出的内容应如下所示:
输出:
- (可选)您可以通过此命令列出项目 ID:
输出:
输出示例:
gcloud
, in Google Cloud, refer to the gcloud CLI overview guide.
任务 1. 创建一个 Cloud Composer 环境
-
在 Google Cloud 控制台标题栏中,在“搜索”字段中输入 Composer,然后点击“产品和页面”部分中的 Composer 以创建 Cloud Composer 环境。
-
然后点击创建环境。
-
在下拉菜单中,选择 Composer 3。
-
为您的环境设置以下参数:
-
名称:composer-advanced-lab
-
位置:
-
映像版本:composer-3-airflow-n.n.n-build.n(选择可用的最高数值映像)
-
在环境资源下,选择小。
-
点击下拉菜单中的显示高级配置,然后选择 Airflow 数据库可用区作为
。
将所有其他设置保留为默认设置。
- 点击创建。
当 Cloud 控制台中的“环境”页面上环境名称左侧显示绿色对勾标记时,即表示环境创建过程已完成。
创建 Cloud Storage 存储桶和 BigQuery 目标数据集
。点击检查我的进度以验证是否完成了以下目标:
任务 2. 创建 Cloud Storage 存储桶
在此任务中,您将创建两个 Cloud Storage 多区域存储桶。这些存储桶将用于在不同位置之间(即美国至欧盟)复制导出的表。
创建位于美国的存储桶
- 前往 Cloud Storage > 存储桶,然后点击创建。
- 为存储桶指定一个包含项目 ID 的全局唯一名称(例如
-us)。 - 对于位置类型,请选择 us(美国的多个区域)。
- 将其他值保留为默认值,然后点击创建。
- 勾选
禁止公开访问此存储桶
复选框,然后点击系统将禁止公开访问
弹出式窗口中的确认(如果出现该弹出式窗口)。
创建位于欧盟的存储桶
重复上述步骤,在 EU
区域创建另一个存储桶。全局唯一名称应将位置作为存储桶的后缀(例如
点击检查我的进度以验证是否完成了以下目标:
任务 3. 创建 BigQuery 目标数据集
-
从 BigQuery 的新版 Web 界面中,创建位于欧盟的目标 BigQuery 数据集。
-
前往导航菜单 > BigQuery。
您会看到欢迎在 Cloud 控制台中使用 BigQuery 消息框,其中会显示快速入门指南的链接以及界面更新。
-
点击完成。
-
然后,点击 Qwiklabs 项目 ID 旁边的三个点,选择创建数据集。
- 使用数据集 ID nyc_tlc_EU,为位置类型选择多区域,然后从下拉菜单中选择 EU。
- 点击创建数据集。
点击检查我的进度以验证是否完成了以下目标:
任务 4. Airflow 和核心概念简介
- 在环境构建期间,请阅读您将在本实验中使用的示例文件。
Airflow 是一个以编程方式编写、安排和监控工作流的平台。
使用 Airflow 将工作流编写为任务的有向无环图 (DAG)。Airflow 调度器会在遵循指定依赖项的同时在一组工作器上执行您的任务。
核心概念
DAG - 有向无环图是任务的集合,即为了反映任务的关系和依赖项而整理的集合。
执行器 (Operator) - 单个任务的描述,通常为原子方式。例如,“BashOperator”用于执行 Bash 命令。
任务 - 参数化的执行器实例;DAG 中的节点。
任务实例 - 任务的特定运行作业;特征包括:DAG、任务和时间点。它具有指示性状态:running(正在运行)、success(成功)、failed(失败)、skipped(已跳过)...
如需详细了解 Airflow 概念,请参阅“概念”文档。
任务 5. 定义工作流
Cloud Composer 工作流由 DAG(有向无环图)组成。bq_copy_across_locations.py 中显示的代码就是工作流代码,也称为 DAG。现在,打开该文件,了解其构建方式。接下来,我们来详细了解该文件的一些关键组成部分。
为了编排所有工作流任务,DAG 会导入以下执行器:
-
DummyOperator
:创建 Start 和 End 虚拟任务,以更好的视觉方式呈现 DAG。 -
BigQueryToCloudStorageOperator
:使用 Avro 格式将 BigQuery 表导出到 Cloud Storage 存储桶。 -
GoogleCloudStorageToGoogleCloudStorageOperator
:跨 Cloud Storage 存储桶复制文件。 -
GoogleCloudStorageToBigQueryOperator
:从 Cloud Storage 存储桶中的 Avro 文件导入表。
- 此示例定义了函数
read_table_list()
来读取配置文件并构建要复制的表的列表:
- 此 DAG 的名称为
bq_copy_us_to_eu_01
,默认情况下未调度,因此需要手动触发。
- 为了定义 Cloud Storage 插件,系统定义了
Cloud StoragePlugin(AirflowPlugin)
类,以便映射从 Airflow 1.10-stable 分支下载的钩子和执行器。
任务 6. 查看环境信息
-
返回 Composer 查看环境状态。
-
创建环境后,点击环境名称可查看其详细信息。
环境详情页面会提供 Airflow 网页界面网址、Google Kubernetes Engine 集群 ID、与 DAG 文件夹关联的 Cloud Storage 存储桶的名称等信息。
后续步骤应在 Cloud Shell 中完成。
创建虚拟环境
Python 虚拟环境用于将软件包安装与系统隔离开来。
- 安装
virtualenv
环境:
- 构建虚拟环境:
- 激活虚拟环境。
任务 7. 为 DAG Cloud Storage 存储桶创建一个变量
- 在 Cloud Shell 中,运行以下命令以从“环境详情”页面复制 DAG 存储桶的名称,并设置一个变量以在 Cloud Shell 中引用该名称:
-composer-advanced-YOURDAGSBUCKET-bucket
。在本实验中,您将多次使用此变量。
任务 8. 设置 Airflow 变量
Airflow 变量是 Airflow 特有的一个概念,此类变量不同于环境变量。在此步骤中,您将设置我们要部署的 DAG 使用的以下三个 Airflow 变量:table_list_file_path
、gcs_source_bucket
和 gcs_dest_bucket
。
键 | 值 | 详细信息 |
---|---|---|
table_list_file_path |
/home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv | 列出源表和目标表的 CSV 文件(包括数据集) |
gcs_source_bucket |
{UNIQUE ID}-us | 用于从来源导出 BigQuery 表 tabledest_bbucks 的 Cloud Storage 存储桶 |
gcs_dest_bucket |
{UNIQUE ID}-eu | 用于在目标位置导入 BigQuery 表的 Cloud Storage 存储桶 |
下一个 gcloud composer
命令会执行 Airflow CLI 子命令 variables。该子命令会将参数传递给 gcloud
命令行工具。
如需设置这三个变量,需要针对上表中的每一行运行一次 composer command
。该命令的形式如下:
(ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable)
。这是将 gcloud composer environments run
与 410.0.0 版 gcloud 一起使用时会出现的已知问题。尽管会显示错误消息,但您的变量仍会相应得到设置。
-
ENVIRONMENT_NAME
是环境的名称。 -
LOCATION
是环境所在的 Compute Engine 区域。在运行 gcloud 命令之前,gcloud composer 命令需要包含--location
标志或设置默认位置。 -
KEY
和VALUE
用于指定要设置的变量及变量值。在左侧包含 gcloud 相关参数的gcloud
命令与右侧包含 Airflow 子命令相关参数之间添加一个空格两个短划线一个空格 (--
)。此外,在KEY
与VALUE
参数之间添加一个空格。使用gcloud composer environments run
命令,并在其中使用变量子命令
在 Cloud Shell 中运行以下命令,将 gcs_source_bucket
和 gcs_dest_bucket
替换为您在任务 2 中创建的存储桶的名称。
如需查看变量的值,请运行带 get
参数的 Airflow CLI 子命令 variables,或使用 Airflow 界面。
例如,运行以下命令:
任务 9. 将 DAG 及其依赖项上传到 Cloud Storage
- 将 Google Cloud Python 文档示例文件复制到 Cloud Shell 中:
- 将第三方钩子和执行器的副本上传到 Composer DAG 中 Cloud Storage 存储桶的“plugins”文件夹:
- 接下来,将 DAG 和配置文件上传到环境的 DAG Cloud Storage 存储桶:
Cloud Composer 会在您的 Airflow 环境中自动注册 DAG,然后 DAG 会在 3-5 分钟内发生更改。您可以在 Airflow 网页界面中查看任务状态,并确认 DAG 未按设置进行调度。
任务 10. 浏览 Airflow 界面
如需使用 Cloud 控制台访问 Airflow 网页界面,请执行以下操作:
- 返回 Composer 环境页面。
- 在环境对应的 Airflow Web 服务器列中,点击 Airflow 链接。
- 点击您的实验凭据。
- Airflow 网页界面会在新的浏览器窗口中打开。在您到达此页面时,系统将仍在加载数据。在等待期间,您可以继续该实验。
查看变量
您之前设置的变量会保留在您的环境中。
- 从 Airflow 菜单栏中选择 Admin > Variables,以查看变量。
手动触发 DAG 运行
-
点击 DAG 标签页,然后等待链接完成加载。
-
如需手动触发 DAG,请点击
composer_sample_bq_copy_across_locations
对应的执行按钮:
- 点击触发 DAG,确认执行此操作。
点击检查我的进度以验证是否完成了以下目标:
探索 DAG 运行
当您将 DAG 文件上传到 Cloud Storage 中的 DAG 文件夹时,Cloud Composer 会解析该文件。如果未发现任何错误,工作流的名称会显示在 DAG 列表中,并且如果满足调度条件(本例设置为“无”),工作流会排入队列以立即运行。
按下执行按钮后,运行状态会变为绿色:
- 点击 DAG 的名称,打开 DAG 详情页面。此页面包含工作流任务和依赖项的图形表示。
- 现在,在工具栏中点击图表,然后将鼠标悬停在每个任务对应的图形上,查看其状态。请注意,每个任务的边框也指示状态(绿色边框表示任务正在运行中;红色边框表示任务失败,等等)。
如需从图表视图重新运行工作流,请执行以下操作:
- 在 Airflow 界面的“图表视图”中,点击启动图形。
- 点击清除重置所有任务,然后点击 OK 进行确认。
在该流程运行期间,请刷新浏览器以查看最新信息。
任务 11. 验证结果
现在,请转到以下 Cloud 控制台页面,检查工作流的状态和结果:
- 导出的表已从位于美国的存储桶复制到位于欧盟的 Cloud Storage 存储桶中。点击 Cloud Storage,查看源(美国)和目标(欧盟)存储桶中的中间 Avro 文件。
- 表列的表已导入目标 BigQuery 数据集。点击 BigQuery,然后点击您的项目名称和 nyc_tlc_EU 数据集,验证是否可以从您创建的数据集访问这些表。
删除 Cloud Composer 环境
-
返回 Composer 中的环境页面。
-
选择 Composer 环境旁边的复选框。
-
点击删除。
-
再次点击删除以确认弹出式内容。
恭喜!
您已通过编程方式将表从美国复制到欧盟!本实验基于 David Sabater Dinter 的这篇博文。
后续步骤
- 如需详细了解如何使用 Airflow,请访问 Airflow 网站或 Airflow GitHub 项目。
- 关于 Airflow 还有许多其他资源可供参考,包括论坛。
- 注册 Apache JIRA 账号,并在 Apache Airflow JIRA 项目中重新打开您关注的任何问题。
- 如需了解 Airflow 界面,请参阅访问网页界面。
Google Cloud 培训和认证
…可帮助您充分利用 Google Cloud 技术。我们的课程会讲解各项技能与最佳实践,可帮助您迅速上手使用并继续学习更深入的知识。我们提供从基础到高级的全方位培训,并有点播、直播和虚拟三种方式选择,让您可以按照自己的日程安排学习时间。各项认证可以帮助您核实并证明您在 Google Cloud 技术方面的技能与专业知识。
上次更新手册的时间:2024 年 6 月 21 日
上次测试实验的时间:2024 年 6 月 21 日
版权所有 2024 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名和产品名可能是其各自相关公司的商标。