データ基盤をサーバーレスで構築したので概要を紹介

あけましておめでとうございます。本年もよろしくお願いいたします。

久しぶりに登場しました菊池です。

僕は昨年から新しいデータ基盤を構築するプロジェクトを担当しておりまして、最近システムが無事に実稼働してホッと一息したところです。思い起こせば入社時はインフラ担当部署に配属だったのが、広告配信システムの開発をやったり、カジュアルゲーム作ったり。新規事業のスマホアプリを作りつつサーバーサイドの API を作って立ち上げたり、海外向けのサービスを作ったり。いつのまにかメディア運営に関わったりしてきましたが、最近はデータ基盤の開発もやってます。そんなキャリアを歩んできましたが、いつか森の中の開けた草原にあるネット環境の整ったポツンと一軒家で、庭にチャボを放飼にしつつ養蜂をやってみたいと思っています。

話は戻りますが、今回はこの稼働したてホカホカ状態のデータ基盤について概要を紹介したいと思います。よろしくおねがいします。

システム概要

今回構築したデータ基盤は、外部の各種 API を定期的に呼び出してデータを収集して BigQuery に入れて、レポーティングや分析に使いやすい形式に整えて社内へ提供するものです。最近ではよくあるデータ基盤かなと思います。

このシステムの設計にあたって、念頭においていたのが以下の内容です。

  • 取得対象 API のバージョンアップに楽々対応
  • 取得対象 API の新規追加も簡単対応
  • 取得対象 API の Rate Limit を守った安全運転
  • 取得対象 API へのリクエスト状況が簡単に見える安心設計
  • サーバーレスで実装したオトクな運用料金
  • などなど

それぞれ、こうなってたら楽なのにな! と思っていた部分を念頭に置いて設計しました。結果、だいたい実現できたんじゃないかな? という感想です。とはいえ、実運用に入るとまた予想してなかった用件も出てくるものなので、これからな部分も出てくると思いますが、今のところ想定の範囲内で対応できています。

構築にあたって、このシステムは以下の図のように三つのシステムで構成することにしました。

f:id:AdwaysEngineerBlog:20220107102011p:plain

それぞれ後ほどやや詳しく書きますが、

  • データ収集システム
  • データインポートシステム
  • 活用データ作成システム

の三つになります。それぞれ、取得対象 API からデータを集めるシステム。集めたデータを BigQuery にインポートするシステム。BigQuery にインポートしたデータを活用しやすい形式に整えて提供するシステムになります。それでは、ひとつづつ概要を紹介していきましょう。

データ収集システム

このシステムは取得対象 API を呼び出して JSON や CSV データを取得し、取得したデータをそのまま Google Cloud Storage に保存するシステムになります。取得したデータを直接 Google Cloud BigQuery にインサートする方法も取れますが、取得対象 API のレスポンスデータは予告なしに項目が増減する場合があるので、取得したデータを一旦そのままファイルとして保存する構成にしています。

ここで気をつけたいのが、取得対象 API には秒間や分間のリクエスト数を制限する Rate Limit が存在することです。この Rate Limit を超えて大量にリクエストを送信すると、一定期間リクエストの受付を停止してエラーレスポンスが返ってくるようになるので、API の呼び出しリクエストを Rate Limit 内に収める必要があります。

サーバーレスで Rate Limit を守った取得対象の API 呼び出しを実現するために、ここは以下の技術要素で構成しました。

それぞれ、大まかにどのような使い方をしているのかというと

Google Cloud Functions

取得対象の API を呼び出して Google Cloud Storage に保存する部分になります。引数に取得対象の URL や保存先の Google Cloud Storage バケット名などを指定して呼び出すと、対象 API を呼び出してレスポンスデータを Google Cloud Storage に保存するようになっています。ひとつの Google Cloud Functions 呼び出しが、ひとつの取得対象 API 呼び出しになるように実装しています。

取得対象 API によっては親子関係があるので、親項目となる API を呼び出して子項目の ID リストを取得し、子項目を取得するタスクをひとつづつ Google Cloud Tasks に登録するような実装になっています。

また Google Cloud Functions の最大実行時間には制限があるので、時間がかかる処理は細かく分割して Google Cloud Functions でパラレルに実行するような工夫をしています。

Google Cloud Tasks

取得対象 API 呼び出しには、単位時間あたりのリクエスト数の制限があります。この制限を守ってリクエストを送信するために、この Google Cloud Tasks を使っています。

取得対象の API 呼び出しは前述の Google Cloud Functions で実行しますが、全てこの Google Cloud Tasks を経由することで Rate Limit を考慮した取得対象 API の呼び出しが行われるようになっています。

Google Cloud Storage

Google Cloud Functions で取得したデータを JSON や CSV で保存しています。保存の際にはレスポンスデータだけでなく、リクエスト時のパラメータや実行時間などの情報も併せて保存するようにしています。

取得したデータは取得対象 API の URL にあるホスト名やパス名をそのまま Google Cloud Storage のパス名に反映させて保存するようにしています。取得対象 API のドキュメントにあるエンドポイントの URL と Google Cloud Storage のパス名が対応するようになっているので、内容を理解しやすいのではと感じています。

Google Cloud BigQuery

Google Cloud Tasks 経由で Google Cloud Functions を呼び出した場合、API 呼び出し全体の処理がどの程度進んでいるのか判断するのが難しくなります。このため、取得対象 API 呼び出し毎に以下の内容を BigQuery にストリーミングインサートしています。

  • Google Cloud Tasks にタスク登録
  • Google Cloud Functions で実行開始
  • Google Cloud Functions で実行終了 (正常終了/異常終了)

これにより、登録済みタスク数と実行終了済みタスク数の数を見て、全体の処理が終了したのか判定できるようにしています。

Google Cloud Workflows

取得対象 API 呼び出しを開始して、全体の処理を管理するために使っています。具体的には、

  • データ取得処理のブートストラップとなる最初の Cloud Tasks 登録
  • 全ての処理が終了するまで Google Cloud BigQuery のタスク数をチェックして待つポーリング処理
  • 全ての処理が終了したら後述のデータインポート処理を開始

といった内容になっています。Google Cloud Workflows は扱うデータ量のサイズに制限があったりしますが、うまく使うと Airflow や Google Cloud Composer のようにクラスタ管理をしなくて済むので、料金的にも運用面でもメリットがあるように感じています。

データインポートシステム

このシステムはデータ収集システムが Google Cloud Storage に保存した JSON や CSV データを Google Cloud BigQuery のテーブルにインサートするものです。

当初は Google Cloud Dataflow で実装しようと思っていましたが、よくよく調べたら Google Cloud Workflows の設定ファイルを書くだけで実装できたので、当初想定していた実装時間をだいぶ圧縮することができました。

ここで使っている技術要素は以下の通りです。

それぞれ、大まかにどのような使い方をしているのかというと

Google Cloud Storage

前述のデータ収集システムが取得した JSON や CSV のファイルが保存されています。

Google Cloud BigQuery

Google Cloud BigQuery には JSON データの配列やオブジェクトの形式をそのままインポートできるような、繰り返し項目やレコード形式のスキーマ定義ができるようになっています。これを利用して、データ収集システムが Google Cloud Storage に保存した JSON や CSV のファイル形式をそのまま Google Cloud BigQuery のテーブルスキーマに反映させてインポートしています。

Google Cloud Workflows

インポート処理は Google Cloud Workflows の BigQuery API Connector を使って実装しています。Google Cloud Storage に保存されているインポート対象のファイルをワイルドカードで指定して、インポート対象になる Google Cloud BigQuery のテーブルスキーマ定義に従ってインポートしています。Google Cloud Workflows の YAML 定義を記述するだけで実装できるのでとても簡単でした。処理時間も結構早い印象です。

活用データ作成システム

データインポートシステムが Google Cloud BigQuery にインポートしたデータは JSON 形式をそのまま反映させたテーブルになっています。これを活用しやすい形式に変換したテーブルを作ったり、各種レポートデータとして集計したテーブルを作成するようなデータ変換パイプラインを実行するシステムになります。

ここで使っている技術要素は以下の通りです。

それぞれ、大まかにどのような使い方をしているのかというと

Google Cloud BigQuery

変換前の生データがテーブルとして保存されています。また、Dataform で変換された活用しやすい形式や集計されたレポートデータのテーブルやビューも保存されています。

Dataform

Google Cloud BigQuery と連携して、データ変換パイプラインを SQL を若干拡張した SQLX で記述し定期実行できるサービスです。このサービスを使って、JSON フォーマットをそのまま反映させた形式で保存されているテーブルのデータをフラットで活用しやすい形式に変換したり、レポートデータを生成するための各種集計処理を行うパイプラインを構築しています。

Dataform では SQL の SELECT 文でデータ変換プロセスを記述することができ、変換後のデータはテーブルやビューとして参照することができます。これらのテーブルやビューの依存関係の全体像をツリー構造としてビジュアルで参照する機能があります。また、それぞれのテーブルやビューの一意性が保たれているかチェックしたり、変換プロセスの途中でレポートの数値が合っているかチェックするような assertion を記述することができます。

データ変換処理を SQL で宣言的に記述することができて、assertion で途中結果をチェックすることができるので、Python 等でデータ変換処理を書くよりも生産性が高いように感じました。最近は、とりあえずデータを Google Cloud BigQuery に入れてしまって、それを Dataform などで加工して使うのが楽なように思います。

感想

このプロジェクトを通して、必要なロジックや設定だけをクラウド環境にアップして、システムを構築するような技術環境が既に整ってると確認することができました。また、実際に必要な量だけコンピュータ資源を利用する構成になったので、料金的にもだいぶリーズナブルに稼働させることができています。具体的な金額は差し控えますがだいぶ安価です。しかも、サーバーインスタンスやクラスタの管理からも解放されるので運用も楽になったように思います。

クラウドサービスを前提としたサーバーレスのシステム開発では、実行時間やメモリー容量に制限があったり冪等性を求められるなどの制限があります。しかし、その辺りをスマートに解決できると技術力も高まるし面白味も出てくるのではと思います。また、同じ仕様のシステムでも作り方で料金面の差が出やすい世界だなと感じています。

さて今回のプロジェクトで手元に面白いデータが集まったので、今後は Google Colaboratory で予測したり傾向見たりしてみるのも面白そうだなと思っています。また、このシステムの稼働状況をモニタするダッシュボードを Google データポータルで作るのも良さそう。

それではみなさんごきげんよう。今年もよろしくおねがいします。菊池でした。