AWS Glueを使ってS3→データ加工→S3をやってみた

Adways Advent Calendar 2017 5日目の記事です。

http://blog.engineer.adways.net/entry/advent_calendar_2017


こんにちは、12月7日担当の足立です。

12月7日はクリスマスツリーの日みたいですね。

1886(明治19)年のこの日、横浜・明治屋に日本初のクリスマスツリーが飾られた。らしいです。

さて、本題に入ります。

今回はAWS Glueを業務で触ったので、それについて簡単に説明していきたいと思います。

AWS Glueとはなんぞや??

f:id:AdwaysEngineerBlog:20171207170557p:plain

AWS Glue は抽出、変換、ロード (ETL) を行う完全マネージド型のサービスで、お客様の分析用データの準備とロードを簡単にします。AWS マネジメントコンソールで数回クリックするだけで、ETL ジョブを作成および実行できます。AWS Glue では、AWS に保存されたデータを指定するだけで AWS Glue によるデータ検索が行われ、テーブル定義やスキーマなどの関連するメタデータが AWS Glue データカタログに保存されます。カタログに保存されたデータは、すぐに検索、クエリ、ETL で使用できます。AWS Glue では、データ変換とデータのロードプロセスを実行するコードが生成されます。

ざっくりAWS Glueで出来ること(認識範囲内)を説明すると

  • CrawlerでS3等からデータを抽出してデータカタログへ
  • データカタログでカラム名・型変更可
  • Jobで変換
    • コンソール上でポチポチするだけで自動である程度コードが生成される
    • 細かな変換がしたい場合 PySpark, Pythonで可
  • Jobでロード(S3等に変換したデータをアップロード)

今回のゴール

f:id:AdwaysEngineerBlog:20171207170636p:plain

流れ

  • 二つ用意したバケットのsample_glue_for_read/blog_sample_glue.csvからAWS GlueのCrawlerを使ってcsvからデータ抽出
  • データカタログから不要なカラムの削除
  • Jobを使ってデータ加工しsample_glue_for_resultに出力

やること

下記の内容のcsvから不要なカラムidとdummyを削除して
finalist_nameの個数 == 投票数とし、並びかえたものを出力したいと思います。

id,finalist_name,dummy
1,ジャルジャル,
2,かまいたち,
3,カミナリ,
4,マヂカルラブリー,
5,ミキ,
6,さや香,
7,とろサーモン,
8,和牛,
9,ゆにばーす,
10,ミキ,
11,とろサーモン,
12,和牛,
13,和牛,
14,とろサーモン,
15,とろサーモン,

AWS Glue実践

準備(IAMロールの作成)

マネジメントコンソール上で IAM→ロール→ロールの作成

  1. ロールの作成画面で「AWS サービス」を選択
  2. このロールを使用するサービスを選択で「Glue」を選択
  3. 次のステップへ
  4. ポリシーの選択一覧画面で「AmazonS3FullAccess」 「AWSGlueServiceRole」の二つを選択
  5. 次のステップへ
  6. ロール名の入力、今回は「AWSGlueServiceRoleDefault」と入力
  7. ロールの作成

詳しくはこちらを参照 dev.classmethod.jp

実践

AWS Glueのページへ

東京リージョンはまだないので、今回はバージニア北部リージョンで実践していきたいと思います。

Crawler設定・実行

CrawlerでS3からファイルを抽出し、Data catalogに入れるために設定をしていきたいと思います。

f:id:AdwaysEngineerBlog:20171207170935p:plain

CrawlersのAdd crawlerから

f:id:AdwaysEngineerBlog:20171207170947p:plain

Crawler nameにsample_glueと入力

f:id:AdwaysEngineerBlog:20171207171003p:plain

Data storeをS3に選択 Include pathに今回はs3://sample-glue-for-readと入力
右のフォルダアイコンからバケット指定出来ます。

f:id:AdwaysEngineerBlog:20171207171031p:plain

Noを選択

f:id:AdwaysEngineerBlog:20171207171046p:plain

準備で作成したIAMロールAWSGlueServiceRoleDefaultを選択

f:id:AdwaysEngineerBlog:20171207171113p:plain

今回はRun on demandを選択

f:id:AdwaysEngineerBlog:20171207171125p:plain

Add databaseを押してDatabaseを作成

作成したsample_glue_dbを選択

確認ページでFinishを押してCrawlerの設定終了

Crawler一覧画面で作成したsample_glueを選択し、Run crawlerで実行する。

Data catalogの確認

メニューバーのData catalog→Databases→Tablesから

Run crawler実行で作成されたsample_glue_for_readを選択

f:id:AdwaysEngineerBlog:20171207171215p:plain

上記は詳細の一部ですが、無事にcsvが抽出成功していることが確認出来ました。

Jobの作成・実行

f:id:AdwaysEngineerBlog:20171207171246p:plain

Add jobでJobを作成

f:id:AdwaysEngineerBlog:20171207171258p:plain

上記のように入力して次へ

f:id:AdwaysEngineerBlog:20171207171321p:plain

先ほど確認したData catalogのsample_glue_for_readを選択し次へ

f:id:AdwaysEngineerBlog:20171207171332p:plain

Create tables in your data targetを選択

今回は加工したデータをS3のsample-glue-for-resultバケットにアップロードするのがゴールなので、

S3を選択

Target Pathにs3://sample-glue-for-resultを入力

f:id:AdwaysEngineerBlog:20171207171352p:plain

iddummyは不要なので使わないように×を押します。

確認ページでFinishを押します。

すると、AWS Glue側で自動でコード生成してくれます。

これを元にコーディングしていきたいと思います。

自動生成コードに少し手を加えたコードがこちらになります。

import sys, unicodedata
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame, DynamicFrameWriter
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import Row

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sample_glue_db", table_name = "sample_glue_for_read", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "sample_glue_for_read", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("finalist_name", "string", "finalist_name", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("finalist_name", "string", "finalist_name", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @return: datasink2
## @inputs: [frame = applymapping1]


# ===書いたコード(ここから)===

def is_japanese(string):
    for ch in string:
        name = unicodedata.name(ch) 
        if "CJK UNIFIED" in name \
        or "HIRAGANA" in name \
        or "KATAKANA" in name:
            return True
    return False

result_list = []

df_list  = applymapping1.toDF().filter("finalist_name is NOT NULL").collect()

for row in df_list:
    if is_japanese(row['finalist_name']):
        result_list.append(Row(finalist_name=row['finalist_name']))
        
df = spark.createDataFrame(result_list)
df  = df.groupBy("finalist_name").count().sort("count", ascending=False)
dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')

print "==========="
print dyf.show()
print "==========="

# ===書いたコード(ここまで)===

datasink2 = glueContext.write_dynamic_frame.from_options(frame = dyf, connection_type = "s3", connection_options = {"path": "s3://sample-glue-for-result"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

本来なら

df  = applymapping1.toDF().filter("finalist_name is NOT NULL")
df  = df.groupBy("finalist_name").count().sort("count", ascending=False)
dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')

上記の3行だけで出来ると思ったのですが、.filter()を使ったら

どこからか謎の文字列が入り込んできてしまいました。

今回はマルチバイト文字だけを扱うので、こちらの記事を参考にゴリゴリにコードを書き、欲しいデータのDataFrameを作成しました。

(なんで.toDFした時には入ってないのに.filter()すると謎文字列入るんだろう。。)

f:id:AdwaysEngineerBlog:20171207171451p:plain

Run jobでJobを実行させます。

Jobのログ確認方法

f:id:AdwaysEngineerBlog:20171207171528p:plain

Jobの実行ログ等はCloudWatchに出力されます。

先ほどのコードの中にprintでDynamicFrameの中身を確認できるようにしたので確認したいと思います。

printの結果は/aws-glue/jobs/errorに書き出されます。

f:id:AdwaysEngineerBlog:20171207171545p:plain

DynamicFrameの中身にそれっぽいデータがちゃんと入っていることが確認できました。

次はS3にcsvファイルがちゃんと書き出されているか確認したいと思います。

実践結果

S3のsample-glue-for-resultに書き出したので見てみます。

f:id:AdwaysEngineerBlog:20171207171612p:plain

あれっ?1つのcsvファイルじゃなくて5つ出来てる!!
ログで確認したデータ良い感じだったのに。。

f:id:AdwaysEngineerBlog:20171207171620p:plain

中身を見てみると、、、

合体してくれれば想定通りのファイルだったのに

最後のファイルなんて空ファイル。

結果: データが分散されて複数のファイルで出力される

感想

AWS Glueを扱うためにPythonを初めて書いたのでとても新鮮でした。

形にはなったものの、イメージ通りに最後まで作り上げることが出来なかったのが心残りです。

ちゃんと1つのcsvで出力して、「とろサーモン」さん優勝おめでとうございます!まで持っていく予定でした。

「こうすれば1つファイルに纏められるよ」という方がいらっしゃいましたら教えていただけると幸いです。

追記(9/18)

気がついたらもうすぐでこの記事を書いてから1年経ってしまいますが、
ファイルをまとめる方法を追記するのを忘れていたので今更ながら追記させて頂きます。

df = df.coalesce(1)
dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')

coalesceメソッドを使うことによってパーティションで分散処理されてファイル分割されてしまっていたものが、
パーティション数を1にすることで1つにまとめてファイル出力が可能になる。

お詫び

2017年度のM1ファイナリストを使わせて頂いたのですが、

誤字をしないためにネットから引っ張ってきたら

敗者復活枠のスーパーマラドーナさん抜きのテストデータを作ってしまいました。

深くお詫び申し上げます。