AWS Glueを使ってS3→データ加工→S3をやってみた

Adways Advent Calendar 2017 5日目の記事です。

http://blog.engineer.adways.net/entry/advent_calendar_2017


こんにちは、12月7日担当の足立です。

12月7日はクリスマスツリーの日みたいですね。

1886(明治19)年のこの日、横浜・明治屋に日本初のクリスマスツリーが飾られた。らしいです。

さて、本題に入ります。

今回はAWS Glueを業務で触ったので、それについて簡単に説明していきたいと思います。

AWS Glueとはなんぞや??

f:id:AdwaysEngineerBlog:20171207170557p:plain

AWS Glue は抽出、変換、ロード (ETL) を行う完全マネージド型のサービスで、お客様の分析用データの準備とロードを簡単にします。AWS マネジメントコンソールで数回クリックするだけで、ETL ジョブを作成および実行できます。AWS Glue では、AWS に保存されたデータを指定するだけで AWS Glue によるデータ検索が行われ、テーブル定義やスキーマなどの関連するメタデータが AWS Glue データカタログに保存されます。カタログに保存されたデータは、すぐに検索、クエリ、ETL で使用できます。AWS Glue では、データ変換とデータのロードプロセスを実行するコードが生成されます。

ざっくりAWS Glueで出来ること(認識範囲内)を説明すると

  • CrawlerでS3等からデータを抽出してデータカタログへ
  • データカタログでカラム名・型変更可
  • Jobで変換
    • コンソール上でポチポチするだけで自動である程度コードが生成される
    • 細かな変換がしたい場合 PySpark, Pythonで可
  • Jobでロード(S3等に変換したデータをアップロード)

今回のゴール

f:id:AdwaysEngineerBlog:20171207170636p:plain

流れ

  • 二つ用意したバケットのsample_glue_for_read/blog_sample_glue.csvからAWS GlueのCrawlerを使ってcsvからデータ抽出
  • データカタログから不要なカラムの削除
  • Jobを使ってデータ加工しsample_glue_for_resultに出力

やること

下記の内容のcsvから不要なカラムidとdummyを削除して
finalist_nameの個数 == 投票数とし、並びかえたものを出力したいと思います。

id,finalist_name,dummy
1,ジャルジャル,
2,かまいたち,
3,カミナリ,
4,マヂカルラブリー,
5,ミキ,
6,さや香,
7,とろサーモン,
8,和牛,
9,ゆにばーす,
10,ミキ,
11,とろサーモン,
12,和牛,
13,和牛,
14,とろサーモン,
15,とろサーモン,

AWS Glue実践

準備(IAMロールの作成)

マネジメントコンソール上で IAM→ロール→ロールの作成

  1. ロールの作成画面で「AWS サービス」を選択
  2. このロールを使用するサービスを選択で「Glue」を選択
  3. 次のステップへ
  4. ポリシーの選択一覧画面で「AmazonS3FullAccess」 「AWSGlueServiceRole」の二つを選択
  5. 次のステップへ
  6. ロール名の入力、今回は「AWSGlueServiceRoleDefault」と入力
  7. ロールの作成

詳しくはこちらを参照 dev.classmethod.jp

実践

AWS Glueのページへ

東京リージョンはまだないので、今回はバージニア北部リージョンで実践していきたいと思います。

Crawler設定・実行

CrawlerでS3からファイルを抽出し、Data catalogに入れるために設定をしていきたいと思います。

f:id:AdwaysEngineerBlog:20171207170935p:plain

CrawlersのAdd crawlerから

f:id:AdwaysEngineerBlog:20171207170947p:plain

Crawler nameにsample_glueと入力

f:id:AdwaysEngineerBlog:20171207171003p:plain

Data storeをS3に選択 Include pathに今回はs3://sample-glue-for-readと入力
右のフォルダアイコンからバケット指定出来ます。

f:id:AdwaysEngineerBlog:20171207171031p:plain

Noを選択

f:id:AdwaysEngineerBlog:20171207171046p:plain

準備で作成したIAMロールAWSGlueServiceRoleDefaultを選択

f:id:AdwaysEngineerBlog:20171207171113p:plain

今回はRun on demandを選択

f:id:AdwaysEngineerBlog:20171207171125p:plain

Add databaseを押してDatabaseを作成

作成したsample_glue_dbを選択

確認ページでFinishを押してCrawlerの設定終了

Crawler一覧画面で作成したsample_glueを選択し、Run crawlerで実行する。

Data catalogの確認

メニューバーのData catalog→Databases→Tablesから

Run crawler実行で作成されたsample_glue_for_readを選択

f:id:AdwaysEngineerBlog:20171207171215p:plain

上記は詳細の一部ですが、無事にcsvが抽出成功していることが確認出来ました。

Jobの作成・実行

f:id:AdwaysEngineerBlog:20171207171246p:plain

Add jobでJobを作成

f:id:AdwaysEngineerBlog:20171207171258p:plain

上記のように入力して次へ

f:id:AdwaysEngineerBlog:20171207171321p:plain

先ほど確認したData catalogのsample_glue_for_readを選択し次へ

f:id:AdwaysEngineerBlog:20171207171332p:plain

Create tables in your data targetを選択

今回は加工したデータをS3のsample-glue-for-resultバケットにアップロードするのがゴールなので、

S3を選択

Target Pathにs3://sample-glue-for-resultを入力

f:id:AdwaysEngineerBlog:20171207171352p:plain

iddummyは不要なので使わないように×を押します。

確認ページでFinishを押します。

すると、AWS Glue側で自動でコード生成してくれます。

これを元にコーディングしていきたいと思います。

自動生成コードに少し手を加えたコードがこちらになります。

import sys, unicodedata
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame, DynamicFrameWriter
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import Row

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sample_glue_db", table_name = "sample_glue_for_read", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "sample_glue_for_read", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("finalist_name", "string", "finalist_name", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("finalist_name", "string", "finalist_name", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @return: datasink2
## @inputs: [frame = applymapping1]


# ===書いたコード(ここから)===

def is_japanese(string):
    for ch in string:
        name = unicodedata.name(ch) 
        if "CJK UNIFIED" in name \
        or "HIRAGANA" in name \
        or "KATAKANA" in name:
            return True
    return False

result_list = []

df_list  = applymapping1.toDF().filter("finalist_name is NOT NULL").collect()

for row in df_list:
    if is_japanese(row['finalist_name']):
        result_list.append(Row(finalist_name=row['finalist_name']))
        
df = spark.createDataFrame(result_list)
df  = df.groupBy("finalist_name").count().sort("count", ascending=False)
dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')

print "==========="
print dyf.show()
print "==========="

# ===書いたコード(ここまで)===

datasink2 = glueContext.write_dynamic_frame.from_options(frame = dyf, connection_type = "s3", connection_options = {"path": "s3://sample-glue-for-result"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

本来なら

df  = applymapping1.toDF().filter("finalist_name is NOT NULL")
df  = df.groupBy("finalist_name").count().sort("count", ascending=False)
dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')

上記の3行だけで出来ると思ったのですが、.filter()を使ったら

どこからか謎の文字列が入り込んできてしまいました。

今回はマルチバイト文字だけを扱うので、こちらの記事を参考にゴリゴリにコードを書き、欲しいデータのDataFrameを作成しました。

(なんで.toDFした時には入ってないのに.filter()すると謎文字列入るんだろう。。)

f:id:AdwaysEngineerBlog:20171207171451p:plain

Run jobでJobを実行させます。

Jobのログ確認方法

f:id:AdwaysEngineerBlog:20171207171528p:plain

Jobの実行ログ等はCloudWatchに出力されます。

先ほどのコードの中にprintでDynamicFrameの中身を確認できるようにしたので確認したいと思います。

printの結果は/aws-glue/jobs/errorに書き出されます。

f:id:AdwaysEngineerBlog:20171207171545p:plain

DynamicFrameの中身にそれっぽいデータがちゃんと入っていることが確認できました。

次はS3にcsvファイルがちゃんと書き出されているか確認したいと思います。

実践結果

S3のsample-glue-for-resultに書き出したので見てみます。

f:id:AdwaysEngineerBlog:20171207171612p:plain

あれっ?1つのcsvファイルじゃなくて5つ出来てる!!
ログで確認したデータ良い感じだったのに。。

f:id:AdwaysEngineerBlog:20171207171620p:plain

中身を見てみると、、、

合体してくれれば想定通りのファイルだったのに

最後のファイルなんて空ファイル。

結果: データが分散されて複数のファイルで出力される

感想

AWS Glueを扱うためにPythonを初めて書いたのでとても新鮮でした。

形にはなったものの、イメージ通りに最後まで作り上げることが出来なかったのが心残りです。

ちゃんと1つのcsvで出力して、「とろサーモン」さん優勝おめでとうございます!まで持っていく予定でした。

「こうすれば1つファイルに纏められるよ」という方がいらっしゃいましたら教えていただけると幸いです。

お詫び

2017年度のM1ファイナリストを使わせて頂いたのですが、

誤字をしないためにネットから引っ張ってきたら

敗者復活枠のスーパーマラドーナさん抜きのテストデータを作ってしまいました。

深くお詫び申し上げます。