检查点
Disable and re-enable the Dataflow API
/ 10
Create a Cloud Storage Bucket
/ 10
Copy Files to Your Bucket
/ 10
Create the BigQuery Dataset (name: lake)
/ 20
Build a Data Ingestion Dataflow Pipeline
/ 10
Build a Data Transformation Dataflow Pipeline
/ 10
Build a Data Enrichment Dataflow Pipeline
/ 10
Build a Data lake to Mart Dataflow Pipeline
/ 20
使用 Dataflow 和 BigQuery (Python) 在 Google Cloud 上进行 ETL 处理
- GSP290
- 概览
- 设置和要求
- 任务 1. 确保 Dataflow API 已成功启用
- 任务 2. 下载起始代码
- 任务 3. 创建 Cloud Storage 存储桶
- 任务 4. 将文件复制到存储桶
- 任务 5. 创建 BigQuery 数据集
- 任务 6. 构建 Dataflow 流水线
- 任务 7. 使用 Dataflow 流水线进行数据注入
- 任务 8. 查看流水线 Python 代码
- 任务 9. 运行 Apache Beam 流水线
- 任务 10. 数据转换
- 任务 11. 运行 Dataflow 转换流水线
- 任务 12. 数据丰富化
- 任务 13. 查看数据丰富化流水线 Python 代码
- 任务 14. 运行数据丰富化 Dataflow 流水线
- 任务 15. 从数据湖到数据集市,并查看流水线 Python 代码
- 任务 16. 运行 Apache Beam 流水线,以执行数据联接并在 BigQuery 中创建所产生的表
- 检验您的掌握情况
- 恭喜!
GSP290
概览
在 Google Cloud 中,您可以使用以下 Google Cloud 服务构建执行 Python 代码的数据流水线,以便将可公开访问的数据集中的数据注入到 BigQuery 中并进行转换:
- Cloud Storage
- Dataflow
- BigQuery
在本实验中,您将使用上述服务创建自己的数据流水线(包括设计注意事项和实施细节),以确保您的原型符合要求。请务必打开 Python 文件,并按照说明查看注释。
您将执行的操作
在本实验中,您将学习如何完成以下操作:
- 构建并运行用于进行数据注入的 Dataflow 流水线 (Python)
- 构建并运行用于进行数据转换和丰富化的 Dataflow 流水线 (Python)
设置和要求
点击“开始实验”按钮前的注意事项
请阅读以下说明。实验是计时的,并且您无法暂停实验。计时器在您点击开始实验后即开始计时,显示 Google Cloud 资源可供您使用多长时间。
此实操实验可让您在真实的云环境中开展实验活动,免受模拟或演示环境的局限。我们会为您提供新的临时凭据,让您可以在实验规定的时间内用来登录和访问 Google Cloud。
为完成此实验,您需要:
- 能够使用标准的互联网浏览器(建议使用 Chrome 浏览器)。
- 完成实验的时间 - 请注意,实验开始后无法暂停。
如何开始实验并登录 Google Cloud 控制台
-
点击开始实验按钮。如果该实验需要付费,系统会打开一个弹出式窗口供您选择付款方式。左侧是实验详细信息面板,其中包含以下各项:
- 打开 Google Cloud 控制台按钮
- 剩余时间
- 进行该实验时必须使用的临时凭据
- 帮助您逐步完成本实验所需的其他信息(如果需要)
-
点击打开 Google Cloud 控制台(如果您使用的是 Chrome 浏览器,请右键点击并选择在无痕式窗口中打开链接)。
该实验会启动资源并打开另一个标签页,显示登录页面。
提示:请将这些标签页安排在不同的窗口中,并将它们并排显示。
注意:如果您看见选择账号对话框,请点击使用其他账号。 -
如有必要,请复制下方的用户名,然后将其粘贴到登录对话框中。
{{{user_0.username | "<用户名>"}}} 您也可以在实验详细信息面板中找到用户名。
-
点击下一步。
-
复制下面的密码,然后将其粘贴到欢迎对话框中。
{{{user_0.password | "<密码>"}}} 您也可以在实验详细信息面板中找到密码。
-
点击下一步。
重要提示:您必须使用实验提供的凭据。请勿使用您的 Google Cloud 账号凭据。 注意:在本次实验中使用您自己的 Google Cloud 账号可能会产生额外费用。 -
继续在后续页面中点击以完成相应操作:
- 接受条款及条件。
- 由于该账号为临时账号,请勿添加账号恢复选项或双重验证。
- 请勿注册免费试用。
片刻之后,系统会在此标签页中打开 Google Cloud 控制台。
激活 Cloud Shell
Cloud Shell 是一种装有开发者工具的虚拟机。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行。Cloud Shell 提供可用于访问您的 Google Cloud 资源的命令行工具。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell 。
如果您连接成功,即表示您已通过身份验证,且当前项目会被设为您的 PROJECT_ID 环境变量所指的项目。输出内容中有一行说明了此会话的 PROJECT_ID:
gcloud
是 Google Cloud 的命令行工具。它已预先安装在 Cloud Shell 上,且支持 Tab 自动补全功能。
- (可选)您可以通过此命令列出活跃账号名称:
-
点击授权。
-
现在,输出的内容应如下所示:
输出:
- (可选)您可以通过此命令列出项目 ID:
输出:
输出示例:
gcloud
, in Google Cloud, refer to the gcloud CLI overview guide.
任务 1. 确保 Dataflow API 已成功启用
为了确保能访问这个必要的 API,请重新启动与 Dataflow API 的连接。
-
在 Cloud 控制台的顶部搜索栏中输入“Dataflow API”。点击 Dataflow API 的搜索结果。
-
点击管理。
-
点击停用 API。
如果系统要求您确认,请点击停用。
- 点击启用。
该 API 再次启用后,页面将会显示停用选项。
验证您已完成的任务
点击检查我的进度以验证您已完成的任务。
任务 2. 下载起始代码
- 在 Cloud Shell 中运行以下命令,从 Google Cloud 的专业服务 GitHub 获取 Dataflow Python 示例:
- 现在,在 Cloud Shell 中设置一个值为您的项目 ID 的变量。
任务 3. 创建 Cloud Storage 存储桶
- 使用 Cloud Shell 中的创建存储桶命令,在项目内的
区域中新建一个区域级存储桶:
验证您已完成的任务
点击检查我的进度以验证您已完成的任务。
任务 4. 将文件复制到存储桶
- 在 Cloud Shell 中,使用
gsutil
命令将文件复制到刚才创建的 Cloud Storage 存储桶中:
验证您已完成的任务
点击检查我的进度以验证您已完成的任务。
任务 5. 创建 BigQuery 数据集
- 在 Cloud Shell 中,在 BigQuery 数据集中创建一个名为
lake
的数据集。您的所有表都将加载到 BigQuery 中的这个数据集中:
验证您已完成的任务
点击检查我的进度以验证您已完成的任务。
任务 6. 构建 Dataflow 流水线
在此部分,您将创建一个仅附加 Dataflow,用于将数据注入到 BigQuery 表中。您可以使用内置代码编辑器,在 Google Cloud 控制台中查看和修改代码。
打开 Cloud Shell 代码编辑器
- 点击 Cloud Shell 中的打开编辑器图标以访问源代码:
- 如果出现提示,点击在新窗口中打开。代码编辑器即会在新窗口中打开。使用 Cloud Shell 编辑器可以在 Cloud Shell 环境中修改文件,在编辑器中点击打开终端即可返回到 Cloud Shell。
任务 7. 使用 Dataflow 流水线进行数据注入
接下来,您将构建一个具有 TextIO 来源和 BigQueryIO 目标的 Dataflow 流水线,用于将数据注入到 BigQuery 中。具体而言,该流水线将执行以下操作:
- 从 Cloud Storage 提取文件。
- 滤除文件中的标题行。
- 将读取的行转换为字典对象。
- 将行输出到 BigQuery。
任务 8. 查看流水线 Python 代码
在代码编辑器中,前往 dataflow-python-examples
> dataflow_python_examples
,打开 data_ingestion.py
文件。查看文件中的注释,这些注释解释了代码的作用。该代码将使用 BigQuery 中的表来填充数据集 lake。
任务 9. 运行 Apache Beam 流水线
- 您需要返回到 Cloud Shell 会话来执行此步骤。现在,您要为必需的 python 库进行一些设置。
在本实验中,Dataflow 作业需要使用 Python3.8
。为确保您使用的是正确的版本,您需要在 Python 3.8 Docker 容器中运行 Dataflow 进程。
- 在 Cloud Shell 中运行以下命令来启动 Python 容器:
此命令会拉取包含 Python 3.8 最新稳定版本的 Docker 容器,并执行一个命令 shell,以便在容器内运行后续命令。-v
标志会将源代码作为容器的一个卷
提供,这样,我们在 Cloud Shell 编辑器中修改源代码的同时,仍能在运行的容器中访问源代码。
- 容器完成拉取并开始在 Cloud Shell 中执行后,运行以下命令将
apache-beam
安装在运行的容器中:
- 接下来,在 Cloud Shell 中运行的容器内,将目录切换到链接源代码的位置:
在云端运行注入 Dataflow 流水线
- 以下命令将启动必需的工作器,并在其完成工作后将它们关闭:
- 返回到 Cloud 控制台,打开导航菜单 > Dataflow 以查看作业状态。
-
点击作业名称查看其进度。作业状态显示为成功后,您便可执行下一步。此 Dataflow 流水线从启动、完成工作到关闭大约需要五分钟时间。
-
前往 BigQuery(导航菜单 > BigQuery),确认是否已填充数据。
- 点击项目名称,查看
lake
数据集下的 usa_names 表。
- 点击此表,然后前往预览标签页,查看
usa_names
数据的示例。
usa_names
表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。
验证您已完成的任务
点击检查我的进度以验证您已完成的任务。
任务 10. 数据转换
接下来,您将构建一个具有 TextIO 来源和 BigQueryIO 目标的 Dataflow 流水线,用于将数据注入到 BigQuery 中。具体操作为:
- 从 Cloud Storage 提取文件。
- 将读取的行转换为字典对象。
- 将包含年份的数据转换为 BigQuery 视为日期的格式。
- 将行输出到 BigQuery。
查看转换流水线 Python 代码
在代码编辑器中,打开 data_transformation.py
文件。查看文件中的注释,这些注释解释了代码的作用。
任务 11. 运行 Dataflow 转换流水线
您将在云端运行 Dataflow 流水线。这会启动必需的工作器,并在其完成工作后将它们关闭。
- 请运行以下命令来完成该任务:
-
前往导航菜单 > Dataflow,然后点击此作业的名称查看作业状态。此 Dataflow 流水线从启动、完成工作到关闭大约需要五分钟时间。
-
当 Dataflow“作业状态”屏幕中的作业状态显示为成功后,前往 BigQuery 确认是否已填充数据。
-
您应该可在
lake
数据集下看到 usa_names_transformed 表。 -
点击此表,然后前往预览标签页,查看
usa_names_transformed
数据的示例。
usa_names_transformed
表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。
验证您已完成的任务
点击检查我的进度以验证您已完成的任务。
任务 12. 数据丰富化
接下来,您将构建一个具有 TextIO 来源和 BigQueryIO 目标的 Dataflow 流水线,用于将数据注入到 BigQuery 中。具体操作为:
- 从 Cloud Storage 提取文件。
- 滤除文件中的标题行。
- 将读取的行转换为字典对象。
- 将行输出到 BigQuery。
任务 13. 查看数据丰富化流水线 Python 代码
-
在代码编辑器中,打开
data_enrichment.py
文件。 -
查看注释,这些注释解释了代码的作用。该代码将在 BigQuery 中填充数据。
第 83 行现在如下所示:
- 将该行修改为如下所示的内容:
- 修改好该行后,请务必在编辑器中选择文件下拉菜单并点击保存,以保存更新后的文件
任务 14. 运行数据丰富化 Dataflow 流水线
接下来,您将在云端运行 Dataflow 流水线。
- 在 Cloud Shell 中运行以下命令,以启动必需的工作器并在其完成工作后将它们关闭:
-
前往导航菜单 > Dataflow,查看作业状态。此 Dataflow 流水线从启动、完成工作到关闭大约需要五分钟时间。
-
当 Dataflow“作业状态”屏幕中的作业状态显示为成功后,前往 BigQuery 确认是否已填充数据。
您应该可在 lake
数据集下看到 usa_names_enriched 表。
- 点击此表,然后前往预览标签页,查看
usa_names_enriched
数据的示例。
usa_names_enriched
表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。
测试您已完成的数据丰富化任务
点击检查我的进度以验证您已完成的任务。
任务 15. 从数据湖到数据集市,并查看流水线 Python 代码
接下来,您需要构建一条 Dataflow 流水线,以从两个 BigQuery 数据源读取数据,然后联接这两个数据源。具体操作是:
- 从两个 BigQuery 数据源提取文件。
- 联接两个数据源。
- 滤除文件中的标题行。
- 将读取的行转换为字典对象。
- 将行输出到 BigQuery。
在代码编辑器中,打开 data_lake_to_mart.py
文件。查看文件中的注释,这些注释解释了代码的作用。该代码将联接两个表并在 BigQuery 中填充所产生的数据。
任务 16. 运行 Apache Beam 流水线,以执行数据联接并在 BigQuery 中创建所产生的表
接下来,在云端运行 Dataflow 流水线。
- 在 Cloud Shell 中运行以下代码块,以启动必需的工作器并在其完成工作后将它们关闭:
-
前往导航菜单 > Dataflow,然后点击该新作业的名称查看作业状态。此 Dataflow 流水线从启动、完成工作到关闭大约需要五分钟时间。
-
当 Dataflow“作业状态”屏幕中的作业状态显示为成功后,前往 BigQuery 确认是否已填充数据。
您应该可在 lake
数据集下看到 orders_denormalized_sideinput 表。
- 点击此表,然后前往预览部分,查看
orders_denormalized_sideinput
数据的示例。
orders_denormalized_sideinput
表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。
测试您已完成的联接任务
点击检查我的进度以验证您已完成的任务。
检验您的掌握情况
下面的选择题可加强您对本实验所涉概念的理解。请尽您所能回答问题。
恭喜!
您使用 Dataflow 执行了 Python 代码,将数据注入到 BigQuery 中并转换了数据。
后续步骤/了解详情
想要查找更多参考信息?请点击以下链接查看官方文档:
- Dataflow
- BigQuery
- 查看 Apache Beam 程序设计指南,了解更多高级概念。
- 参阅以下实验:
Google Cloud 培训和认证
…可帮助您充分利用 Google Cloud 技术。我们的课程会讲解各项技能与最佳实践,可帮助您迅速上手使用并继续学习更深入的知识。我们提供从基础到高级的全方位培训,并有点播、直播和虚拟三种方式选择,让您可以按照自己的日程安排学习时间。各项认证可以帮助您核实并证明您在 Google Cloud 技术方面的技能与专业知识。
上次更新手册的时间:2024 年 2 月 11 日
上次测试实验的时间:2023 年 10 月 12 日
版权所有 2024 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名和产品名可能是其各自相关公司的商标。