最終更新
Cloud Dataflow とは、GCP のサービスのひとつです。 入力データを取り込み、加工し、出力することに特化したもので、いわゆる ETL と呼ばれるものです。
Cloud Dataflow は、なかなかにとっつきにくく、理解しづらいサービスです。当ページ管理人は Cloud Dataflow 導入においてすごく苦労しましたので、知識やノウハウを説明いたします。他の人が同じところではまらないよう願っております。
目次
Google Cloud Dataflow 全般
Q. Cloud Dataflow とは何?
A. Google Cloud Platform (GCP) のサービスのひとつ。入力データを加工し、出力することに特化したもの。いわゆる ETL。
Q. Dataflow を使うと何ができる?
A. ファイルを読んで、加工して、ファイルに出力する。 ファイルを読んで、加工して、DB に出力する。 DB を読んで、加工して、DB に出力する。 Pub/Sub メッセージを読んで、加工して、Datastore に出力する。 などなど。
Q. データ加工なら、普通にバッチ作ればいいんじゃない? Dataflow の場合、何がうれしいの?
分散処理が簡単にできるので、数千万や数億レコードあるようなビッグデータに向いている。 また、リアルタイム処理ができるのもメリット。
逆に言うと、リアルタイム性が不要で、数千行・数万行の小規模データなら、 ファイルをオープンして、1行ずつ処理するプログラムを自分で書けばよい。 結構学習コストが高いので、使う必要がないなら無理して使わない方がよい。
また、Dataflow は処理を行うときのみ料金がかかる。よって、 「取込処理を行うのは 1日1回の1時間だけだが、 仮想マシンでバッチサーバを立ち上げっぱなしにしている」というのと比較するとコストを低く抑えることができる (ただしこれは バッチの場合であって、ストリーミングはデータ取り込みを行っていない場合でも、起動しておくだけでお金がかかる)。
Q. 管理画面操作だけで、ジョブ処理の定義が行える?
基本的にはできない。Java や Python でプログラムをごりごり書く必要がある。
なぜ「基本的には」かと言うと、Google 提供の Dataflow テンプレートというものがあり、 これを使えば画面上操作または gcloud コマンド一発で ETL 処理が行える。 ただし Google 提供テンプレートは基本的な処理のみであるため、 ほとんどのケースではプログラムを書く必要が出てくるのではと思う次第。
Q. どんなふうにごりごり書くの?
こんな感じ。 GCS にあるテキスト内のワードをカウントし、結果を GCS に出力するサンプルコード より。
public class MinimalWordCount {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET_YOUR_PROJECT_ID_HERE");
options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(ParDo.named("ExtractWords").of(new DoFn() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.perElement())
.apply("FormatResults", MapElements.via(new SimpleFunction, String>() {
@Override
public String apply(KV input) {
return input.getKey() + ": " + input.getValue();
}
}))
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
p.run();
}
}
Q. Java と Python どちらがおすすめ?
2018/8 現在、Java がおすすめ。 ずっと Python ではバッチしか対応していなかった (ストリーミングができなかった) が、 2018/7 にてストリーミングに対応した。 しかしながら、依然として Python ではできないことがあるようで (例: DynamicDestinations)、 フル機能を使えるという意味では Java の方がよいと考える。
ただ、Java だとジェネリクスを書くのがとても面倒で、当ページ管理人の個人的な意見としては Python を使いたい。
Q. 本当に Java と Python しか使えないの?
正式対応しているのは Java と Python のみ。
しかしながら Kotlin を使っている人がちらほらといる模様。 Kotlin は JVM で動作し、Java よりも簡潔に書けることを標榜した言語。 また、Spotify が提供している Scio という Scala 用ライブラリもある。
- KotlinでCloud Dataflowのバッチ処理を書く
- KotlinでDataflow書きたくて、触ってみた
- Scala + Scio で Apache Beam あるいは Google Cloud Dataflow に入門する
- Spotify Scio – Google Dataflow の Scala言語版
Q. データ入力元・出力先の種類は?
GCS・BigQuery・Pub/Sub・Datastore など。AWS の S3・SNS・SQS にも対応。ただしカスタムソース・カスタムシンクというもので定義できるので、標準で用意されていないものでも対応可能 (ある程度ソースを書く必要はある)。
Q. バッチとかストリーミングって何?
バッチは、一回限りの実行のこと。1時間に1回とか、1日に1回などの定時実行もバッチ。
ストリーミングは、動かしっぱなしにすることで、ほぼリアルタイムにデータ処理を行える。
Dataflow はジョブを実行する際、「バッチ」「ストリーミング」のいずれかを指定する必要がある。
Q. Apache Beam との関係性は?
いまだによくわかっていないのだが、 最初にできたのが Dataflow で、SDK を公開していた(?)。 Apache Beam は、そのあとに始まったプロジェクトではないかと思われる。 2016年、Google は Apache Foundation に SDK コードを寄贈。 Google の Dataflow SDK は 2.5.0 で終了。それ以降は Apache Beam SDK を使うことになっている。
Q. ネット上にあるサンプルコードが動かないんだけど
オープンソース系にはよくあることだが、Dataflow の場合、 SDK 1.x 系 → SDK 2.x 系 (≒ Apache Beam に移行) のタイミングでインタフェースが変わったため、 特にひどい状況。
例えば Dartaflow SDK 1.x 系は下記のように書いていたものが、
p.apply(TextIO.Read.from("gs://foo/bar.txt"));
SDK 2.x 系は下記のように変わった。
p.apply(TextIO.read().from("gs://foo/bar.txt"));
ネット上の情報で、2016年に書かれたコードはほぼ動かないと思ってよい。 2017年は微妙。 2018年ならだいたい動く。
さらにひどいことに、Apache Beam SDK (最新の 2.5.0) の Javadoc 記載のコードも動かないものがあったり、 Dataflow ドキュメントの日本語ページの翻訳が古くサンプルコードがエラーになったりする (英語版では動いたりする)。
とはいえ、SDK 1.x と 2.x で概念や考え方が変わったわけではなく、メソッド名などのインタフェースが微妙に変わったり、機能追加がなされたり、というのが主な変更点であるので、概念・考え方を調べるために Dataflow のドキュメントを読むのはまったく問題ない。
Q. オートスケールする?
オートスケールさせることもできるし、させないこともできる。
スタート時のインスタンス数と、インスタンス数上限を設定可能なので、スタート時 1、上限 1 とすればインスタンス数は 1固定だし、スタート時 1、上限 10 とすれば、1~10 の枠内でオートスケールしてくれる。
Q. ジョブはどこで実行される?
自動的に Google Compute Engine (GCE) インスタンスが作成され、その中でジョブが実行される。 インスタンス数 3 と指定すれば、GCE が 3 インスタンス起動される。 GCE 管理画面でインスタンスが起動していることが確認できるし、 ssh でログインもできる (ログインできても何かができるわけではない)。
Q. パイプラインを作成してジョブを実行するコードも GCE 上で実行される?
違う。大変わかりにくいが、上記の MinimalWordConut のコードは2つの部分に分けることができる。
- p.run()
- 上記以外
自動的に起動する GCE 内で処理されるのは p.run() のみである。
それ以外のあらゆるコードは、別のどこかで動かす必要がある。それは CloudShell でもよいし、適当な GCE でもいいし、GKE でも、GAE でも、Functions でも、どこでもよい。あなたが好きに選べばよい。しかしながら、「p.run() 以外」を、「自動的に起動する GCE」で実行させることはできない。
Q. Dataflow Template って何
上記の「Dataflow ジョブを作成するための、上記に提示した MinimalWordCount のようなコード」を、 テンプレート生成時に「実行」し、生成されたパイプライン情報を「テンプレート」として GCS に配置しておく。
そうすることで、Dataflow ジョブを実行する際、 「Dataflow ジョブを作成するための、上記に提示した MinimalWordCount のようなコード」を実行する必要がないため、 ジョブを生成する時には Java 環境が不要になる。 こうすることで、gcloud dataflow jobs run コマンドのみで、Dataflow ジョブを実行することができる。
そのため、Dataflow Template を作成しておけば、Cloud Functions、Composer 等、GCP コンソール、gcloud コマンドなど、Java 環境がない環境からも簡単に実行が可能になる。
Q. 料金は?
Dataflow ジョブの実行中に確保した CPU・メモリ・ディスクの量に、 それぞれの CPU 単価・メモリ単価・ディスク単価をかけた金額が請求される。 これらは「確保したリソース×実行時間」で決まるので、実際の使用量で決まるものではない。 実体としては GCE が起動されるが GCE としては請求されない。通常の GCE と比べるとおおむね 1.5倍程度になる模様。 この 0.5倍分が Dataflow の付加価値と言えるのだろう。
Q. Dataflow は、ワークフロー管理ができる?
JP1・Airflow・Digdag などのジョブワークフロー管理のようなことができるかという質問であれば、 「できない」。
Dataflow 管理画面で、下記のような一見ワークフロー管理っぽい画面があるが、 これは1つのパイプライン内の処理の流れを表示しているだけである。
そもそも、Dataflow は、 「この入力を、こう加工して、ここに出力」というパイプラインを定義するものであるが、 「このパイプラインが終わったら別のパイプラインを実行」という『逐次処理』はできない。
Q. 「逐次処理はできない」の意味がわからない。
Dataflow で作成するパイプラインとは、『一つの SQL』のようなものだと思ってほしい。 SQL では、FROM でデータ入力元を指定でき、WHERE でデータを絞り込むことができ、 SELECT でカラムを選択でき、GROUP BY で集約ができ、JOIN で複数の入力を扱える。
しかしながら、
INSERT INTO output1 SELECT * FROM input1; INSERT INTO output2 SELECT * FROM input2;
のように、データの流れがつながっていない、1つめの SQL が終わったら 2つめの SQL を実行する、 という逐次処理は、「1つの SQL」では実現できない。なので、Dataflow のパイプラインでも実現できない。
Q. 逐次処理ができないということは、つまり何ができない?
例えば下記ができない。
- Dataflow で BigQuery にデータを格納後、後続バッチを起動するために Pub/Sub メッセージを送信する。
- Dataflow で BigQuery にテーブルA を格納後、Dataflow でテーブルA を SELECT して加工し、テーブルBに格納する。
Q. では逐次処理をしたい場合はどうすれば?
「Cloud Composer を使え」が Google の方針のようだ。しかしそれはそれで苦難の道のりが待っているだろう (⇒ Composer FAQ)。
GCE インスタンスについて
Q. GCE マシンタイプは指定できる?
パイプラインオプションに –workerMachineType=n1-standard-2 などと指定する。 マシンタイプは固定で、データ処理量に応じてマシンタイプが勝手に変わったりはしない。n1-standard-8 × 1 と、n1-standard-2 × 3 といった組み合わせもできない。指定できるマシンタイプは 1種類のみである。
Q. カスタムマシンタイプは使用できる?
できる。例えば n1-standard-2 は vCPU が 2、メモリが 7.5GB であるが、メモリはそんなにいらないという場合は下記のようにメモリを少なめに指定する。
--workerMachineType=custom-2-4096
拡張メモリを使いたい場合は、下記のように末尾に “-ext” をつける。
--workerMachineType=custom-2-16384-ext
下記のような GCE のカスタムマシンタイプ一般の制限があるので注意。詳細はhttps://cloud.google.com/compute/docs/instances/creating-instance-with-custom-machine-type を参照。
- vCPU は 1個または偶数個 (3 とか 5 は不可)
- メモリは vCPU 1 個あたり 0.9 GB 以上、最大 6.5 GB まで (6.5 GB 超は拡張メモリになる)
- インスタンスの合計メモリは、256 MB の倍数にする必要があります。
Q. プリエンプティブインスタンスは使える?
安い代わりに最大 24時間で、いつ終了になるかわからないというプリエンティブインスタンスは、2019/4 に発表された「 Flexible Resource Scheduling (FlexRS)」で使えるようになった (2019/4 現在ベータ)。
CPU・メモリが4割引になるが、そのかわり実行開始が遅延される。具体的には 6時間以内に実行開始となる。
Q. GCE インスタンスで SSD は使える?
使える。デフォルトでは HDD だが、下記のパイプラインオプションで SSD を使うことができる。 オプションで下記を指定する。
--workerDiskType="compute.googleapis.com/projects//zones//diskTypes/pd-ssd"
Q. ローカル SSD は使える?
NVMe で爆速なローカル SSD は、残念ながら使えない (と思われる)。
Q. インスタンス数はどうやって指定する?
開始時のインスタンス数 (numWorkers) と、インスタンス数上限 (maxNumWorkers) を指定できる。 インスタンス数下限は指定できない。
- 例. 開始時のインスタンス数 1、インスタンス数上限を 5 とした場合:
- 開始時は 1、その後は 1〜5 のいずれになるかは Dataflow サービス側で判断。
- 例. 開始時のインスタンス数 5、インスタンス数上限を 5 とした場合:
- 開始時は 5、その後は 1〜5 のいずれになるかは Dataflow サービス側で判断。 ジョブ開始後
- 例. 開始時のインスタンス数 5、インスタンス数上限を 10 とした場合:
- 開始時は 5、その後は 1〜10 のいずれになるかは Dataflow サービス側で判断。
ジョブ管理
Q. ジョブをキャンセルできる?
できる。管理画面上か、gcloud コマンドで gcloud dataflow jobs cancel [ジョブID] で可能。 いずれにしても、GCE を終了するのに 5分くらいかかるので、待つのがめんどくさい。
Q. ジョブ再実行は?
投入した Dataflow ジョブを、再度実行することはできない。同じジョブを作るしかない。 同じパラメータで新規ジョブを作成して実行、くらいはできてもいいような気がするが、 2019/04 現在、そのような機能はない。
Q. タイムアウト設定は?
Dataflow ジョブをN時間で終了させるという簡単なオプションはないと思う。
代替策は、もし Template を使っていない場合は、下記のように一定時間待って、その後ジョブを殺す。
// 単位はミリ秒。5分待つ
pipeline.run().waitUntilFinish(new Duration(1000*60*5));
あるいは別途監視用ツールを作成し、ツール内で定期的に Dataflow job の情報を取得 (例えば gcloud dataflow jobs list の結果取得など) し、ステータスが実行中かつN時間を超えたらジョブを殺す、とする。
Q. Dataflow ジョブ完了後、PubSub などで通知を受けることは可能?
Dataflow 単体では簡単にはできない (できてほしいのだが)。
案1. デプロイプロセス側で
pipeline.run().waitUntilFinish();
で終了を待って、その後 Pub/Sub メッセージを投げる方法がある。ただしパイプライン外の処理になるので、Dataflow Template では使えない。
案2. あるいは定期的にポーリングして、ジョブ終了を検知するプログラムを作る。
案3. パイプラインの最後で TextIO でストレージに出力し、「Cloud Pub/Sub Notifications for Cloud Storage」を使って Pub/Sub メッセージを送信する。
案4. Cloud Composer を使う。
Q. リトライは?
ひとつの処理でエラーが発生した場合、自動的にリトライを 4回行ってくれる。 ただし、リトライ時にウェイトを入れてくれたりはしないので、 ごくごく短時間の外部サービス障害ならば問題ないが、 長時間落ちていた場合などはジョブがエラー終了となる。
Q. ではエラーやリトライは Dataflow にまかせておけば大丈夫ということか?
残念ながら Dataflow はそこまでよいものではない。
Dataflow job の生成に失敗する、Dataflow job 生成直後にエラーとなる、途中で処理が進まなくなる (いつまで待っても終わらない)、該当ゾーンでのリソース不足など、様々な原因で Dataflow job は失敗する。
ジョブ終了時のステータス (正常か異常終了か)、ジョブの実行時間が長すぎた場合のアラートなどを仕込んでおくことを強くおすすめする。
Q. 全く関連のないジョブを複数個実行することはできる?
できる。パイプラインのビルド処理において、パイプラインを 2つ作ればよい。 ただし、ビルド処理は Dataflow サービスの枠外であるため、例えば下記コードで p.run() した直後にインスタンスが落ちてしまったら p2 は実行されないことに注意。
Pipeline p = Pipeline.create(options);
p.apply(...);
p.run();
Pipeline p2 = Pipeline.create(options);
p2.apply(...);
p2.run();
データ処理全般
Q. データのスキップはできる?
できる。下記のように、特定条件の場合 c.output を呼ばなければよい。
Integer num = c.element();
if ( num > 10000 ){
// 異常値なのでエラーログ出力等
} else {
c.output(num);
}
filter を使ってもできると思う (未検証)。
Q. SQL での group by のような処理ははできる?
GroupByKey を使う。キーを指定すると、同じキーを持つレコードを集約できる。集約したあとに何を行うかはあなた次第 (MAX 値を取るのか、 件数をカウントするのか、タイムスタンプが最も早いものなどの条件で1つを選ぶのか)。
なお、全データが揃わないと GroupByKey が開始できないため、分散処理のうまみが減ってしまう。 また、データを貯め込む必要があるためデータ量に応じたばディスク容量が必要となる (ディスクを使うので、SSD を使うと高速になる)。
Q. SQL での join のような結合処理ははできる?
2つやり方がある。 1つは、CoGroupBy を使う方法。 もう 1つは副入力 (Side Input) を使う方法。
Q. 副入力って何?
典型的には、大量のデータと、少量のマスタがあって、 それを結合したい場合などに少量のマスタを副入力として扱う。 例えば
ID,タイムスタンプ,店舗ID,売上金額
という大量の売上データがあるとして、 さらに BigQuery などに 店舗ID,店舗名 という店舗マスタがあり、 最終的に
ID,タイムスタンプ,店舗ID,店舗名,売上金額
というふうに「店舗名」カラムを追加して出力したい場合、 店舗マスタを副入力として扱うとよい。
参考: Cloud Dataflowで複数リソースを読み込む方法
Q. 複数の出力を行うことはできる?
PTranslate は、入力の PCollection を変換し、出力 PCollection を生成するが、 複数の出力を生成することができる。 例えば売上データを読み込んで、
- 出力1: 商品ごとの売上合計
- 出力2: 店舗ごとの売上合計
- 出力3: セール品の売上合計
- 出力4: フォーマットエラー情報
というふうに出力複数を分けることができる。 たとえば、出力1 はファイルに、出力2・3 は BigQuery のテーブルに、 出力4 はファイルとして GCS に、というふうに分岐できる。
Q. SQL での DISTINCT のような重複除去はできる?
RemoveDuplicates を使う。
Q. SQL で書きたいんだけれど
試したことはないが書けるはず。Apache Beam 2.2.0 (2017-12-02 リリース) より Beam SQL DSL が実装された。 BeamSqlExample.java
ただし Dataflow SDK 2.2.0 のリリースノートにおいてはhttps://cloud.google.com/dataflow/release-notes/release-notes-java-2?hl=en にて
Known issue: SQL support is not included in this release because it is experimental and not tested on Cloud Dataflow. Using SQL is not recommended.
との記載があり、Dataflow としてはサポート対象外。
※2019/4、Beam SQL 対応のアナウンスあり。
テキストファイル関連
Q. 複数のファイルを入力とすることはできる? (集約できる?)
できる。*.csv などとワイルドカードで記述することができる
p.apply(TextIO.read().from("gs://foo/bar/*.csv"));
さらに下記のような複数のストレージなど、ワイルドカードでは表現できない場合でも、 PCollection を flatten することで、1つの入力にまとめることができる。 参考: https://stackoverflow.com/questions/44407323/textio-read-multiple-files-from-gcs-using-pattern
- gs://foo1/bar/*.csv
- gs://foo2/bar/*.csv
Q. テキストファイルの先頭ヘッダを除外することはできる?
TextIO ならできない。各インスタンスでデータを分散処理するため、「1行目」という判定ができないため。下記のように特定文字列なら処理しない、という書き方になる。
String line = c.element();
if ( line.startWith("Id,Date,Action,...") ){
return;
}
....何かしらの処理
FileIO ならできるが、Dataflow から使えるのか? (ローカルのファイルを読み書きするということなので)
Q. エラー時の情報として、入力データのファイル名や行番号を知りたい
TextIO ならできない。FileIO ならできるが、Dataflow から使えるのか? (ローカルのファイルを読み書きするということなので)
Q. テキストファイルをソートしてから出力することはできる?
できない。ソートは分散処理に不向き。 どうしてもというならhttps://stackoverflow.com/questions/44291407/dataflow-write-to-file-in-order-of-pcollectionの回答にあるようカスタムシンクを作ることでできるかもしれないが、大規模データを出力するのは困難ではと思う。
BigQuery 関連
Q. BigQuery のテーブルを入力とすることはできる?
できる。
PCollection weatherData = pipeline.apply(
BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));
Q. BigQuery クエリ結果を入力とすることはできる?
できる。StandardSQL も、LegacySQL いずれも可能。 下記のように usingStandardSql() をつければ StandardSQL。 つけなければ LegacySQL。
p.apply(
BigQueryIO.readTableRows
.fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
.usingStandardSql();
Q. BigQuery のテーブルに出力とすることはできる?
できる。
Q. BigQuery のテーブルを新規作成することも、既存テーブルへの追記もできる?
いずれも可能。新規作成時はテーブルスキーマを指定しないといけない。
Q. すでに存在する BigQuery のテーブルを丸ごと置き換えることはできる?
できる。下記のように WriteDisposition の WRITE_TRUNCATE を指定すればよい。
.apply(BigQueryIO.writeTableRows()
.to(TableDestination("mydataset:mytable", "description"))
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
Q. データ内容に応じて複数のテーブルに振り分けることはできる?
できる。下記参照。
- Cloud Dataflow(Beam)で Pub/Sub からメッセージを受信して複数の動的な BigQuery テーブルへの書き出しを行う
- Dataflowでデータに応じてBigQueryのテーブルを振り分け
- 有り金溶かさないように Dataflow で BigQuery テーブルを分割する
Q. BigQuery のテーブルに merge 文を使って重複除去しつつ出力することはできる?
できないと思われる。Dataflow でやるなら、出力したいデータと、出力先のデータを CoGroupBy で結合し、重複除去した上で、別テーブルに書き出すことになる。
クォータ・リミット・制限
どんなクォータ・リミット・制限がある?
引っかかりやすいものは下記。Dataflow の制限と、起動する GCE の制限両方を確認すること。
- Dataflow が使用する GCE 1000インスタンス
- Dataflow ジョブ同時実行数 25
- GCE IPアドレス 50個
- GCE SSD 合計サイズ 2TB
詳細はこちら https://cloud.google.com/dataflow/quotas
いずれも制限緩和申請が可能。
プロファイリング
Dataflow のボトルネックを解析したい。プロファイリングできる?
できる。 あらかじめ GCS にバケット my-dataflow-profile を作成しておき、 パイプラインオプションに –saveProfilesToGcs=gs://my-dataflow-profile/bar などと指定する。 パイプライン実行後、GCS のファイルを取得し、pprof でプロファイリング結果を見ることができる。
下記は Dataflow とは関係はないが、pprof の Web UI の紹介記事。めちゃくちゃ便利とのことだが、 これは確かにすごく非常に便利。
Go言語のプロファイリングツール、pprofのWeb UIがめちゃくちゃ便利なので紹介する
その他 (未整理)
Dataflow shuffle サービスって何
一般用語としての MapReduce において、Map と Reduce の間に位置するのが shuffle。 Dataflow で言うと、GroupByKey がまさに shuffle。 集約処理が必要なので、分散処理には向かない部分。 よって、GCE 内ではなく GCP のリソースで請け負ってあげましょうというのが Dataflow shuffle サービス。
参考 https://speakerdeck.com/syucream/production-ready-stream-data-pipeline-in-merpay-inc (メルペイ事例)