Cloud Composer チュートリアル

  • このエントリーをはてなブックマークに追加

GCP においてジョブ管理・タスク管理を行う Cloud Composer というサービスのチュートリアルです。Cloud Composer は立ち上げて最初に軽く使うだけでもわかりづらいものですので、チュートリアルを書いてみました。

チュートリアル編

Composer コンソールへ

まず、 GCP コンソール にログインする。 画面左上のハンバーガーメニュー (〓みたいなやつ) より、Composer を探してクリックする。

Composer API 有効化

もし Composer を使ったことがない場合は、下記のように Composer API を有効化する。 この画面が表示されない場合はすでに API が有効化されているはずなので次へ。

Composer 初期状態

これが Composer 管理画面の初期状態。 今は Composer を立ち上げていないので何もない。

「作成」ボタンを押下して作成フォームに移動しよう。

Composer 作成フォーム

下記が Composer 作成フォームである。

「名前」は何でもいい。ここでは mycomposer とした。 「ノード数」は最低の 3 とする (1 や 2 はエラーとなる)。 場所とゾーンはどこでもよいが、最も安い us-central1 をおすすめする。 ゾーンは us-central1-a〜f があるがどこでもよい。 スクリーンショットでは省いたが、マシンタイプ・ディスクサイズ等を指定できるが、 未入力の場合それぞれ「n1-standard-1」「100GB」となる。ページの最下部にある「作成」ボタンを押下すると、作成開始である。 ここから先はお金がかかることに注意!!

Composer 作成中

Composer 作成には結構時間がかかる上に、ばらつきがある。 当ページ管理人の観測結果によると 15分だったり 30分だったり 45分だったり。 気長に待ちましょう。

下記が作成中の状態で、”mycomposer” の左でぐるぐる回っている。

Composer 作成完了

Composer が作成完了すると、下記のようになる。

Composer 詳細画面

“mycomposer” の部分をクリックしてみよう。 すると下記のように詳細情報が表示される。

  • GKE Cluster ID とあるように、GKE クラスタが自動的に起動している。
  • DAG フォルダ (Cloud Storage バケット) が自動的に作成されている。

ストレージ

ブラウザの戻るボタンでひとつ戻って、再度下記画面から始めよう。 

“DAGs” の前のフォルダアイコンをクリックすると、下記の GCS 画面に遷移する。

ここに DAG ファイルをアップロードすることで、DAG が登録されるわけである。 今は作成したばかりなので DAG ファイルはひとつも存在しない。

Airflow 画面

再度ブラウザの戻るボタンでひとつ戻って、下記の “Airflow” の直前のアイコンをクリックしてほしい。

すると下記のように Airflow 管理画面が表示される。 DAG 未登録なので、何も表示されていない。

ストレージアップロード

下記コードを bash-hello-world.py として保存し、GCS の DAG フォルダにドラッグ & ドロップでアップロードしてほしい。 ファイル名は hoge.py でも fuga.py でもよいが、拡張子は .py とすること。 .txt などでアップロードすると、永遠に何も起こらない。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'start_date': datetime(2018, 10, 1),
}

dag = DAG(dag_id='mydag',
          default_args=default_args,
          schedule_interval = None)

task = BashOperator(task_id='mytask',
                     bash_command='echo Hello World',
                     dag=dag)
task

GCS にアップロードした直後はこうなる。

DAG 登録完了

GCS へのアップロードが完了したら、2〜3分待つと下記のように Airflow 管理画面にて “mydag” が登録されたことを確認できる。 残念ながら画面は自動更新されないため、リロードボタンを押して更新されるのを待ってほしい。

DAG コード確認

Airflow 管理画面で画面右側のアイコンの “Code View” をクリックしてほしい。

下記のようにコード内容を確認できる。

さて、この DAG を実行してみよう。ブラウザの戻るで戻ってよいが、 画面左上の下記アイコン押下でトップ画面に戻ることもできる。

DAG 実行

“Trigger Dag” をクリックすると、確認ダイアログが出る。そこで OK とすると実行開始である。

DAG 実行開始直後

下記は実行開始直後の画面である。画面上部に “mydag” が実行された旨メッセージが表示されている。 また、”DAG Runs” 欄に緑丸で “1” と出ているのは、”running” 状態のジョブが 1つあることを表している。 さらに “Recent Tasks” でグレーで “1” とあるのは、”queued” 状態のタスクが 1つあることを表している。

DAG 実行完了

この DAG は echo するだけなので、すぐに実行が終わる。 画面が自動更新されないことにイライラしつつ、ブラウザのリロードボタンを押すと、 下記のように DAG Runs で success が 1件、Recent Task で success が 1件と表示されている。

Graph View 確認

右側アイコンの左から 3つめの “Graph View” をクリックすると、 下記画面が表示される。

タスク状況確認

この “mytask” にマウスカーソルをあわせると、下記のように実行開始・終了時刻などが表示される。 ここでは 3.12836 秒でタスクが完了したことがわかる。なお、日付は UTC である。 この “mytask” をクリックすると、タスクメニューに遷移する。

タスクメニュー

下記はタスクメニュー画面である。上の1列のボタンはすべてアクションボタン、 下の3列は、一番左だけアクションボタンで、右側のボタンはチェックボックスという、 独創的すぎてなんとも言い難い素晴らしいデザイン (控えめな表現) である。

右上の View Log を押下すると…

下記のように実行時のログが表示される。

ログ確認

ログの最後 (下記、最後から 2行目) には “Hello World” と表示されていることがわかる。

DAG ファイルを修正してアップロード

不正な DAG ファイルの場合どうなるか確認しておこう。 さきほどの bash-hello-world.py の先頭に下記のように “xxx” と書き、Python としてエラーが発生する状態にして、 DAG フォルダにアップロードする。

xxx
from airflow import DAG
from airflow.operators import BashOperator

すると “Broken DAG” としてエラーメッセージが表示される。

より複雑な DAG ファイルをアップロード

下記はツリー構造をちょっと複雑にした DAG ファイルである。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'start_date': datetime(2018, 10, 1),
}

dag = DAG(dag_id='task_multi_test',
          default_args=default_args,
          schedule_interval = None)

task1 = BashOperator(task_id='mytask1',
                     bash_command='sleep 60; echo task1',
                     dag=dag)
task2 = BashOperator(task_id='mytask2',
                     bash_command='sleep 60; echo task2',
                     dag=dag)
task3 = BashOperator(task_id='mytask3',
                     bash_command='sleep 10; echo task3',
                     dag=dag)
task4 = BashOperator(task_id='mytask4',
                     bash_command='echo task4',
                     dag=dag)
task5 = BashOperator(task_id='mytask5',
                     bash_command='echo task5',
                     dag=dag)
task1 >> task2
task2 >> task4
task3 >> task4
task4 >> task5

Graph View 確認

下記のように結構わかりやすく表示してくれる。ただ、最初に動くのは mytask1 に見えるが、 実際は mytask1 と mytask3 両方が DAG 実行直後の実行可能タスクとなる。

実行直後

下記のように、mytask1 と mytask3 が実行中となる。

さらにそのあと

mytask3 は sleep 10 なので早々にタスクが終了するが、mytask1 は sleep 60 なので まだ実行中である。mytask3 は終了しているが、mytask1 → mytask2 が終わらないと、 mytask4 は開始されない。

  • このエントリーをはてなブックマークに追加

SNSでもご購読できます。

Leave a Reply

*

CAPTCHA