Da Vinci Studio Blog

株式会社 Da Vinci Studio は 2023 年 7 月に株式会社 Zaim と統合し、株式会社くふう AI スタジオになりました

BigQuery 用 ETL ライブラリ inbulk を公開したので紹介する

Da Vinci Studio 分析チームの末安です。
今回は 2021 年 1 月に公開した ETL ライブラリ inbulk について紹介します。

github.com

inbulk でできること

  • ジョブを簡潔に定義できる
  • 差分実行できる
  • それらの処理を BigQuery 内で完結できる

ざっくり言うと、ほとんど yaml で定義できる BigQuery の API の wrapper のようなものになっています。

背景

分析チームでは BigQuery を利用することが多く、BigQuery 内のテーブルを加工して別のテーブルを作るといった作業がよく発生します。

以前まで、こういった処理は BigQuery の機能である Scheduled Query を利用して行っていましたが、利用していて幾つかの問題が見えてきました。

1. 依存関係を定義できない

一つの処理が終わった後に別のテーブルを作りたいということはよくあると思います。

Scheduled Query ではそういった依存関係は定義できないため、実行時間をずらすことでどうにか実現していましたが、最初の処理が失敗しても次の処理が実行されてしまうという問題がありました。

2. 差分実行のコストが大きい

バッチ処理においても冪等性を考慮することは重要だと思います。

これも BigQuery 上のみで実現できなくはないですが、宛先テーブルを全スキャンして新規追加分マージしてからそのテーブルを置換する必要がありました。

この方法の場合、宛先テーブルのデータ量が多いとかなりのコストがかかってしまうという問題がありました。

3. レビューしづらい

当時は BigQuery の UI 上で Scheduled Query を設定していたため、レビューがうまくできない状態でした。

この問題に関しては Terraform などで管理すれば解決できそうではありますが、こういった管理方法の場合、GitHub 上の定義と実態に差異が発生してしまうことが多いことと、他にも解決できない課題があったため別の手段を探すことにしました。

調査

1 の依存関係を定義できない問題は、ワークフローエンジンを利用することで解決できるのでそちらに移行することにしました。

差分実行とジョブ定義の実現方法についても調査しましたが、それらしきものは見つかりませんでした。

Embulk を利用すれば実現はできそうでしたが、Embulk 自体はデータ転送ツールであり、今回のケースでは BigQuery 内で完結できる処理のため、一度 BigQuery からデータをダウンロードして再び BigQuery に入れるという内部の流れが個人的に許せませんでした。

inbulk

そんなわけで、ETL ライブラリ inbulk を OSS として開発しました。

特徴

定義

このように Embulk っぽい記述でジョブを定義できます。

Embulk と違うのは、データを実行環境に一切ダウンロードしない点です。そのためジョブ実行するマシンのメモリ消費を抑えることができます。

init:
  credential-file: /.gcp/credential.json
  service: bigquery
in:
  query: |
    select
      id
    from Dataset.users
out:
  project: project
  database: Destination
  table: users
  mode: append

変数

inbulk では、事前に別のクエリを実行してその結果をクエリに組み込むことができます。 具体的には、下記のように in のセクションの vars で組み込みたいデータを指定して query のなかで ${変数名} とすることでクエリに組み込みます。

init:
  credential-file: /.gcp/credential.json
  service: bigquery
in:
  query: |
    select
      id
    from Dataset.users
    where created > '${last_modified}'
  vars:
    - name: last_modified
      database: Destination
      table: users
      mode: meta
      field: last_modified_time
out:
  project: project
  database: Destination
  table: users
  mode: append

上記の例では、last_modified という名前で宛先テーブルの最終更新日を BigQuery のメタデータテーブルから取得して利用しています。これにより差分実行が低コストで実現できます。

また、他のテーブルを参照することも可能で、vars に下記のように指定すると、宛先テーブルの最後の id を取得するようなこともできます。

  vars:
    - name: last_id
      database: Destination
      table: users
      mode: max
      field: id

マージ

マージ機能も用意しています。マージモードでは、指定したフィールドが重複する場合は、指定したオーダーで最初に来るもので置き換えます。 下記の例では、id が重複した場合、最終更新日が新しいレコードで置き換えるような設定になっています。

ただし、マージモードでは内部的には宛先テーブルを全スキャンしてリプレイスしており、宛先テーブルのデータ量が多い場合はスキャン量が多くなってしまうので注意が必要です。

init:
  credential-file: /.gcp/credential.json
  service: bigquery
in:
  query: |
    select
      id
    from Dataset.users
out:
  project: project
  database: Destination
  table: users
  mode: merge
  merge:
    order:
      - column: modified
        desc: True
    keys:
      - id

パーティション

下記の例のように、宛先テーブルのパーティションも定義することができます。

init:
  credential-file: /.gcp/credential.json
  service: bigquery
in:
  query: |
    select
      id
    from Dataset.users
out:
  project: project
  database: Destination
  table: users
  mode: append
  partition:
    mode: time
    field: created
    type: DAY

利用例

日毎に新規ユーザー数を集計して保存しておきたい

宛先テーブルの最新の created 以降に作成された未集計の users を集計する

init:
  credential-file: /.gcp/credential.json
  service: bigquery
in:
  query: |
    select
      date(created) created,
      count(distinct id) user_count,
    from Sample.users
    where
      date(created) > date('${last_created}')
      and date(created) <= date_sub(current_date, interval 1 day)
  vars:
    - name: last_created
      database: Destination
      table: new_users
      mode: max
      field: created
out:
  project: project
  database: Destination
  table: new_users
  mode: append
  partition:
    mode: time
    field: created
    type: MONTH

特定のイベントのみ整形して保存しておきたい

宛先テーブルの最終更新日から昨日までのデータを append するように定義する

init:
  credential-file: /.gcp/credential.json
  service: bigquery
in:
  query: |
    select
      timestamp_micros(event_timestamp) event_time,
      user_id,
      user_pseudo_id,
      (select value.string_value from unnest(event_params) event_param where key = 'screen_class') screen_class,
      device.category device_category,
      device.operating_system os,
      device.operating_system_version os_version,
    from Sample.events
    where
      event_name = 'screen_view'
      and date(timestamp_micros(event_timestamp)) between date('${last_modified}') and date_sub(current_date, interval 1 day)
  vars:
    - name: last_modified
      database: Destination
      table: screen_views
      mode: meta
      field: last_modified_time
out:
  project: project
  database: Destination
  table: screen_views
  mode: append
  partition:
    mode: time
    field: event_time
    type: DAY

まとめ

今回は、ETL ライブラリの inbulk について紹介しました。 inbulk を導入したことによって、課題であった差分実行とレビューのしづらさを解消することができました。

今は BigQuery 専用となっていますが、追々 Redshift などにも対応していきたいと考えています。

なにか要望などあれば気軽に Issue をあげてもらえると嬉しいです。

We are hiring!!

Da Vinci Studio では一緒に働ける仲間を絶賛大募集中です!募集職種と詳細に関しては、以下のリンクからそれぞれ確認できます。