arrow_back

Cloud Composer:跨不同位置复制 BigQuery 表

登录 加入
欢迎加入我们的社区,一起测试和分享您的知识!
done
学习 700 多个动手实验和课程并获得相关技能徽章

Cloud Composer:跨不同位置复制 BigQuery 表

实验 1 小时 15 分钟 universal_currency_alt 5 个积分 show_chart 中级
info 此实验可能会提供 AI 工具来支持您学习。
欢迎加入我们的社区,一起测试和分享您的知识!
done
学习 700 多个动手实验和课程并获得相关技能徽章

GSP283

Google Cloud 自定进度实验

概览

假设您的数据集位于全球不同位置,且数据存储在 Google Cloud Storage 存储桶或 BigQuery 表中。为了进行整合和分析,从而为您的业务提供分析洞见,您应该如何整理这些数据?

Cloud Composer 提供直观的图形视图,可帮助您构建工作流,在不同区域和存储系统之间移动数据。此产品有众多优势,其中包括提供了模板,让您能够在 BigQuery 与 Cloud Storage 之间轻松可靠地完成双向数据传输。

在本实验中,您将在 Cloud Composer 中创建并运行完成以下任务的 Apache Airflow 工作流:

  • 读取列出了要复制的表的配置文件
  • 将位于美国 BigQuery 数据集的表的列表导出到 Cloud Storage
  • 将导出的表从位于美国的 Cloud Storage 存储桶复制到位于欧盟的 Cloud Storage 存储桶
  • 将表的列表导入位于欧盟的目标 BigQuery 数据集

DAG 图表视图

您将执行的操作

在本实验中,您将学习如何完成以下操作:

  • 创建一个 Cloud Composer 环境
  • 创建 Cloud Storage 存储桶
  • 创建 BigQuery 数据集
  • 在 Cloud Composer 中创建和运行 Apache Airflow 工作流,目的是在 Cloud Storage 和 BigQuery 之间移动数据

设置和要求

点击“开始实验”按钮前的注意事项

请阅读以下说明。实验是计时的,并且您无法暂停实验。计时器在您点击开始实验后即开始计时,显示 Google Cloud 资源可供您使用多长时间。

此实操实验可让您在真实的云环境中开展实验活动,免受模拟或演示环境的局限。我们会为您提供新的临时凭据,让您可以在实验规定的时间内用来登录和访问 Google Cloud。

为完成此实验,您需要:

  • 能够使用标准的互联网浏览器(建议使用 Chrome 浏览器)。
注意:请使用无痕模式或无痕浏览器窗口运行此实验。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。
  • 完成实验的时间 - 请注意,实验开始后无法暂停。
注意:如果您已有自己的个人 Google Cloud 账号或项目,请不要在此实验中使用,以避免您的账号产生额外的费用。

如何开始实验并登录 Google Cloud 控制台

  1. 点击开始实验按钮。如果该实验需要付费,系统会打开一个弹出式窗口供您选择付款方式。左侧是实验详细信息面板,其中包含以下各项:

    • 打开 Google Cloud 控制台按钮
    • 剩余时间
    • 进行该实验时必须使用的临时凭据
    • 帮助您逐步完成本实验所需的其他信息(如果需要)
  2. 点击打开 Google Cloud 控制台(如果您使用的是 Chrome 浏览器,请右键点击并选择在无痕式窗口中打开链接)。

    该实验会启动资源并打开另一个标签页,显示登录页面。

    提示:请将这些标签页安排在不同的窗口中,并将它们并排显示。

    注意:如果您看见选择账号对话框,请点击使用其他账号
  3. 如有必要,请复制下方的用户名,然后将其粘贴到登录对话框中。

    {{{user_0.username | "<用户名>"}}}

    您也可以在实验详细信息面板中找到用户名

  4. 点击下一步

  5. 复制下面的密码,然后将其粘贴到欢迎对话框中。

    {{{user_0.password | "<密码>"}}}

    您也可以在实验详细信息面板中找到密码

  6. 点击下一步

    重要提示:您必须使用实验提供的凭据。请勿使用您的 Google Cloud 账号凭据。 注意:在本次实验中使用您自己的 Google Cloud 账号可能会产生额外费用。
  7. 继续在后续页面中点击以完成相应操作:

    • 接受条款及条件。
    • 由于该账号为临时账号,请勿添加账号恢复选项或双重验证。
    • 请勿注册免费试用。

片刻之后,系统会在此标签页中打开 Google Cloud 控制台。

注意:如需查看列有 Google Cloud 产品和服务的菜单,请点击左上角的导航菜单导航菜单图标

激活 Cloud Shell

Cloud Shell 是一种装有开发者工具的虚拟机。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行。Cloud Shell 提供可用于访问您的 Google Cloud 资源的命令行工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell “激活 Cloud Shell”图标

如果您连接成功,即表示您已通过身份验证,且当前项目会被设为您的 PROJECT_ID 环境变量所指的项目。输出内容中有一行说明了此会话的 PROJECT_ID

Your Cloud Platform project in this session is set to YOUR_PROJECT_ID

gcloud 是 Google Cloud 的命令行工具。它已预先安装在 Cloud Shell 上,且支持 Tab 自动补全功能。

  1. (可选)您可以通过此命令列出活跃账号名称:
gcloud auth list
  1. 点击授权

  2. 现在,输出的内容应如下所示:

输出:

ACTIVE: * ACCOUNT: student-01-xxxxxxxxxxxx@qwiklabs.net To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (可选)您可以通过此命令列出项目 ID:
gcloud config list project

输出

[core] project = <project_ID>

输出示例

[core] project = qwiklabs-gcp-44776a13dea667a6 Note: For full documentation of gcloud, in Google Cloud, refer to the gcloud CLI overview guide.

任务 1. 创建一个 Cloud Composer 环境

  1. 在 Google Cloud 控制台标题栏中,在“搜索”字段中输入 Composer,然后点击“产品和页面”部分中的 Composer 以创建 Cloud Composer 环境。

  2. 然后点击创建环境

  3. 在下拉菜单中,选择 Composer 3

  4. 为您的环境设置以下参数:

  • 名称:composer-advanced-lab

  • 位置:

  • 映像版本:composer-3-airflow-n.n.n-build.n(选择可用的最高数值映像)

  • 环境资源下,选择

  • 点击下拉菜单中的显示高级配置,然后选择 Airflow 数据库可用区作为

将所有其他设置保留为默认设置。

  1. 点击创建

当 Cloud 控制台中的“环境”页面上环境名称左侧显示绿色对勾标记时,即表示环境创建过程已完成。

注意:环境设置过程可能需要长达 20 分钟时间才能完成。请转到下一部分创建 Cloud Storage 存储桶和 BigQuery 目标数据集

点击检查我的进度以验证是否完成了以下目标:

创建 Cloud Composer 环境。

任务 2. 创建 Cloud Storage 存储桶

在此任务中,您将创建两个 Cloud Storage 多区域存储桶。这些存储桶将用于在不同位置之间(即美国至欧盟)复制导出的表。

创建位于美国的存储桶

  1. 前往 Cloud Storage > 存储桶,然后点击创建
  2. 为存储桶指定一个包含项目 ID 的全局唯一名称(例如 -us)。
  3. 对于位置类型,请选择 us(美国的多个区域)
  4. 将其他值保留为默认值,然后点击创建
  5. 勾选禁止公开访问此存储桶复选框,然后点击系统将禁止公开访问弹出式窗口中的确认(如果出现该弹出式窗口)。

创建位于欧盟的存储桶

重复上述步骤,在 EU 区域创建另一个存储桶。全局唯一名称应将位置作为存储桶的后缀(例如 -eu)。

点击检查我的进度以验证是否完成了以下目标:

创建两个 Cloud Storage 存储桶。

任务 3. 创建 BigQuery 目标数据集

  1. 从 BigQuery 的新版 Web 界面中,创建位于欧盟的目标 BigQuery 数据集。

  2. 前往导航菜单 > BigQuery

您会看到欢迎在 Cloud 控制台中使用 BigQuery 消息框,其中会显示快速入门指南的链接以及界面更新。

  1. 点击完成

  2. 然后,点击 Qwiklabs 项目 ID 旁边的三个点,选择创建数据集

突出显示的“创建数据集”

  1. 使用数据集 ID nyc_tlc_EU,为位置类型选择多区域,然后从下拉菜单中选择 EU

“数据集 ID”文本字段和“数据位置”下拉菜单

  1. 点击创建数据集

点击检查我的进度以验证是否完成了以下目标:

创建数据集。

任务 4. Airflow 和核心概念简介

  • 在环境构建期间,请阅读您将在本实验中使用的示例文件。

Airflow 是一个以编程方式编写、安排和监控工作流的平台。

使用 Airflow 将工作流编写为任务的有向无环图 (DAG)。Airflow 调度器会在遵循指定依赖项的同时在一组工作器上执行您的任务。

核心概念

DAG - 有向无环图是任务的集合,即为了反映任务的关系和依赖项而整理的集合。

执行器 (Operator) - 单个任务的描述,通常为原子方式。例如,“BashOperator”用于执行 Bash 命令。

任务 - 参数化的执行器实例;DAG 中的节点。

任务实例 - 任务的特定运行作业;特征包括:DAG、任务和时间点。它具有指示性状态:running(正在运行)、success(成功)、failed(失败)、skipped(已跳过)...

如需详细了解 Airflow 概念,请参阅“概念”文档

任务 5. 定义工作流

Cloud Composer 工作流由 DAG(有向无环图)组成。bq_copy_across_locations.py 中显示的代码就是工作流代码,也称为 DAG。现在,打开该文件,了解其构建方式。接下来,我们来详细了解该文件的一些关键组成部分。

为了编排所有工作流任务,DAG 会导入以下执行器:

  1. DummyOperator:创建 Start 和 End 虚拟任务,以更好的视觉方式呈现 DAG。
  2. BigQueryToCloudStorageOperator:使用 Avro 格式将 BigQuery 表导出到 Cloud Storage 存储桶。
  3. GoogleCloudStorageToGoogleCloudStorageOperator:跨 Cloud Storage 存储桶复制文件。
  4. GoogleCloudStorageToBigQueryOperator:从 Cloud Storage 存储桶中的 Avro 文件导入表。
  • 此示例定义了函数 read_table_list() 来读取配置文件并构建要复制的表的列表:
# -------------------------------------------------------------------------------- # Functions # -------------------------------------------------------------------------------- def read_table_list(table_list_file): """ Reads the master CSV file that will help in creating Airflow tasks in the DAG dynamically. :param table_list_file: (String) The file location of the master file, e.g. '/home/airflow/framework/master.csv' :return master_record_all: (List) List of Python dictionaries containing the information for a single row in master CSV file. """ master_record_all = [] logger.info('Reading table_list_file from : %s' % str(table_list_file)) try: with open(table_list_file, 'rb') as csv_file: csv_reader = csv.reader(csv_file) next(csv_reader) # skip the headers for row in csv_reader: logger.info(row) master_record = { 'table_source': row[0], 'table_dest': row[1] } master_record_all.append(master_record) return master_record_all except IOError as e: logger.error('Error opening table_list_file %s: ' % str( table_list_file), e)
  • 此 DAG 的名称为 bq_copy_us_to_eu_01,默认情况下未调度,因此需要手动触发。
default_args = { 'owner': 'airflow', 'start_date': datetime.today(), 'depends_on_past': False, 'email': [''], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # DAG object. with models.DAG('bq_copy_us_to_eu_01', default_args=default_args, schedule_interval=None) as dag:
  • 为了定义 Cloud Storage 插件,系统定义了 Cloud StoragePlugin(AirflowPlugin) 类,以便映射从 Airflow 1.10-stable 分支下载的钩子和执行器。
# Import operator from plugins from gcs_plugin.operators import gcs_to_gcs

任务 6. 查看环境信息

  1. 返回 Composer 查看环境状态。

  2. 创建环境后,点击环境名称可查看其详细信息。

环境详情页面会提供 Airflow 网页界面网址、Google Kubernetes Engine 集群 ID、与 DAG 文件夹关联的 Cloud Storage 存储桶的名称等信息。

“环境配置”页面

注意:Cloud Composer 使用Cloud Storage 存储 Apache Airflow DAG,也称为“工作流”。每个环境都有一个关联的 Cloud Storage 存储桶。Cloud Composer 仅安排 Cloud Storage 存储桶中的 DAG。

后续步骤应在 Cloud Shell 中完成。

创建虚拟环境

Python 虚拟环境用于将软件包安装与系统隔离开来。

  1. 安装 virtualenv 环境:
sudo apt-get install -y virtualenv
  1. 构建虚拟环境:
python3 -m venv venv
  1. 激活虚拟环境。
source venv/bin/activate

任务 7. 为 DAG Cloud Storage 存储桶创建一个变量

  • 在 Cloud Shell 中,运行以下命令以从“环境详情”页面复制 DAG 存储桶的名称,并设置一个变量以在 Cloud Shell 中引用该名称:
注意:请务必在以下命令中替换您的 DAG 存储桶名称。前往导航菜单 > Cloud Storage,该名称将类似于 -composer-advanced-YOURDAGSBUCKET-bucket DAGS_BUCKET=<your DAGs bucket name>

在本实验中,您将多次使用此变量。

任务 8. 设置 Airflow 变量

Airflow 变量是 Airflow 特有的一个概念,此类变量不同于环境变量。在此步骤中,您将设置我们要部署的 DAG 使用的以下三个 Airflow 变量table_list_file_pathgcs_source_bucketgcs_dest_bucket

详细信息
table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv 列出源表和目标表的 CSV 文件(包括数据集)
gcs_source_bucket {UNIQUE ID}-us 用于从来源导出 BigQuery 表 tabledest_bbucks 的 Cloud Storage 存储桶
gcs_dest_bucket {UNIQUE ID}-eu 用于在目标位置导入 BigQuery 表的 Cloud Storage 存储桶

下一个 gcloud composer 命令会执行 Airflow CLI 子命令 variables。该子命令会将参数传递给 gcloud 命令行工具。

如需设置这三个变量,需要针对上表中的每一行运行一次 composer command。该命令的形式如下:

gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION variables -- \ set KEY VALUE 您可放心忽略以下 gcloud 错误:(ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable)。这是将 gcloud composer environments run与 410.0.0 版 gcloud 一起使用时会出现的已知问题。尽管会显示错误消息,但您的变量仍会相应得到设置。
  • ENVIRONMENT_NAME 是环境的名称。
  • LOCATION 是环境所在的 Compute Engine 区域。在运行 gcloud 命令之前,gcloud composer 命令需要包含 --location 标志或设置默认位置
  • KEYVALUE 用于指定要设置的变量及变量值。在左侧包含 gcloud 相关参数的 gcloud 命令与右侧包含 Airflow 子命令相关参数之间添加一个空格两个短划线一个空格 ( -- )。此外,在 KEYVALUE 参数之间添加一个空格。使用 gcloud composer environments run 命令,并在其中使用变量子命令

在 Cloud Shell 中运行以下命令,将 gcs_source_bucketgcs_dest_bucket 替换为您在任务 2 中创建的存储桶的名称。

gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_source_bucket {UNIQUE ID}-us gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_dest_bucket {UNIQUE_ID}-eu

如需查看变量的值,请运行带 get 参数的 Airflow CLI 子命令 variables,或使用 Airflow 界面

例如,运行以下命令:

gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ get gcs_source_bucket 注意:请务必设置 DAG 使用的所有三个 Airflow 变量。

任务 9. 将 DAG 及其依赖项上传到 Cloud Storage

  1. 将 Google Cloud Python 文档示例文件复制到 Cloud Shell 中:
cd ~ gcloud storage cp -r gs://spls/gsp283/python-docs-samples .
  1. 将第三方钩子和执行器的副本上传到 Composer DAG 中 Cloud Storage 存储桶的“plugins”文件夹:
gcloud storage cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins
  1. 接下来,将 DAG 和配置文件上传到环境的 DAG Cloud Storage 存储桶:
gcloud storage cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags gcloud storage cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://$DAGS_BUCKET/dags

Cloud Composer 会在您的 Airflow 环境中自动注册 DAG,然后 DAG 会在 3-5 分钟内发生更改。您可以在 Airflow 网页界面中查看任务状态,并确认 DAG 未按设置进行调度。

任务 10. 浏览 Airflow 界面

如需使用 Cloud 控制台访问 Airflow 网页界面,请执行以下操作:

  1. 返回 Composer 环境页面。
  2. 在环境对应的 Airflow Web 服务器列中,点击 Airflow 链接。

Airflow 链接已在 Airflow Web 服务器列中突出显示

  1. 点击您的实验凭据。
  2. Airflow 网页界面会在新的浏览器窗口中打开。在您到达此页面时,系统将仍在加载数据。在等待期间,您可以继续该实验。

查看变量

您之前设置的变量会保留在您的环境中。

  • 从 Airflow 菜单栏中选择 Admin > Variables,以查看变量。

“Variables”(变量)页面

手动触发 DAG 运行

  1. 点击 DAG 标签页,然后等待链接完成加载。

  2. 如需手动触发 DAG,请点击 composer_sample_bq_copy_across_locations 对应的执行按钮:

“触发 DAG”按钮

  1. 点击触发 DAG,确认执行此操作。

点击检查我的进度以验证是否完成了以下目标:

将 DAG 及其依赖项上传到 Cloud Storage

探索 DAG 运行

当您将 DAG 文件上传到 Cloud Storage 中的 DAG 文件夹时,Cloud Composer 会解析该文件。如果未发现任何错误,工作流的名称会显示在 DAG 列表中,并且如果满足调度条件(本例设置为“无”),工作流会排入队列以立即运行。

按下执行按钮后,运行状态会变为绿色:

绿色 DAG 运行状态

  1. 点击 DAG 的名称,打开 DAG 详情页面。此页面包含工作流任务和依赖项的图形表示。

DAG 树状视图

  1. 现在,在工具栏中点击图表,然后将鼠标悬停在每个任务对应的图形上,查看其状态。请注意,每个任务的边框也指示状态(绿色边框表示任务正在运行中;红色边框表示任务失败,等等)。

DAG 图表视图

如需从图表视图重新运行工作流,请执行以下操作:

  1. 在 Airflow 界面的“图表视图”中,点击启动图形。
  2. 点击清除重置所有任务,然后点击 OK 进行确认。

启动对话框

在该流程运行期间,请刷新浏览器以查看最新信息。

任务 11. 验证结果

现在,请转到以下 Cloud 控制台页面,检查工作流的状态和结果:

  • 导出的表已从位于美国的存储桶复制到位于欧盟的 Cloud Storage 存储桶中。点击 Cloud Storage,查看源(美国)和目标(欧盟)存储桶中的中间 Avro 文件。
  • 表列的表已导入目标 BigQuery 数据集。点击 BigQuery,然后点击您的项目名称和 nyc_tlc_EU 数据集,验证是否可以从您创建的数据集访问这些表。

删除 Cloud Composer 环境

  1. 返回 Composer 中的环境页面。

  2. 选择 Composer 环境旁边的复选框。

  3. 点击删除

  4. 再次点击删除以确认弹出式内容。

恭喜!

您已通过编程方式将表从美国复制到欧盟!本实验基于 David Sabater Dinter 的这篇博文

后续步骤

Google Cloud 培训和认证

…可帮助您充分利用 Google Cloud 技术。我们的课程会讲解各项技能与最佳实践,可帮助您迅速上手使用并继续学习更深入的知识。我们提供从基础到高级的全方位培训,并有点播、直播和虚拟三种方式选择,让您可以按照自己的日程安排学习时间。各项认证可以帮助您核实并证明您在 Google Cloud 技术方面的技能与专业知识。

上次更新手册的时间:2024 年 6 月 21 日

上次测试实验的时间:2024 年 6 月 21 日

版权所有 2024 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名和产品名可能是其各自相关公司的商标。

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您