Da Vinci Studio 分析チームの末安です。
今回は 2021 年 1 月に公開した ETL ライブラリ inbulk について紹介します。
inbulk でできること
- ジョブを簡潔に定義できる
- 差分実行できる
- それらの処理を BigQuery 内で完結できる
ざっくり言うと、ほとんど yaml で定義できる BigQuery の API の wrapper のようなものになっています。
背景
分析チームでは BigQuery を利用することが多く、BigQuery 内のテーブルを加工して別のテーブルを作るといった作業がよく発生します。
以前まで、こういった処理は BigQuery の機能である Scheduled Query を利用して行っていましたが、利用していて幾つかの問題が見えてきました。
1. 依存関係を定義できない
一つの処理が終わった後に別のテーブルを作りたいということはよくあると思います。
Scheduled Query ではそういった依存関係は定義できないため、実行時間をずらすことでどうにか実現していましたが、最初の処理が失敗しても次の処理が実行されてしまうという問題がありました。
2. 差分実行のコストが大きい
バッチ処理においても冪等性を考慮することは重要だと思います。
これも BigQuery 上のみで実現できなくはないですが、宛先テーブルを全スキャンして新規追加分マージしてからそのテーブルを置換する必要がありました。
この方法の場合、宛先テーブルのデータ量が多いとかなりのコストがかかってしまうという問題がありました。
3. レビューしづらい
当時は BigQuery の UI 上で Scheduled Query を設定していたため、レビューがうまくできない状態でした。
この問題に関しては Terraform などで管理すれば解決できそうではありますが、こういった管理方法の場合、GitHub 上の定義と実態に差異が発生してしまうことが多いことと、他にも解決できない課題があったため別の手段を探すことにしました。
調査
1 の依存関係を定義できない問題は、ワークフローエンジンを利用することで解決できるのでそちらに移行することにしました。
差分実行とジョブ定義の実現方法についても調査しましたが、それらしきものは見つかりませんでした。
Embulk を利用すれば実現はできそうでしたが、Embulk 自体はデータ転送ツールであり、今回のケースでは BigQuery 内で完結できる処理のため、一度 BigQuery からデータをダウンロードして再び BigQuery に入れるという内部の流れが個人的に許せませんでした。
inbulk
そんなわけで、ETL ライブラリ inbulk を OSS として開発しました。
特徴
定義
このように Embulk っぽい記述でジョブを定義できます。
Embulk と違うのは、データを実行環境に一切ダウンロードしない点です。そのためジョブ実行するマシンのメモリ消費を抑えることができます。
init: credential-file: /.gcp/credential.json service: bigquery in: query: | select id from Dataset.users out: project: project database: Destination table: users mode: append
変数
inbulk では、事前に別のクエリを実行してその結果をクエリに組み込むことができます。
具体的には、下記のように in
のセクションの vars
で組み込みたいデータを指定して query
のなかで ${変数名}
とすることでクエリに組み込みます。
init: credential-file: /.gcp/credential.json service: bigquery in: query: | select id from Dataset.users where created > '${last_modified}' vars: - name: last_modified database: Destination table: users mode: meta field: last_modified_time out: project: project database: Destination table: users mode: append
上記の例では、last_modified
という名前で宛先テーブルの最終更新日を BigQuery のメタデータテーブルから取得して利用しています。これにより差分実行が低コストで実現できます。
また、他のテーブルを参照することも可能で、vars
に下記のように指定すると、宛先テーブルの最後の id を取得するようなこともできます。
vars: - name: last_id database: Destination table: users mode: max field: id
マージ
マージ機能も用意しています。マージモードでは、指定したフィールドが重複する場合は、指定したオーダーで最初に来るもので置き換えます。 下記の例では、id が重複した場合、最終更新日が新しいレコードで置き換えるような設定になっています。
ただし、マージモードでは内部的には宛先テーブルを全スキャンしてリプレイスしており、宛先テーブルのデータ量が多い場合はスキャン量が多くなってしまうので注意が必要です。
init: credential-file: /.gcp/credential.json service: bigquery in: query: | select id from Dataset.users out: project: project database: Destination table: users mode: merge merge: order: - column: modified desc: True keys: - id
パーティション
下記の例のように、宛先テーブルのパーティションも定義することができます。
init: credential-file: /.gcp/credential.json service: bigquery in: query: | select id from Dataset.users out: project: project database: Destination table: users mode: append partition: mode: time field: created type: DAY
利用例
日毎に新規ユーザー数を集計して保存しておきたい
宛先テーブルの最新の created
以降に作成された未集計の users を集計する
init: credential-file: /.gcp/credential.json service: bigquery in: query: | select date(created) created, count(distinct id) user_count, from Sample.users where date(created) > date('${last_created}') and date(created) <= date_sub(current_date, interval 1 day) vars: - name: last_created database: Destination table: new_users mode: max field: created out: project: project database: Destination table: new_users mode: append partition: mode: time field: created type: MONTH
特定のイベントのみ整形して保存しておきたい
宛先テーブルの最終更新日から昨日までのデータを append するように定義する
init: credential-file: /.gcp/credential.json service: bigquery in: query: | select timestamp_micros(event_timestamp) event_time, user_id, user_pseudo_id, (select value.string_value from unnest(event_params) event_param where key = 'screen_class') screen_class, device.category device_category, device.operating_system os, device.operating_system_version os_version, from Sample.events where event_name = 'screen_view' and date(timestamp_micros(event_timestamp)) between date('${last_modified}') and date_sub(current_date, interval 1 day) vars: - name: last_modified database: Destination table: screen_views mode: meta field: last_modified_time out: project: project database: Destination table: screen_views mode: append partition: mode: time field: event_time type: DAY
まとめ
今回は、ETL ライブラリの inbulk について紹介しました。 inbulk を導入したことによって、課題であった差分実行とレビューのしづらさを解消することができました。
今は BigQuery 専用となっていますが、追々 Redshift などにも対応していきたいと考えています。
なにか要望などあれば気軽に Issue をあげてもらえると嬉しいです。
We are hiring!!
Da Vinci Studio では一緒に働ける仲間を絶賛大募集中です!募集職種と詳細に関しては、以下のリンクからそれぞれ確認できます。