arrow_back

Cloud Data Fusion 2.5 でパイプライン グラフを作成して実行する

ログイン 参加
700 以上のラボとコースにアクセス

Cloud Data Fusion 2.5 でパイプライン グラフを作成して実行する

ラボ 2時間 30分 universal_currency_alt クレジット: 5 show_chart 入門
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このチュートリアルでは、Cloud Data Fusion の Wrangler 機能と Data Pipelines 機能を使用して、タクシーの走行データを詳細に分析するためにクリーニング、変換、処理を行う方法について説明します。

学習内容

このラボでは、次の作業を行います。

  • Cloud Data Fusion をいくつかのデータソースに接続する
  • 基本的な変換を適用する
  • 2 つのデータソースを結合する
  • データをシンクに書き込む

はじめに

データを分析して情報を取得するには、多くの場合、事前にいくつかの前処理を行う必要があります。たとえば、データ型の調整や異常値の削除が必要になる場合や、あいまいな識別子を区別しやすい識別子に変換する必要がある場合があります。Cloud Data Fusion は、ETL および ELT のデータ パイプラインを効率的に構築するためのサービスで、Cloud Dataproc クラスタを使用してパイプラインのすべての変換を実行します。

このチュートリアルでは、BigQuery の NYC TLC Taxi Trips データセットのサブセットを使って Cloud Data Fusion の使用方法を説明します。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニュー)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

タスク 1. Cloud Data Fusion インスタンスの作成

Cloud Data Fusion インスタンスの詳細な作成手順については、Cloud Data Fusion インスタンスの作成ガイドをご覧ください。基本的な手順は次のとおりです。

  1. トレーニング環境が適切に設定されるように、Cloud Data Fusion API を停止して再起動する必要があります。Cloud Shell で次のコマンドを実行します。完了するまでに数分かかります。
gcloud services disable datafusion.googleapis.com

オペレーションが正常に完了したことを示すメッセージが出力されます。

次に Cloud Data Fusion API への接続を再起動します。

  1. Google Cloud コンソール上部の検索バーに「Cloud Data Fusion API」と入力します。検索結果の「Cloud Data Fusion API」をクリックします。

  2. 表示されたページで、[有効にする] をクリックします。

  3. API が再度有効になると、ページが更新され、API を無効にするオプションと、API の使用状況やパフォーマンスについての詳細が表示されます。

  4. ナビゲーション メニューで、[Data Fusion] を選択します。

  5. [インスタンスを作成] をクリックして、Cloud Data Fusion インスタンスを作成します。

  6. インスタンスの名前を入力します。

  7. エディションは [Basic] を選択します。

  8. [認可] セクションで [権限を付与] をクリックします。

  9. その他の設定はすべてデフォルトのままにして、[作成] をクリックします。

注: インスタンスの作成には 15 分ほどかかります。
  1. インスタンスが作成されたら、最後のステップとして、インスタンスに関連付けられたサービス アカウントにプロジェクトの権限を付与する必要があります。インスタンス名をクリックして、インスタンスの詳細ページに移動します。

  2. サービス アカウントをクリップボードにコピーします。

  3. GCP コンソールで、[IAM と管理] > [IAM] に移動します。

  4. IAM の権限ページで、[+アクセスを許可] をクリックし、先ほどコピーしたサービス アカウントを新しいプリンシパルとして追加して、Cloud Data Fusion API サービス エージェントのロールを付与します。

  1. [保存] をクリックします。

タスク 2. データの読み込み

Cloud Data Fusion インスタンスの準備ができたら Cloud Data Fusion を使い始めることができます。ただし、Cloud Data Fusion にデータを取り込む前にいくつかの準備作業を行う必要があります。

  1. この例では、ストレージ バケットからデータが読み取られるため、Cloud Shell コンソールで次のコマンドを実行します。これにより、新しいバケットが作成され、関連するデータがコピーされます。
export BUCKET=$GOOGLE_CLOUD_PROJECT gcloud storage buckets create gs://$BUCKET gcloud storage cp gs://cloud-training/OCBL017/ny-taxi-2018-sample.csv gs://$BUCKET 注: 作成されるバケットの名前は、現在のプロジェクト ID です。
  1. コマンドラインで次のコマンドを実行し、Cloud Data Fusion で作成される一時的なストレージ アイテム用のバケットを作成します。
gcloud storage buckets create gs://$BUCKET-temp 注: 作成されるバケットの名前は、プロジェクト ID の末尾に「-temp」を付けた名前になります。
  1. Data Fusion インスタンス ページかインスタンスの詳細ページにある [インスタンスを表示] リンクをクリックします。ユーザー名をクリックします。サービスのガイドに進むダイアログが表示された場合は [No, Thanks] をクリックします。これで Cloud Data Fusion UI が表示されるようになります。
注: ページの読み込みが遅いときは、Cloud Fusion UI ページを再読み込みまたは更新してください。
  1. Wrangler はインタラクティブなビジュアル ツールです。データセット全体に大規模な並列処理ジョブをディスパッチする前に、データの小規模なサブセットで変換の効果を確認できます。Cloud Data Fusion UI で、[Wrangler] を選択します。左側に、Cloud Storage 接続など、データへの事前構成の接続が表示されたパネルがあります。

  2. [GCS] の [Cloud Storage Default] を選択します。

  3. プロジェクト名に対応するバケットをクリックします。

  4. [ny-taxi-2018-sample.csv] を選択します。データが行と列の形式で Wrangler の画面に読み込まれます。

  5. [Parsing Options] ウィンドウで、[Use First Row as Header] を True に設定します。データが複数の列に分割されます。

  6. [Confirm] をクリックします。

タスク 3. データのクリーニング

次に、いくつかの変換を実行してタクシーデータの解析とクリーニングを行います。

  1. trip_distance 列の隣にある矢印をクリックし、[Change data type] を選択して、[Float] をクリックします。同じ手順を total_amount 列に対して繰り返します。

  2. pickup_location_id 列の隣にある矢印をクリックし、[Change data type] を選択して、[String] をクリックします。

  3. データを細かく見ると異常値(負の値の移動距離など)が見つかることがあります。このような負の値は Wrangler ではフィルタで除外できます。trip_distance 列の隣にある矢印をクリックし、[Filter] を選択します。[If] の後で [Custom condition] を選択し、「>0.0」と入力します。

  1. [Apply] をクリックします。

タスク 4. パイプラインの作成

基本的なデータ クレンジングが完了し、データのサブセットに対して変換を実行しました。これで、すべてのデータに対して変換を行うバッチ パイプラインを作成できます。

Cloud Data Fusion を使用すると、視覚的に構築されたパイプラインを Apache Spark または MapReduce のプログラムに変換して、Cloud Dataproc のエフェメラル クラスタで並行して変換を実施できます。これにより、インフラストラクチャやテクノロジーに煩わされることなく、スケーラブルかつ信頼性の高い方法で、膨大な量のデータに対して複雑な変換を簡単に行うことができます。

  1. Cloud Data Fusion UI の右上から [Create a Pipeline] をクリックします。

  2. 表示されたダイアログで、[Batch pipeline] を選択します。

  3. Data Pipelines UI で、Wrangler ノードに接続した GCSFile ソースノードが表示されます。Wrangler ノードには、Wrangler ビューで適用した変換がすべて含まれており、ディレクティブ形式でキャプチャされています。Wrangler ノードにカーソルを合わせて [Properties] を選択します。

  1. この段階で [Wrangle] ボタンをクリックすると、さらに変換を適用できます。列名の隣にある赤いゴミ箱アイコンを押して extra 列を削除します。右上にある [Validate] をクリックして、エラーがないか確認します。右上にある [X] ボタンをクリックして Wrangler ツールを閉じます。

タスク 5: データソースの追加

タクシーデータには、そのままではアナリストがすぐに理解できない難読な列がいくつか含まれています(pickup_location_id など)。そのため、パイプラインにデータソースを追加して、pickup_location_id 列を該当する場所の名前にマッピングします。マッピング情報は BigQuery テーブルに保存します。

  1. 別のタブで、Cloud コンソールの BigQuery UI を開きます。[Cloud コンソールの BigQuery へようこそ] ページで [完了] をクリックします。

  2. BigQuery UI の [エクスプローラ] セクションで GCP Project ID(qwiklabs で始まるもの)の横のその他メニューをクリックします。

  3. 表示されたメニューから [データセットを作成] をクリックします。

  4. [データセット ID] フィールドに「trips」と入力します。

  5. [データセットを作成] をクリックします。

  6. 新しく作成したデータセットに目的のテーブルを作成するには、[展開] > [クエリ設定] に移動します。これにより、作成したテーブルに Cloud Data Fusion からアクセスできるようになります。

  7. [クエリ結果の宛先テーブルを設定する] を選択します。[データセット] に「trips」と入力し、表示されたデータセットをプルダウン メニューから選択します。[テーブル ID] に「zone_id_mapping」と入力します。[保存] をクリックします。

  1. クエリエディタに次のクエリを入力し、[実行] をクリックします。
SELECT zone_id, zone_name, borough FROM `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom`

このテーブルには、zone_id からゾーンの名前や区画へのマッピング情報が含まれています。

  1. 次に、パイプラインにソースを追加して、この BigQuery テーブルにアクセスできるようにします。Cloud Data Fusion を開いたタブに戻り、左側にあるプラグイン パレットの [ソース] セクションで [BigQuery] ソースを選択します。BigQuery ソースノードが他の 2 つのノードとともにキャンバスに表示されます。

  2. 新しい BigQuery ソースノードにカーソルを合わせ、[Properties] をクリックします。

  3. [Reference Name] は「zone_mapping」と入力して構成します。これは、このデータソースを系統立ててとらえるうえで使用されます。

  1. BigQuery の [Dataset] と [Table] は、BigQuery で設定したデータセットとテーブル(前の手順で作成した tripszone_id_mapping)で構成します。[Temporary Bucket Name] には、プロジェクト名の末尾に「-temp」を付けた名前を入力します。これは、タスク 2 で作成したバケットの名前です。

  1. [Get Schema] をクリックして、このテーブルのスキーマを BigQuery から反映します。ウィザードの右側にフィールドが表示されます。

  2. 右上にある [Validate] をクリックして、エラーがないか確認します。右上にある [X] ボタンをクリックして [BigQuery Properties] ウィンドウを閉じます。

タスク 6: 2 つのソースの結合

これで、2 つのデータソース(タクシーの走行データとゾーン名)を結合して、より有意な出力を生成できるようになりました。

  1. プラグイン パレットの [Analytics] セクションで [Joiner] を選択します。キャンバスに Joiner ノードが表示されます。

  2. ソースノードの右端にある接続矢印 [>] を宛先ノードにドラッグ&ドロップして、Wrangler ノードと BigQuery ノードを Joiner ノードに接続します。

  1. 次のとおり、Joiner ノードを構成します(SQL JOIN 構文に似ています)。
  • [Joiner] の [Properties] をクリックします。

  • ラベルは Joiner のままにします。

  • [Join Type] を [Inner] に変更します。

  • Wrangler ノードの pickup_location_id 列が BigQuery ノードの zone_id 列と結合されるように、[Join Condition] を設定します。

  • [Get Schema] をクリックして、結合後のスキーマを生成します。

  • 右側の [Output Schema] テーブルで、zone_id フィールドと pickup_location_id フィールドを削除します。赤いゴミ箱アイコンをクリックすると削除できます。

  • 右上にある [Validate] をクリックして、エラーがないか確認します。右上にある [X] ボタンをクリックしてウィンドウを閉じます。

タスク 7. BigQuery への出力の保存

パイプラインの結果を BigQuery テーブルに保存します。データを格納する場所をシンクといいます。

  1. プラグイン パレットの [Sink] セクションで [BigQuery] を選択します。

  2. Joiner ノードを BigQuery ノードに接続します。ソースノードの右端から接続矢印 > をドラッグして宛先ノードでドロップします。

  1. BigQuery2 ノードの上にカーソルを合わせ、[Properties] をクリックして開きます。次に、ノードを次に示すように構成します。既存のBigQuery ソースと同様な構成を使用します。[Reference Name] フィールドに「bq_insert」、[Dataset] に「trips」と入力し、[Temporary Bucket Name] にはプロジェクト名の末尾に「-temp」を付けたものを入力します。このパイプラインの実行により作成される新しいテーブルが書き込み先となります。このテーブルの名前として、[Table] フィールドに「trips_pickup_name」と入力します。

  2. 右上にある [Validate] をクリックして、エラーがないか確認します。右上にある [X] ボタンをクリックしてウィンドウを閉じます。

タスク 8. パイプラインのデプロイと実行

最初のパイプラインの作成が完了したので、次はパイプラインをデプロイして実行します。

  1. Data Fusion UI の左上でパイプラインの名前を入力し、[保存] をクリックします。

  1. 次に、パイプラインをデプロイします。ページの右上で [Deploy] をクリックします。

  1. 次の画面で [Run] をクリックしてデータの処理を開始します。

パイプラインを実行すると、Cloud Data Fusion は Cloud Dataproc のエフェメラル クラスタをプロビジョニングし、パイプラインを実行してからクラスタを破棄します。これには数分かかることがあります。その間に、パイプラインのステータスが [Provisioning] から [Starting]、[Starting] から [Running]、[Succeeded] に変わるのを確認できます。

注: パイプラインが [succeeded] に移行するまでに 10~15 分かかる場合があります。

タスク 9. 結果の表示

パイプラインの実行後、次の手順で結果を確認します。

  • BigQuery を開いたタブに戻ります。次のクエリを実行して trips_pickup_name テーブルの値を確認します。

    SELECT * FROM `trips.trips_pickup_name`

    BigQuery の結果

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。