arrow_back

Cloud Composer:複製不同位置的 BigQuery 資料表

登录 加入
欢迎加入我们的社区,一起测试和分享您的知识!
done
学习 700 多个动手实验和课程并获得相关技能徽章

Cloud Composer:複製不同位置的 BigQuery 資料表

实验 1 小时 15 分钟 universal_currency_alt 5 积分 show_chart 中级
info 此实验可能会提供 AI 工具来支持您学习。
欢迎加入我们的社区,一起测试和分享您的知识!
done
学习 700 多个动手实验和课程并获得相关技能徽章

GSP283

Google Cloud 自修研究室標誌

總覽

假設您有資料集位於世界各地的不同位置,且您有資料放在 Google Cloud Storage bucket 或 BigQuery 資料表中。您要如何整理這些資料,以便貴公司整合及分析資料,取得洞察結果?

Cloud Composer 具備直觀的圖形檢視畫面,可助您建立工作流程,並在不同區域和儲存系統間移動資料。除了上述優點外,這項工具還提供範本,可以輕鬆可靠的在 BigQuery 和 Cloud Storage 間雙向轉移資料。

在本實驗室中,您將完成下列工作,在 Cloud Composer 中建立並執行 Apache Airflow 工作流程。

  • 查看設定檔,該檔案含有要複製的資料表清單
  • 將位於美國的 BigQuery 資料集內的資料表清單匯出至 Cloud Storage
  • 將從美國 bucket 匯出的資料表複製到歐盟地區的 Cloud Storage bucket
  • 將資料表清單匯入歐盟地區的目標 BigQuery 資料集

DAG 圖表檢視畫面

學習內容

在本實驗室中,您將瞭解如何執行下列工作:

  • 建立 Cloud Composer 環境
  • 建立 Cloud Storage bucket
  • 建立 BigQuery 資料集
  • 在 Cloud Composer 中建立並執行 Apache Airflow 工作流程,在 Cloud Storage 和 BigQuery 間移動資料

設定和需求

點選「Start Lab」按鈕前的須知事項

請詳閱以下操作說明。研究室活動會計時,而且中途無法暫停。點選「Start Lab」 後就會開始計時,讓您瞭解有多少時間可以使用 Google Cloud 資源。

您將在真正的雲端環境中完成實作研究室活動,而不是在模擬或示範環境。為達此目的,我們會提供新的暫時憑證,讓您用來在研究室活動期間登入及存取 Google Cloud。

如要完成這個研究室活動,請先確認:

  • 您可以使用標準的網際網路瀏覽器 (Chrome 瀏覽器為佳)。
注意:請使用無痕模式或私密瀏覽視窗執行此研究室。這可以防止個人帳戶和學生帳戶之間的衝突,避免個人帳戶產生額外費用。
  • 是時候完成研究室活動了!別忘了,活動一開始將無法暫停。
注意:如果您擁有個人 Google Cloud 帳戶或專案,請勿用於本研究室,以免產生額外費用。

如何開始研究室及登入 Google Cloud 控制台

  1. 按一下「Start Lab」(開始研究室) 按鈕。如果研究室會產生費用,畫面中會出現選擇付款方式的彈出式視窗。左側的「Lab Details」窗格會顯示下列項目:

    • 「Open Google Cloud console」按鈕
    • 剩餘時間
    • 必須在這個研究室中使用的暫時憑證
    • 完成這個實驗室所需的其他資訊 (如有)
  2. 點選「Open Google Cloud console」;如果使用 Chrome 瀏覽器,也能按一下滑鼠右鍵,然後選取「在無痕式視窗中開啟連結」

    接著,實驗室會啟動相關資源並開啟另一個分頁,當中顯示「登入」頁面。

    提示:您可以在不同的視窗中並排開啟分頁。

    注意:如果頁面中顯示「選擇帳戶」對話方塊,請點選「使用其他帳戶」
  3. 如有必要,請將下方的 Username 貼到「登入」對話方塊。

    {{{user_0.username | "Username"}}}

    您也可以在「Lab Details」窗格找到 Username

  4. 點選「下一步」

  5. 複製下方的 Password,並貼到「歡迎使用」對話方塊。

    {{{user_0.password | "Password"}}}

    您也可以在「Lab Details」窗格找到 Password

  6. 點選「下一步」

    重要事項:請務必使用實驗室提供的憑證,而非自己的 Google Cloud 帳戶憑證。 注意:如果使用自己的 Google Cloud 帳戶來進行這個實驗室,可能會產生額外費用。
  7. 按過後續的所有頁面:

    • 接受條款及細則。
    • 由於這是臨時帳戶,請勿新增救援選項或雙重驗證機制。
    • 請勿申請免費試用。

Google Cloud 控制台稍後會在這個分頁開啟。

注意:如要查看列出 Google Cloud 產品和服務的選單,請點選左上角的「導覽選單」「導覽選單」圖示

啟動 Cloud Shell

Cloud Shell 是搭載多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作。Cloud Shell 提供指令列存取權,方便您使用 Google Cloud 資源。

  1. 點按 Google Cloud 控制台上方的「啟用 Cloud Shell」圖示 「啟動 Cloud Shell」圖示

連線完成即代表已通過驗證,且專案已設為您的 PROJECT_ID。輸出內容中有一行宣告本工作階段 PROJECT_ID 的文字:

您在本工作階段中的 Cloud Platform 專案會設為「YOUR_PROJECT_ID」

gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵自動完成功能。

  1. (選用) 您可以執行下列指令來列出使用中的帳戶:
gcloud auth list
  1. 點按「授權」

  2. 輸出畫面應如下所示:

輸出內容:

ACTIVE: * ACCOUNT: student-01-xxxxxxxxxxxx@qwiklabs.net To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (選用) 您可以使用下列指令來列出專案 ID:
gcloud config list project

輸出內容:

[core] project = <project_ID>

輸出內容範例:

[core] project = qwiklabs-gcp-44776a13dea667a6 附註:如需有關 gcloud 的完整說明,請前往 Google Cloud 並參閱「gcloud CLI overview guide」(gcloud CLI 總覽指南)。

工作 1:建立 Cloud Composer 環境

  1. 前往 Google Cloud 控制台,在標題列的「搜尋」欄位輸入 Composer,然後點選「產品和頁面」中的「Composer」來建立 Cloud Composer 環境。

  2. 接著點選「建立環境」

  3. 從下拉式選單中選取「Composer 3」

  4. 為環境設定下列參數:

  • 名稱:composer-advanced-lab

  • 位置:

  • 映像檔版本:composer-3-airflow-n.n.n-build.n (選擇最新可用的映像檔版本)

  • 在「環境變數」下,選擇「小」

  • 點選「顯示進階設定」下拉式選單,然後選取「Airflow 資料庫可用區」做為 .

將其他設定保留預設值。

  1. 點選「建立」

如果在 Cloud 控制台的「環境」頁面的環境名稱左側顯示綠色勾號,表示環境已建立完成。

注意:環境最多可能需要 20 分鐘才能完成設定程序。前往下一節 Create Cloud Storage buckets and BigQuery destination dataset

點選「Check my progress」確認目標已達成。

建立 Cloud Composer 環境。

工作 2:建立 Cloud Storage bucket

在這項工作中,您將建立兩個 Cloud Storage 多區域 bucket。這些 bucket 會用於在地區間複製匯出的資料表,例如從美國複製到歐盟地區。

建立位於美國的 bucket

  1. 依序前往「Cloud Storage」>「Bucket」,然後按一下「建立」
  2. 使用通用的不重複名稱來命名 bucket,包括專案 ID (例如 -us)。
  3. 在「位置類型」部分,選取「美國 (多個美國地區)」
  4. 將其他設定保留預設值,然後按一下「建立」
  5. 勾選「強制禁止公開存取這個值區」方塊,如果顯示「系統會禁止公開存取」彈出式視窗,請點選「確認」

建立位於歐盟地區的 bucket

重複上述步驟,建立另一個位於 EU 區域的 bucket。通用不重複名稱應將位置附在 bucket 結尾 (例如 -eu)。

點選「Check my progress」確認目標已達成。

建立兩個 Cloud Storage bucket。

工作 3:建立 BigQuery 目的地資料集

  1. 從 BigQuery 新網頁版 UI 建立歐盟地區的目的地 BigQuery 資料集。

  2. 依序前往「導覽選單」>「BigQuery」

接著,畫面中會顯示「歡迎使用 Cloud 控制台中的 BigQuery」訊息方塊,當中會列出快速入門指南的連結和使用者介面更新內容。

  1. 點選「完成」

  2. 接著按一下 Qwiklabs 專案 ID 旁的三點圖示,然後選取「建立資料集」

醒目顯示的「建立資料集」選項

  1. 使用資料集 ID nyc_tlc_EU,在「位置類型」部分選取「多區域,然後從下拉式選單選取「歐盟」

資料集 ID 文字欄位和「資料位置」下拉式選單

  1. 點選「建立資料集」

點選「Check my progress」確認目標已達成。

建立資料集。

工作 4:Airflow 與核心概念簡介

  • 在環境建立期間,請查看您將在本實驗室中使用的範例檔案。

Airflow 這個平台能以程式輔助方式建立、安排及監控工作流程。

這項服務可將工作流程建立為工作的有向非循環圖 (DAG)。Airflow 排程器會執行工作站陣列內的工作,同時追蹤指定的依附元件。

核心概念

DAG:有向無循環圖是工作的集合,用於反映工作之間的關係和依附元件。

運算子:單一工作的說明,通常為小型工作。舉例來說,「BashOperator」是用於執行 Bash 指令。

工作:運算子的參數化執行個體,在 DAG 中為節點。

工作執行個體:工作的特定執行作業,特性為 DAG、工作和時間點。它含有指示性的狀態:「執行中」、「成功」「失敗」、、「略過」...

詳情請參閱概念說明文件

工作 5:定義工作流程

Cloud Composer 工作流程是由 DAG (有向非循環圖) 組成。bq_copy_across_locations.py 中的程式碼是工作流程的程式碼,也可視為 DAG。現在開啟檔案,查看它的建構方式。接下來要詳細介紹該檔案中的許多重要元件。

為了自動化調度管理所有工作流程工作,DAG 會匯入下列運算子:

  1. DummyOperator:建立「開始」和「結束」虛擬工作,以便讓 DAG 呈現更佳的圖像結果。
  2. BigQueryToCloudStorageOperator:使用 Avro 格式將 BigQuery 資料表匯出到 Cloud Storage bucket。
  3. GoogleCloudStorageToGoogleCloudStorageOperator:在 Cloud Storage bucket 間複製檔案。
  4. GoogleCloudStorageToBigQueryOperator:從 Cloud Storage bucket 中的 Avro 檔案匯入資料表。
  • 在本範例中定義的 read_table_list() 函式是用來讀取設定檔,並建立要複製的資料表清單:
# -------------------------------------------------------------------------------- # Functions # -------------------------------------------------------------------------------- def read_table_list(table_list_file): """ Reads the master CSV file that will help in creating Airflow tasks in the DAG dynamically. :param table_list_file: (String) The file location of the master file, e.g. '/home/airflow/framework/master.csv' :return master_record_all: (List) List of Python dictionaries containing the information for a single row in master CSV file. """ master_record_all = [] logger.info('Reading table_list_file from : %s' % str(table_list_file)) try: with open(table_list_file, 'rb') as csv_file: csv_reader = csv.reader(csv_file) next(csv_reader) # skip the headers for row in csv_reader: logger.info(row) master_record = { 'table_source': row[0], 'table_dest': row[1] } master_record_all.append(master_record) return master_record_all except IOError as e: logger.error('Error opening table_list_file %s: ' % str( table_list_file), e)
  • DAG 的名稱為 bq_copy_us_to_eu_01,根據預設,系統不會排定 DAG,因此您需要手動觸發。
default_args = { 'owner': 'airflow', 'start_date': datetime.today(), 'depends_on_past': False, 'email': [''], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # DAG object. with models.DAG('bq_copy_us_to_eu_01', default_args=default_args, schedule_interval=None) as dag:
  • 如要定義 Cloud Storage 外掛程式,需定義 Cloud StoragePlugin(AirflowPlugin) 類別、對應從 Airflow 1.10 穩定分支版本下載的掛鉤和運算子。
# Import operator from plugins from gcs_plugin.operators import gcs_to_gcs

工作 6:查看環境資訊

  1. 返回「Composer」,確認環境狀態。

  2. 環境建立後,按一下環境名稱即可查看詳細資料。

「環境詳細資料」頁面有各種資訊,例如 Airflow 網路使用者介面網址、Google Kubernetes Engine 叢集 ID、連線至 DAG 資料夾的 Cloud Storage bucket。

環境設定頁面

注意:Cloud Composer 使用 Cloud Storage 儲存 Apache Airflow DAG,亦即「工作流程」。每個環境都有相關聯的 Cloud Storage bucket。Cloud Composer 只會為 Cloud Storage bucket 中的 DAG 排程。

下一個步驟會在 Cloud Shell 中完成。

建立虛擬環境

Python 虛擬環境可用來獨立安裝套件,將其與系統區隔開來。

  1. 安裝 virtualenv 環境:
sudo apt-get install -y virtualenv
  1. 建構虛擬環境:
python3 -m venv venv
  1. 啟用虛擬環境:
source venv/bin/activate

工作 7:為 DAG Cloud Storage bucket 建立變數

  • 在 Cloud Shell 中執行下列指令,從「環境詳細資料」頁面複製 DAG bucket 名稱,然後設定變數,在 Cloud Shell 中參照該名稱:
注意:請務必更改下列指令中的 bucket 名稱。依序前往「導覽選單」>「Cloud Storage」,名稱會類似 -composer-advanced-YOURDAGSBUCKET-bucket DAGS_BUCKET=<your DAGs bucket name>

本實驗室會多次使用這個變數。

工作 8:設定 Airflow 變數

Airflow 變數是 Airflow 的專屬概念,與環境變數不同。在本步驟中,您將設定三個 Airflow 變數,供我們要部署的 DAG 使用:table_list_file_pathgcs_source_bucketgcs_dest_bucket

詳細資料
table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv 列出來源和目標資料表 (包括資料集) 的 CSV 檔案
gcs_source_bucket {UNIQUE ID}-us 用於匯出 BigQuery 資料表「dest_bbucks」的來源 Cloud Storage bucket
gcs_dest_bucket {UNIQUE ID}-eu 用於匯入 BigQuery 資料表目的地的 Cloud Storage bucket

下一個 gcloud composer 指令會執行 Airflow CLI 子指令變數。子指令會將引數傳遞至 gcloud 指令列工具。

如要設定三個變數,您要為上述資料表的每一列執行一次 composer 指令。指令格式如下:

gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION variables -- \ set KEY VALUE 您可以放心忽略這個 gcloud 錯誤:(ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable). 此為透過 gcloud 410.0.0 版使用 gcloud composer environments run 會發生的已知問題。即使出現錯誤訊息,變數依然會照常設定完畢。
  • ENVIRONMENT_NAME 是環境的名稱。
  • LOCATION 是環境所在的 Compute Engine 區域。執行 gcloud 指令前,gcloud composer 指令必須加入 --location 標記或設定預設位置
  • KEYVALUE 指定變數和其設定值。在左側含有 gcloud 相關引數的 gcloud 指令,以及右側的 Airflow 子指令相關引數間,依序加入空格、兩個破折號、空格 ( -- )。此外,如果使用 gcloud composer environments run 指令和變數子指令,在 KEYVALUE 引數間也要加入空格。

在 Cloud Shell 中執行下列指令,用您在工作 2 建立的 bucket 名稱取代 gcs_source_bucketgcs_dest_bucket

gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_source_bucket {UNIQUE ID}-us gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_dest_bucket {UNIQUE_ID}-eu

如要查看變數的值,請透過 get 引數執行 Airflow CLI 子指令變數,或使用 Airflow 使用者介面

例如執行下列指令:

gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ get gcs_source_bucket 注意:請務必設定 DAG 要使用的三個 Airflow 變數。.

工作 9:將 DAG 和依附元件上傳至 Cloud Storage

  1. 將 Google Cloud Python 文件範例檔複製到 Cloud shell:
cd ~ gcloud storage cp -r gs://spls/gsp283/python-docs-samples .
  1. 將複製的第三方掛鉤和運算子,上傳到 Composer DAG Cloud Storage bucket 的外掛程式資料夾。
gcloud storage cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins
  1. 接下來,將 DAG 和設定檔上傳到環境的 Cloud Storage bucket:
gcloud storage cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags gcloud storage cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://$DAGS_BUCKET/dags

Cloud Composer 會自動在 Airflow 環境註冊 DAG,DAG 會在 3 到 5 分鐘內產生變化。您可以在 Airflow 的網頁介面中查看工作狀態,並確認 DAG 是否並未依據設定排程。

工作 10:探索 Airflow 使用者介面

如要使用 Cloud 控制台存取 Airflow 網頁介面:

  1. 返回 Composer 的「環境」頁面。
  2. 在環境中的「Airflow 網路伺服器」欄位,按一下「Airflow」連結。

在「Airflow 網路伺服器」欄位中顯目顯示的「Airflow」連結

  1. 按一下實驗室憑證。
  2. Airflow 的網頁使用者介面會在新瀏覽器視窗中開啟。使用者介面開啟時,資料還在載入中。您可以在資料載入期間繼續使用實驗室。

查看變數

您先前設定的變數會保留在環境中。

  • 從 Airflow 選單列中依序選取「管理」>「變數」,即可查看變數。

變數頁面

以手動方式觸發 DAG 執行作業

  1. 按一下「DAG」分頁標籤,然後等待連結載入完成。

  2. 如要以手動方式觸發 DAG,請按一下 composer_sample_bq_copy_across_locations 的執行按鈕:

「觸發 DAG」按鈕

  1. 按一下「觸發 DAG」以確認這個動作。

點選「Check my progress」確認目標已達成。

將 DAG 和依附元件上傳至 Cloud Storage

瞭解 DAG 執行作業

當您將 DAG 檔案上傳到 Cloud Storage 中的 DAG 資料夾後,Cloud Composer 會剖析檔案。如果沒有發現錯誤,工作流程名稱會顯示在 DAG 清單中,如果符合排程條件,該工作流程會排入要立即執行的作業佇列,這時的設定依據為「無」。

按下執行按鈕後,「執行作業」狀態會顯示為綠色:

綠色的 DAG 執行作業狀態

  1. 按一下 DAG 名稱,開啟 DAG 詳細資料頁面。此頁面會透過圖形呈現工作流程工作和依附元件。

DAG 樹狀檢視

  1. 接著在工具列中點選「圖表」,然後將滑鼠游標懸停在各工作的圖表上,即可查看工作狀態。請注意,每個工作周圍的框線也會指出狀態,例如綠色框線表示執行中,紅色框線表示執行失敗等。

DAG 圖表檢視畫面

如要再次從「圖表」檢視畫面執行工作流程:

  1. 在 Airflow 使用者介面的「圖表」檢視畫面中,按一下「開始」對話方塊。
  2. 按一下「清除」會重設所有工作,點選「確定」即可確認操作。

「開始」對話方塊

在程序執行時重新整理瀏覽器,查看最新資訊。

工作 11:驗證結果

現在前往 Cloud 控制台的下列頁面,確認工作流程的狀態和結果:

  • 匯出的資料表已從美國 bucket 複製到歐盟地區的 Cloud Storage bucket。按一下「Cloud Storage」,即可查看來源 (美國) 和目的地 (歐盟地區) bucket 的中繼 Avro 檔案。
  • 資料表清單已匯入目標 BigQuery 資料集。按一下「BigQuery」,然後點選專案名稱和「nyc_tlc_EU」資料集,驗證資料表是否可從您建立的資料集存取。

刪除 Cloud Composer 環境

  1. Composer 中回到「Environments」(環境) 頁面。

  2. 勾選 Composer 環境旁的核取方塊。

  3. 點選「DELETE」(刪除)

  4. 在彈出式視窗中,再次點選「DELETE」(刪除) 來確認刪除。

恭喜!

您已透過程式輔助方式,將資料表從美國複製到歐盟地區!本實驗室的參考資料為 David Sabater Dinter 的這篇 網誌文章

後續步驟

Google Cloud 教育訓練與認證

協助您瞭解如何充分運用 Google Cloud 的技術。我們的課程會介紹專業技能和最佳做法,讓您可以快速掌握要領並持續進修。我們提供從基本到進階等級的訓練課程,並有隨選、線上和虛擬課程等選項,方便您抽空參加。認證可協助您驗證及證明自己在 Google Cloud 技術方面的技能和專業知識。

手冊上次更新日期:2024 年 6 月 21 日

實驗室上次測試日期:2024 年 6 月 21 日

Copyright 2024 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您