サンタさん と BigData

Adways Advent Calendar 2018 15日目の記事です。

http://blog.engineer.adways.net/entry/advent_calendar_2018


 

こんにちは。もうすぐクリスマスですね。

f:id:AdwaysEngineerBlog:20181220110021p:plain

子どもの頃、クリスマスが近づくと、両親にこう言われました。
「いい子にしていないとサンタさんがプレゼントをくれないよ」
と。

大人になり、そしてシステムエンジニアとなった僕は、今こう思います。
「サンタさんは、どのように大量の子どもの行動ログからプレゼントをあげるかどうかの判断を高速に処理しているのか」
と。

プレゼント対象になる子どもの数は日本だけで考えても相当数いるはずです。そして、サンタさんはそんな子ども達へのプレゼント配布を1晩で行わなければいけません。相当な仕事量であることは容易に想像がつきます。
その中でさらに、プレゼントをあげる・あげないの判断を下しているわけです。
これは裏で相当でかいシステムが動いているのでは、と僕は推測しました。

ということで、サンタさんが子どもの行動ログというビッグデータを処理し、プレゼントをあげるかどうかの判断を下すアプリケーションを構築してみようと思います。

概要

「大量の子どもの行動ログからプレゼントをあげるかどうかの判断を高速に処理」
こちらの仕様を満たすため、主に2つのサブシステムを構築する必要があります。

  • 大量の子ども行動ログを保存するためのデータウェアハウス
  • リクエストを受け取り、保存したデータから高速で対象の子どもがプレゼントをあげる対象かどうかのレスポンスを返すアプリケーション

この仕様を満たすシステムを、AWSのリソース使い構築してみます。

データウェアハウス

はじめに、全ての元となる、行動ログの収集するデータウェアハウスを構築してみます。

どのようなリソースを選択するかの意思決定のために、想定されるログの規模を検討します。
子どもの数 x 1日あたりの行動ログ数 x 日数 = 想定されるログ数
で出します。

まず、子どもの数を検討します。
総務省によると、0 ~ 14歳の子どもの男女合計が約1571万人だそうです。この数字を使います。

次に、1日あたりの行動ログ数です。
これはさっぱりわからないので、それぞれの子どもが毎日5回何かしらのクリスマスに影響のある、善悪に関わる行動を起こすと仮定します。

最後に、必要な期間を考えます。
今回は、1月1日 ~ 12月24日とします。去年悪い子でプレゼントをもらえなかった子ども達が心入れ替える可能性を考慮し、今年のデータのみを有効とします。

つまり、
15,710,000 x 5 x 358 = 28,120,900,000
合計約300億レコードになると仮定します。

単純に分割すると、
1人あたり: 約1800レコード
1月あたり: 約27億
1日あたり: 約9000万
1時間あたり: 約360万
1分あたり: 約6万レコード
1秒あたり: 約1000レコード
なかなかのデータ量です。

ログの規模が予想できたので、次にログの取集方法についてです。
バッチ処理かストリーム処理かになると思うのですが、今回は、ストリーム処理を採用しようと思います。
何故ならば、クリスマスが近づき、ギリギリになって善行を重ねる子どもがいないとも限らないので、できるだけリアルタイムでデータを見れるようにしたいからです。

よって、以下の仕様を満たすデータウェアハウスを構築します。

  • 300億のレコードを保存できる
  • ストリーム処理

ストリーム処理ですが、これはkinesisにします。

1つのシャードは 1秒あたり最大 1 MiB のデータを取り込むことができ (パーティションキーを含む)、書き込みについては 1秒あたり 1,000 レコードを取り込むことができます。

と書いてあるので、おそらく少ないシャードでも余裕でしょう。

データの保存場所ですが、S3にします。S3ならばデータ入れ放題なので、楽だと予想できるためです。

データの保存場所、保存方法が決まったので、データについて検討していきます。

まず、リクエストの形式を決めておきます。 以下のようにくるとしましょう。

{name: 'yamada taro', address: 'tokyo shinjukuku nishisinjuku 8 17 1', good_thing: true}

次に、大切なデータ構造ですが、最終的には子どもれぞれのデータを取り出したいので、子どもそれぞれにユニークなIDを生成する必要があります。
1つの住所に同じ名前の子どもは1人ときめて、リクエストの住所と名前から、hash化してIDを作ってレコードの中に入れておきます。

なので、このような構成になります。

f:id:AdwaysEngineerBlog:20181220110124p:plain

実際に設定していきます。

kinesis data stream

data streamは、リクエストを受け取り、任意の場所に流すことができます。
ポチポチと設定します。 とはいっても、

Kinesis ストリームの名前: action (任意です)
シャード数: 2

を設定して、終わりです。
1シャードで秒間1000リクエストをさばけるらしいので、予備で2にしてば余裕でしょう。

kinesis firehose

firehoseは、data streamと繋がり、データを一時的にため、必要であれば形式を変え、任意のストレージに保存します。

ここで、firehoseの機能を使い、データを扱いやすくします。
firehoseにはLambdaを呼び出し、送られてきたリクエストの形式を少し変えることができます。
この機能を使い、ここで送られてきたリクエストを加工して、子どもそれぞれのIDを生成した上で、S3に送ります。

Lambda
名前と住所からidを生成して、S3にデータを送ります。 せっかくなのでRubyで書きます。

require 'json'
require 'digest/md5'
require 'base64'

def lambda_handler(event:, context:)
    records = event["records"].map do |record|
        data = JSON.parse(Base64.decode64(record["data"]))
        name    = data["name"]
        address = data["address"]
        id      = Digest::MD5.new.update(name + address).to_s
        t       = Time.now
        data = data.merge({"id" => id, "year" => t.strftime("%Y"), "month" => t.strftime("%m"), "day" => t.strftime("%d"), "dt" => t.strftime("%Y-%m-%dT%H"), "created_at" => t})
        {
            "recordId" => record["recordId"],
            "result" => "Ok",
            "data" => Base64.encode64(JSON.generate(data)) +'Cg=='
        }
    end
    
    {records: records}
end

ここで注意なのが、Timeoutを1min以上にしないと、firehoseで設定するところで怒られます。

firehose
Transform source records with AWS Lambda に、上で作ったLambdaを設定します。
Amazon S3 destination に、任意のS3 Bucketを設定します。

テスト

request

aws kinesis put-record --stream-name action --data '{"name": "yamada taro", "address": "tokyo shinjukuku nishisinjuku 8 17 1", "good_thing": false}' --partition-key foo

S3に入った事を確認して、

ファイルの中身

{"name":"yamada taro","address":"tokyo shinjukuku nishisinjuku 8 17 1","good_thing":true,"id":"d2a8ef3f8d3775d58b1c9fe24fa45e27","year":"2018","month":"12","day":"19","dt":"2018-12-19T15","created_at":"2018-12-19 15:33:49 +0000"}

いい感じですね。
改行コードも入れているので、複数行が入っても大丈夫です。

アプリケーション

次に、先ほど作ったデータを使い、プレゼントをあげるべきなのかを即座にレスポンスとして返すアプリケーションが必要です。

概要

構築するアプリケーションは、サンタさんが住所・名前を送信し、過去の行動ログからプレゼントをあげる基準を満たしているかどうかをレスポンスとして受け取るアプリケーションです。

前提として、

  • 過去の行動ログは変更されない。

とします。

仕様は、

  • 12月から遠ければ遠いほどいいスコアをつける。(クリスマス直前のいい事とクリスマスと無関係な時期のいい事の重みを変えるため)
  • 待ち時間がないように、高速でレスポンスを返す

以上の事を踏まえて、アプリケーションを構築します。

構築

完成のイメージは、以下のような感じです。

f:id:AdwaysEngineerBlog:20181220110111p:plain

このDB、となっているところをどうするか、が肝になります。
今回は、S3上の複数のファイルを集計して、スコアを算出する必要があります。
しかし、データ量が非常に多くなるため、リクエストのたびにデータ全てを集計していたら、絶対に無理です。

なので、リクエストごとに全データからの集計は候補から除外します。

ここで大事なのは過去のログは変更がない、ということです。
つまり、あらかじめ子どもそれぞれのデータを集計をしておけば、1571万レコードで済むはずです。
これぐらいならば、NoSQLで高速に処理できるのではないでしょうか。

なので、kinesisからS3にデータが入ったタイミングで、DynamoDBのレコード更新をかけます。

つまり、このような感じであれば、非常にスムーズに処理ができそうな気がします。

f:id:AdwaysEngineerBlog:20181220110045p:plain

構築していきます。

DynamoDB

AWSが提供するNoSQLです。
非常に速いようですし、求める規模までスケールさせることができるので、大丈夫でしょう。
カラムあたりのバイト数などの制限がありますが、今回は非常に小さなデータに対してアップデートを繰り返すことになるので、あっているのではないでしょうか。

Table作成
ぽちぽちと作成します。
今回はidだけがindexとして動けばいいです。

DynamoDBでは読み込み・書き込みそれぞれのキャパシティーユニットを設定して、性能をプロビジョニングしておく必要があります。

1 つの読み込みキャパシティーユニットは、最大サイズ 4 KB の項目について、1 秒あたり 1 回の強力な整合性のある読み込み、あるいは 1 秒あたり 2 回の結果整合性のある読み込みを表します。

1 つの書き込みキャパシティーユニットは、最大でサイズが 1 KB までの項目について、1 秒あたり 1 回の書き込みを表します。

このように書かれています。

まず、今回は1レコードあたり約250byteになります。
そして、最大で秒間1000レコードのupdateが発生するとしています。
update時に書き込み・読み込み両方する必要があるため、
計算すると、以下のようになります。

書き込み: 250
読み込み: 62

しかし、夜中はほとんどアクセスがないと考え、

書き込み: 200
読み込み: 50

と設定しておきます。

そしてAutoScalingで、各最大でも耐えられるくらいを設定しておきます。

必要無かったら下げる感じですかね。

Lambda

次に、S3のPUT通知を受け取り、DynamoDBにデータを入れるLambdaを用意します。
ここでは、DynamoDBに入れる際に、レコードがあった場合アップデートをしていきます。

受け取ったリクエスト中のgood_thingがtrueの場合加算し、そうでない場合は減算する事とします。
そして先述の通り、時期によって重みを変えたいので、数字を時期によって変えようと思います。

require 'json'
require 'aws-sdk'
require 'time'

def score(t)
    case t
    when (Time.parse('1/1')..Time.parse('3/31'))
        3
    when (Time.parse('4/1')..Time.parse('6/31'))
        2.5
    when (Time.parse('7/1')..Time.parse('9/31'))
        2
    when (Time.parse('10/1')..Time.parse('12/10'))
        1.5
    when (Time.parse('12/10')..Time.parse('12/25'))
        1
    end
end

def lambda_handler(event:, context:)
    s3 = Aws::S3::Client.new
    dynamo = Aws::DynamoDB::Client.new
    table_name = "aggr_children_action_day"

    s3.get_object(:bucket => 'kinesis-action-records', :key => event["Records"][0]["s3"]["object"]["key"]).body.read.split("\n").each do |record|
        record = JSON.parse(record)
        t = Time.parse(record['created_at'])
        s = record["good_thing"] ? score(t) : -(score(t))
        id = record["id"]
        resp = dynamo.get_item({
            key: {
                "id": id,
            },
            table_name: table_name, 
        })
        if !resp.empty?
            dynamo.update_item(
                table_name: table_name,
                key: {"id" => id},
                attribute_updates: {
                    "score" => {
                        value: resp.to_h[:item]["score"].to_i + s,
                        action: "PUT",
                    },
                }
            )
        else
            new_record = record.merge({created_at: Time.now.to_i, score: s, id: id})
            new_record.delete('good_thing')
            dynamo.put_item(
                table_name: table_name,
                item: new_record
            )
        end
    end

    {}
end

S3

次に、firehoseを構築するときに作ったS3のバケットに、Notificationの設定をして上のLambdaを実行します。
これでKinesisにデータを流すと、DynamoDBにデータがアップデートされます。

テスト

想定の約300分の1の
5万レコードを入れてクエリーを実行してみます。

$ aws dynamodb scan --table-name aggr_children_action_day --select "COUNT"
{
    "Count": 50000,
    "ScannedCount": 50000,
    "ConsumedCapacity": null
}
$ time aws dynamodb get-item --table-name aggr_children_action_day  --key '{ "id": {"S": "0dbd1a75e0610c7f4eb2c2088857c9fe" }  }'

...

real    0m1.232s
user    0m0.425s
sys 0m0.096s

100レコードだけ入れたテーブルへの検索と比べても、変わらないので、おそらく増えても大丈夫でしょう。

$ aws dynamodb scan --table-name aggr_children_action_day_2 --select "COUNT"
{
    "Count": 100,
    "ScannedCount": 100,
    "ConsumedCapacity": null
}
$ time aws dynamodb get-item --table-name aggr_children_action_day_2  --key '{ "id": {"S": "0a26e3ca1b2ad83629f800a7e3633859" }  }'

...

real    0m1.257s
user    0m0.447s
sys 0m0.097s

レイテンシーを 10 ミリ秒未満に維持でき

とドキュメントに書いてあるので、おそらく単一の読み込みでindexを使って検索すれば、速度は維持できるのでしょう。

Lambda

最後に、リクエストを受け取り、プレゼントをあげるかどうかの判断をするLambdaを作ります。
いい子の閾値が難しいのですが、、、ここはとりあえず、以下のように定義します。
- いい事悪い事の1日の合計行動数を5として、1年で約1800回
- 1800回のうち、6割いい子だったらいい子とする。
- しかし、時期によって点数に重みがあるので、1800回のうちの6割かつそれぞれ均等にスコアをとった状態をいい子とする。

1800 * 0.6 / 5 = 216
216 * 3 + 216 * 2.5 + 216 * 2 + 216 * 1.5 + 216 * 1 = 2160

1800 * 0.4 / 5 = 144
144 * 3 + 144 * 2.5 + 144 * 2 + 144 * 1.5 + 144 * 1 = 1440

2160 - 1440 = 720

よって、720を閾値とします。
以下のようなLabmdaになりました。

require 'json'
require 'digest/md5'
require 'aws-sdk'

def lambda_handler(event:, context:)
    # TODO implement
    param   = event["queryStringParameters"]
    name    = param["name"]
    address = param["address"]
    id      = Digest::MD5.new.update(name + address).to_s
    p id
    dynamo = Aws::DynamoDB::Client.new
    table_name = "aggr_children_action_day"
    resp = dynamo.get_item({
                key: {
                    "id": id,
                },
                table_name: table_name, 
            })

    r = resp.to_h[:item]["score"] > 720
    
    { good: r }
end

これでAPIGatewayと繋げれば、レスポンスが以下のように返ってきます。

{
  "good": true # or false
}

分析

せっかくログをとっておいているので、分析できるようにしてみましょう。

Athena

Athenaは、S3に保存されている構造化されたデータをSQLを使って集計したりすることができるサービスです。
今回、元のデータはjsonで保存しているので、Athenaが使えます。

  • Table構造 *
CREATE EXTERNAL TABLE IF NOT EXISTS santa.actions (
  `id` string,
  `name` string,
  `address` string,
  `good_thing` boolean,
  `dt` date,
  `created_at` date 
) PARTITIONED BY (
  year string,
  month string,
  day string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://kinesis-action-records/actions/'
TBLPROPERTIES ('has_encrypted_data'='false');

これで、S3のs3://kinesis-action-records/actions/からデータを取り出すことができます。
さらに、日毎に集計することが予想されるため、パーティションを作ります。

ALTER TABLE actions ADD PARTITION (year='2018',month='12',day='19') location 's3://kinesis-action-records/actions/2018/12/19/'

検索速度維持のため、パーティションを毎日作る必要がありますが、これはCloudWatch Event + Lambdaとかで実行すればいいでしょう。

こんな感じで実行できます。

f:id:AdwaysEngineerBlog:20181220152846p:plain

BIツールとかにつなげば、いい感じに可視化もできるでしょう。

まとめ

最終的にはこのような構造のアプリケーションが出来上がりました。

f:id:AdwaysEngineerBlog:20181220110034p:plain

ストリーム処理をあまり触ったことがなかったため、今回はこのような事をしてみました。
比較的大きなデータでもストリーム処理かつAWSならば余裕で処理ができますね。

もしシステム化をお考えのサンタさん、いらっしゃったら声をかけてください。


  

次は山口さんの記事です。

http://blog.engineer.adways.net/entry/advent_calendar_2018/16