Cloud Composer FAQ

  • このエントリーをはてなブックマークに追加

Cloud Composer は GCP のサービスのひとつで、ジョブ管理・タスク管理ツールです。Cloud Composer は非常にわかりづらいポイントが多々あり、当ページ管理人はとても苦労しました。皆様が同じところでハマらないよう願っております。

目次

Google Cloud Composer 基本編

Q. Cloud Composer って何?

A. Google Cloud Platform (GCP) のサービスのひとつ。 ジョブ管理・タスク管理・ジョブオーケストレーションツールと言われるもの。 cron をちょっと立派にしたもの。

もともと Apache Airflow というオープンソースのソフトウェアがあり、 Cloud Composer はそれを GCP にマネージドサービスとして組み込んだもの。

Q. Cloud Composer で何ができるの?

  • ジョブを起動できる。
  • ジョブA が終わったらジョブB を実行、といった依存関係を定義できる。
  • ジョブA・B・C を並列実行し、終わったらジョブD を実行するといった待ち合わせができる。
  • ジョブのタイムアウト設定やリトライが可能 (リトライ回数・リトライ間隔指定可能)。
  • ジョブ実行が一定時間を超えたら、通知ができる (1時間超過で警告的な)
  • 「このジョブは最大N個まで同時実行可能」という定義ができる (N個を超えた分は待たされる)
  • 「前段のジョブがすべて成功したら後続を実行 (ひとつでも失敗したら後続ジョブを実行しない)」がデフォルトであるが、「すべて失敗」「ひとつでも成功」「ひとつでも失敗」「全部失敗」などの条件も指定できる。
  • ジョブの定期実行ができる。cron 形式で記述できるため、毎時N分、毎日N時、毎週、毎月、9時と18時など柔軟な起動が可能。
  • GCS・Pub/Sub・BigQuery・S3・HTTP などを通じ、ある条件を満たしている間、ジョブを実行することができる
  • ブラウザで、どこまで終わっているか、エラー件数・ログ等の情報を確認可能。
  • 環境変数や Airflow 変数を定義することができる。

Q. ジョブ定義は画面上から行うの?

画面上からのジョブ定義は一切 (!) できない。 ジョブ定義は DAG (ダグ) ファイルと呼ばれる Python スクリプトを作成し、それをアップロードすることで Composer に登録できる。

Q. DAG ファイルはどんなふうに書く?

DAG ファイルの一例を上げると、下記。これは bash 経由で echo 文を出力するだけのもの。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'start_date': datetime(2018, 10, 1),
}

dag = DAG(dag_id='mydag',
          default_args=default_args,
          schedule_interval = None)

task = BashOperator(task_id='mytask',
                     bash_command='echo Hello World',
                     dag=dag)
task

Airflow と Cloud Composer 編

Q. GCP で Cloud Composer ではなく Airflow を使うことはできる?

できる。GCE を立ち上げて、普通に Airflow をインストールすればよい。 Composer のように冗長化はなされていないが、Google が誇るライブマイグレーション機能 (サーバの物理障害発生時に無停止で自動的に別サーバに移動してくれる) があるため、 それほど心配はいらないのではと考える。

Q. Cloud Composer と Airflow の関係は?

ジョブ管理ツールとしての機能追加やバグ修正は Airflow 側で行う。 Airflow の新バージョンがリリースされたら、Composer が追随する。 Airflow のバージョンが変わらなくても、Composer として機能追加がなされることもある。

現時点での Cloud Composer のリリースとバージョン表記は下記のとおり。

  • 2018/10/02 composer-1.2.0-airflow-1.9.0
  • 2018/09/17 composer-1.1.1-airflow-1.9.0
  • 2018/08/17 composer-1.1.0-airflow-1.9.0
  • 2018/07/19 composer-1.0.0-airflow-1.9.0

2018/08 に Airflow-1.10 がリリースされているので、 そのうち composer-1.3.0-airflow-1.10.0 的なものが出てくるのではと想像する。

2018/10/25 に、composer-1.3.0-airflow-1.9.0 と composer-1.3.0-airflow-1.10.0 がリリースされました。2018/10 末までに全ゾーンで利用可能になるとのこと。Airflow 複数バージョンを選択可能、ということですね。少なくとも現時点では、既存 Airflow-1.9.0 系が 1.10.0 に自動更新されることはないようです。

Airflow のドキュメントに書いてある機能が Composer で使えないようなんだけど。

↑に書いたとおり、Composer は Airflow 1.9.0 ベースであるが、 Airflow 公式サイトは 1.10.0 の情報になっている。

https://airflow.readthedocs.io/en/stable/ にて過去バージョンのドキュメントを閲覧可能なので、そちらを読むとよいだろう。 画面右下の “v: stable” をクリックすると、1.9.0 などのバージョンを選択できる。

Q. Cloud Composer のバージョンアップは?

自動的に行われる。バージョンを固定したり、バージョンアップタイミングを指定したりする方法はないと思われる。

料金編

Q. 料金は?

東京 (asia-northeast) なら $504/月 程度。 アイオワ (us-central) なら $386/月 程度。

まず前提知識として、Composer を作成すると下記の 2つが立ち上がることを理解してほしい。

  • Composer 本体
  • Worker ノード

Composer 本体はフルマネージメントサービスで、GCP 利用者側のリソースではなく、 Google 側内部のリソースを使って実行される。インスタンスサイズ・インスタンス数などは一切指定できない。 一度起動したら止めることもできない。Composer を削除するまでお金がかかり続ける。

一方、Worker ノードは GKE として GCP 利用者側のリソースで実行される。 Composer 作成時に n1-standard-1 x 3 などと指定しているのは Worker ノードの話である。

それをふまえて、わかりやすく書くと下記になる。

分類ItemPrice (USD)個数月額 (東京・31日計算)
Composer 本体Web core hours$0.098 / vCPU hour2 (固定)$145.82
Database core hours$0.163 / vCPU hour2 (固定)$242.54
Web and database storage$0.354 per GB / month20GB (固定)$7.08
Network egress$0.156 / GB処理内容によるとりあえず $0 とする
小計$395.45
Worker ノードGKE クラスタ無料1無料
GCE インスタンス$31.17/月 (n1-standard-1 の場合)3 以上必須$93.51
永続ディスク$0.052 GB/月100GB$15.6
小計$109.11
合計$504.56 (東京)

参考までに、アイオワ (us-central) の金額は下記。

  • Composer 本体
    • Web $0.074 x 24時間 x 31日 x 2台 = $110.112
    • Database $0.125 x 24時間 x 31日 x 2台 = $186
    • Web and database storage $0.273 x 20GB = $5.46
    • Network egress $0.156 x とりあえず0 = $0
  • Worker
    • GCE インスタンス n1-standard-1 $24.2725 x 3ノード = $72.8175
    • 永続ディスク $0.040 x 100GB x 3 ノード = $12
  • $110.112 + $186 + $5.46 + $72.8175 + $12 = $386.38

Q. 高くない? 何かいい方法はない?

高い!!! 単体でも高いが、開発・ステージング・本番と 3環境作るとしたら、3倍である。

最初に考えるべきは「Airflow を使う」である。これなら GCE 分の費用だけで済む。

次に東京リージョンを使わず、us-central や us-east を使うだけで 25% 安くなる。

Composer 本体の $395/月 は、Composer を立てっぱなしにするという前提ならば完全固定費となる。ここはどうやっても節約しようがない。Composer は停止できないので、ここを節約したいなら、Composer を削除し、使うときに Composer 起動するしかない。開発環境では夜間・土日に削除 & 起動は有効だろう。

実際に試したことはないが、下記のような感じでいけるのではなかろうか。

# 夜、Composer 設定データ export・Composer 削除
gcloud composer environments storage dags export (等々で、頑張って現在の設定をファイルに落とす)
gcloud composer environments delete (Composer 削除)
# 朝、Composer 作成・設定データ復元
gcloud composer environments create
gcloud composer environments run [環境名] variables --
gcloud composer environments run [環境名] connections -- ...
gcloud composer environments storage dags import

Composer を立ち上げっぱなしにする前提であれば、節約ポイントは Worker ノードしかない。 Worker ノード数は Composer 画面からは 3 未満に変更することができないが、 GKE として直接ノード数を 0 にすれば Worker ノードの費用はかからなくなる。 使いたいときはノード数を 3 に上げればよい。 障害時に多少困るかもしれないが、ノード数を 2 や 1 にしても動くかも知れない (リソース的に不安はある)。

また、共用タイプの g1-small を使う手もある。一応起動はしたが、リソース不足が心配である。 なお f1-micro を試してみたが、リソース不足で起動しなかった。

DAG ファイル書き方編

Q. DAG 登録はどうやって行う?

DAG 配置用 GCS にファイルを配置することで登録される。具体的には下記のいずれかで登録可能。

  • ブラウザで GCS の画面を開き、 DAG ファイルをドラッグ & ドロップでアップロードする。
  • gsutil コマンドを使って GCS にコピーする (gcloud cp)。
  • gcloud コマンドを使ってインポートする (gcloud composer environments storage dags import)

Q. Operator を使わず直接 gsutil コマンドや bq コマンドで操作すればよくない?

GCS にアップロードした DAG は、数秒〜数十秒おきに実行される(!)。 例えば DAG 内に print(‘HOGE’) と書くと、Stackdrive Logging に数秒〜数十秒おきに HOGE と出力されることを確認してほしい。

これは、DAG ファイルは設定ファイルというよりは Python プログラムなので、

'start_date': datetime.now()

などと書いた場合、実行してみないと値が取れないし、実行するたびにどんどん時刻が変わっていくため、 定期的に DAG ファイルを Python プログラムとして実行するしか方法がないのではと推測する。

そして、この定期実行の際に Operator 内部のみ実行されない。 そのため、gsutil コマンド等を直接実行するのではなく、Operator 経由で実行する必要がある。

Q. DAG ファイルをアップロードしないとエラーチェックできないのは面倒なんだけど

Cloud Shell にて、

% sudo pip install airflow
% python foo.py

で BashOperator のみを使った DAG ファイルのチェックはできた。 しかしながら下記のような contrib の operator は、コマンドラインではエラーになる。なんとかして contrib のモジュールをどこかに置けば動くのだろうとは思う。

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

Q. 定期実行はどのように書く?

こんな感じで cron っぽく書ける。

dag = DAG(...
          schedule_interval='0 1 * * *'
          )

Q. タイムアウトを指定するには?

execution_timeout を指定する。下記は 1時間でタイムアウトする例。 hours 以外に、days, seconds, minutes などが指定可能。 timedelta は Python 標準モジュールなので、そちらのドキュメントを読んでほしい。 なお、import するのを忘れずに。

from datetime import datetime, timedelta

task = BashOperator(task_id='mytask',
                    bash_command='sleep 100; echo Hello World',
                    execution_timeout=timedelta(hours=1),
                    dag=dag)

Q. タスクの同時実行数を制御するには?

pool を使うのがよいだろう。 Airflow 管理画面の Admin > Pools にて “Create” を押下し、

  • Pool: mypool など適当な名前
  • Slots: 8 (同時実行上限数)

などと入力して Save する。

その上で下記のように Operator に pool=’mypool’ を指定すると、同時実行する上限が 8 となる。 9個目以降は実行中のタスクが終了次第、順次実行が開始される。

task = BashOperator(task_id='mytask',
                    bash_command='...',
                    pool='mypool',
                    dag=dag)

Pool は何個でも作れるので、メール送信には mail_pool、Slack 通知には slack_pool など、 外部リソースごとに定義可能である。 また、Composer 自体の使用リソース抑制のためにも使える。 同時に 1000個のタスクを実行すると、GKE にて 1000個のワーカーが起動してしまい、 よほどのリソースがないとまともに動かなくなってしまう。

Q. 通常 5分で終わる処理が 15分経っても終わらなかったら通知してほしい。ただ、15分経過しても処理を実行してほしい。

sla を使う。下記は 15分を超えると通知する例。

from datetime import datetime, timedelta

def sla_mmiss_callback:
   書き方調査中

task = BashOperator(task_id='mytask',
                    bash_command='...',
                    sla=timedelta(minutes=15),
                    sla_miss_callback=sla_miss_callback,
                    dag=dag)

Q. DAG の import 文で「Importing BashOperator directly from … has been deprecated.」という警告が出るんだけど

A.

from airflow.operators import BashOperator

という書き方は古い書き方のようで、実行はできるものの下記の警告が出る。 また、メッセージ中にあるように将来リリースされる Airflow 2.0 でこの書き方は使えなくなるとのこと。

/usr/local/lib/python2.7/dist-packages/airflow/utils/helpers.py:406: DeprecationWarning: Importing BashOperator directly from  has been deprecated. Please import from '.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
    DeprecationWarning)

下記のように修正すればよい模様。

from airflow.operators import BashOperator
    ↓
from airflow.operators.bash_operator import BashOperator

Operator 編

Q. どのような Operator がある?

基本的な Operator と GCP 絡みの Operator をリストアップする。 ただしこれは Airflow-1.10 の情報であり、2018/10/17 現在の Cloud Composer は Airflow-1.9 ベースであるため、 まだ使えないものも多い。

  • Bash
    • BashOperator
  • Python
    • PythonOperator
  • Python
    • PythonBranchOperator: if文のようなもの。
  • BigQuery
    • BigQueryOperator
    • BigQueryToCloudStorageOperator
    • BigQueryToBigQueryOperator
    • BigQueryTableDeleteOperator
    • BigQueryGetDataOperator
    • BigQueryCheckOperator
  • Cloud Dataflow
    • DataflowPythonOperator
    • DataflowJavaOperator
    • DataflowTemplateOperator
  • Google Cloud Storage (GCS)
    • GoogleCloudStorageDownloadOperator
    • GoogleCloudStorageListOperator
    • GoogleCloudStorageCreateBucketOperator
    • GoogleCloudStorageCreateBucketOperator
    • GoogleCloudStorageToBigQueryOperator
    • GoogleCloudStorageToGoogleCloudStorageOperator
    • GoogleCloudStorageToS3Operator
    • FileToGoogleCloudStorageOperator
    • MySqlToGoogleCloudStorageOperator
    • PostgresToGoogleCloudStorageOperator
    • S3ToGoogleCloudStorageOperator
    • BigQueryToCloudStorageOperator
    • CassandraToGoogleCloudStorageOperator

その他

  • EmailOperator
  • MsSqlOperator
  • SimpleHttpOperator
  • SlackAPIOperator

定期実行に関するややこしい話 (start_date・schedule_interval・catchup 等) 編

Composer・Airflow は 1日1回など定期実行するだけでも、とても大変である。 まず、下記のような設計方針であることを理解しよう。

  • 定期実行は、開始時刻〜終了時刻を定義した上で、終了時刻に達したら実行が行われる。
  • DAG は開始日を持つ (必須)。
  • ジョブが未実行の場合、過去にさかのぼって実行する。直近 3日間未実行であれば、3日分実行するというイメージ。
  • なぜか、DAG ファイルアップロード時に直近日付の実行を行ってしまう。

さっぱりわからないと思うので、まずは下記 DAG を登録してほしい。 下記は 毎日 10:00 に echo 文を実行する DAG 定義である。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'start_date': datetime(2018, 10, 1),
}

dag = DAG(dag_id='daily',
          default_args=default_args,
          schedule_interval = '0 10 * * *')

task = BashOperator(task_id='mytask',
                     bash_command='echo Hello World at {{ ds }}',
                     dag=dag)
task

これを 2018/10/21 にアップロードすると、2018/10/01〜10/20 分の20個(!) のジョブが、 自動的に、一気に、実行される。なぜならば、start_date が 2018/10/01 であるにも関らず、 2018/10/01〜10/20 分がまだ実行されていないことを Airflow が検知したため、 親切にも実行してくれたのである。

これは、cron 等における、 「その時間にちょうどマシンが落ちていたら定期ジョブが動かない」という問題を解決するための仕組みであろう。

このような挙動を避けたい場合、

  'start_date': datetime(2018, 10, 20),

と書くべきなのである。 ただ問題なのは、DAG ファイルアップロードのたびに、start_date を「正しく」 書き換える必要があるということ。もしこのタスクが 実行完了までに 6時間かかり、その間 DB に負荷をかけまくるものであったなら? 本当に毎回正しくミスなく更新できるのか? と考えると、難易度が高いと考える。

この、さかのぼって実行する仕組みを Airflow 的には catchup と読んでいるのだが、 下記のように書くとこの catchup を OFF にすることができる。

  'start_date': datetime(2018, 10, 20),
  'catchup': False,

しかしながら…これだと直近のタスクが実行されてしまい…かといってdatetime.now() にすると永遠に実行されず… 的なことを書く。

ログ編

Q. ログはどこに出力される?

Airflow としてのログは Stackdriver Logging に出力される。、 DAG の標準出力・標準エラー出力も Stackdriver Logging に出力される。 それ以外の View Log で見ることができる情報は、Stackdriver Logging には出力されず、 GCS のログ用フォルダに出力される。

Q. ログ量はどれくらい?

毎秒結構な量のログを結構多い。Composer を起動して数個の DAG ファイルを登録した状態で、 1日あたり 900MB 程度。1ヶ月で 27GB ほど。Stackdriver Logging の無料枠が月間 50GB なので、 半分以上食いつぶしてしまう。すでに無料枠を使い切っているなら、 $0.50/GB なので $14 ほど上乗せとなる。

通知編

権限編

Q. Composer の権限はどうなっている?

Google 側で動く Composer 本体の権限は指定できない。 Worker ノードについては、デフォルトは roles/editor 権限である。 ほぼなんでもできる権限であるため、BigQuery・GCS・PubSub・Cloud SQL 等、 なんでも読み書きできる。

Q. Composer を操作できるユーザの権限管理はどうなっている?

Composer 画面については、参照更新可能・参照のみ可・参照更新不可のロールが用意されている。 つまり、Composer を作成したり、削除したり、環境変数や Airflow Configuration を修正したりについては、 コントロールできる。DAG フォルダのある GCS についても、コントロール可能である。

しかしながら、Airflow 画面についてはなかなか厳しい状況で、 参照更新可能・参照更新不可のいずれかしかない。 つまり、ジョブ状況を確認したいが、DAG 実行や設定変更はできない、というロール設定は 少なくとも 2018/10 現在では不可能である。

その他編

Q. タイムゾーンは JST にできる?

2018/10/17 現在の composer-1.2.0-airflow-1.9.0 では UTC のみ。 Airflow 本体は Airflow-1.10 でタイムゾーンに対応したため、 次期 Composer にて JST 対応ができるものと期待される。

Q. Airflow 画面で「An internal server error occurred while authorizing your request. Error code 28」と出るんだけれども。

よく発生する。リロードすればなおる。

Q. DAG 登録・更新した際、Airflow 画面をリロードするたびに情報が異なるんだけど

Web サーバが複数台あって、どちらに振られるかで情報が異なる模様。 2〜3分待てばおおむね収束する。 おそらくは画面表示だけの問題で、ジョブ実行そのものは問題ない (と思いたい)。

Q. Composer/Airflow の競合は?

cron、Digdag、Luigi、Jenkins、Hinemos、JP1、Tivoli 等。

  • このエントリーをはてなブックマークに追加

SNSでもご購読できます。

Leave a Reply

*

CAPTCHA