Adways Advent Calendar 2017 5日目の記事です。
http://blog.engineer.adways.net/entry/advent_calendar_2017
こんにちは、12月7日担当の足立です。
12月7日はクリスマスツリーの日みたいですね。
1886(明治19)年のこの日、横浜・明治屋に日本初のクリスマスツリーが飾られた。
らしいです。
さて、本題に入ります。
今回はAWS Glueを業務で触ったので、それについて簡単に説明していきたいと思います。
AWS Glueとはなんぞや??
AWS Glue は抽出、変換、ロード (ETL) を行う完全マネージド型のサービスで、お客様の分析用データの準備とロードを簡単にします。AWS マネジメントコンソールで数回クリックするだけで、ETL ジョブを作成および実行できます。AWS Glue では、AWS に保存されたデータを指定するだけで AWS Glue によるデータ検索が行われ、テーブル定義やスキーマなどの関連するメタデータが AWS Glue データカタログに保存されます。カタログに保存されたデータは、すぐに検索、クエリ、ETL で使用できます。AWS Glue では、データ変換とデータのロードプロセスを実行するコードが生成されます。
ざっくりAWS Glueで出来ること(認識範囲内)を説明すると
- CrawlerでS3等からデータを抽出してデータカタログへ
- データカタログでカラム名・型変更可
- Jobで変換
- コンソール上でポチポチするだけで自動である程度コードが生成される
- 細かな変換がしたい場合
PySpark, Python
で可
- Jobでロード(S3等に変換したデータをアップロード)
今回のゴール
流れ
- 二つ用意したバケットの
sample_glue_for_read/blog_sample_glue.csv
からAWS GlueのCrawlerを使ってcsvからデータ抽出 - データカタログから不要なカラムの削除
- Jobを使ってデータ加工し
sample_glue_for_result
に出力
やること
下記の内容のcsvから不要なカラムidとdummy
を削除して
finalist_nameの個数 == 投票数
とし、並びかえたものを出力したいと思います。
id,finalist_name,dummy 1,ジャルジャル, 2,かまいたち, 3,カミナリ, 4,マヂカルラブリー, 5,ミキ, 6,さや香, 7,とろサーモン, 8,和牛, 9,ゆにばーす, 10,ミキ, 11,とろサーモン, 12,和牛, 13,和牛, 14,とろサーモン, 15,とろサーモン,
AWS Glue実践
準備(IAMロールの作成)
マネジメントコンソール上で IAM→ロール→ロールの作成
- ロールの作成画面で
「AWS サービス」
を選択 - このロールを使用するサービスを選択で
「Glue」
を選択 - 次のステップへ
- ポリシーの選択一覧画面で
「AmazonS3FullAccess」
「AWSGlueServiceRole」
の二つを選択 - 次のステップへ
- ロール名の入力、今回は
「AWSGlueServiceRoleDefault」
と入力 - ロールの作成
詳しくはこちらを参照 dev.classmethod.jp
実践
AWS Glueのページへ
東京リージョンはまだないので、今回はバージニア北部リージョンで実践していきたいと思います。
Crawler設定・実行
CrawlerでS3からファイルを抽出し、Data catalogに入れるために設定をしていきたいと思います。
CrawlersのAdd crawlerから
Crawler nameにsample_glue
と入力
Data storeをS3
に選択
Include pathに今回はs3://sample-glue-for-read
と入力
右のフォルダアイコンからバケット指定出来ます。
No
を選択
準備で作成したIAMロールAWSGlueServiceRoleDefault
を選択
今回はRun on demand
を選択
Add databaseを押してDatabaseを作成
作成したsample_glue_db
を選択
確認ページでFinish
を押してCrawlerの設定終了
Crawler一覧画面で作成したsample_glue
を選択し、Run crawler
で実行する。
Data catalogの確認
メニューバーのData catalog→Databases→Tables
から
Run crawler実行で作成されたsample_glue_for_read
を選択
上記は詳細の一部ですが、無事にcsvが抽出成功していることが確認出来ました。
Jobの作成・実行
Add job
でJobを作成
上記のように入力して次へ
先ほど確認したData catalogのsample_glue_for_read
を選択し次へ
Create tables in your data target
を選択
今回は加工したデータをS3のsample-glue-for-result
バケットにアップロードするのがゴールなので、
S3
を選択
Target Pathにs3://sample-glue-for-result
を入力
id
とdummy
は不要なので使わないように×を押します。
確認ページでFinish
を押します。
すると、AWS Glue側で自動でコード生成してくれます。
これを元にコーディングしていきたいと思います。
自動生成コードに少し手を加えたコードがこちらになります。
import sys, unicodedata from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.dynamicframe import DynamicFrame, DynamicFrameWriter from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql import Row ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "sample_glue_db", table_name = "sample_glue_for_read", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "sample_glue_for_read", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("finalist_name", "string", "finalist_name", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("finalist_name", "string", "finalist_name", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @return: datasink2 ## @inputs: [frame = applymapping1] # ===書いたコード(ここから)=== def is_japanese(string): for ch in string: name = unicodedata.name(ch) if "CJK UNIFIED" in name \ or "HIRAGANA" in name \ or "KATAKANA" in name: return True return False result_list = [] df_list = applymapping1.toDF().filter("finalist_name is NOT NULL").collect() for row in df_list: if is_japanese(row['finalist_name']): result_list.append(Row(finalist_name=row['finalist_name'])) df = spark.createDataFrame(result_list) df = df.groupBy("finalist_name").count().sort("count", ascending=False) dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue') print "===========" print dyf.show() print "===========" # ===書いたコード(ここまで)=== datasink2 = glueContext.write_dynamic_frame.from_options(frame = dyf, connection_type = "s3", connection_options = {"path": "s3://sample-glue-for-result"}, format = "csv", transformation_ctx = "datasink2") job.commit()
本来なら
df = applymapping1.toDF().filter("finalist_name is NOT NULL") df = df.groupBy("finalist_name").count().sort("count", ascending=False) dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')
上記の3行だけで出来ると思ったのですが、.filter()
を使ったら
どこからか謎の文字列が入り込んできてしまいました。
今回はマルチバイト文字だけを扱うので、こちらの記事を参考にゴリゴリにコードを書き、欲しいデータのDataFrameを作成しました。
(なんで.toDF
した時には入ってないのに.filter()
すると謎文字列入るんだろう。。)
Run job
でJobを実行させます。
Jobのログ確認方法
Jobの実行ログ等はCloudWatchに出力されます。
先ほどのコードの中にprint
でDynamicFrameの中身を確認できるようにしたので確認したいと思います。
print
の結果は/aws-glue/jobs/error
に書き出されます。
DynamicFrameの中身にそれっぽいデータがちゃんと入っていることが確認できました。
次はS3にcsvファイルがちゃんと書き出されているか確認したいと思います。
実践結果
S3のsample-glue-for-result
に書き出したので見てみます。
あれっ?1つのcsvファイルじゃなくて5つ出来てる!!
ログで確認したデータ良い感じだったのに。。
中身を見てみると、、、
合体してくれれば想定通りのファイルだったのに
最後のファイルなんて空ファイル。
結果: データが分散されて複数のファイルで出力される
感想
AWS Glueを扱うためにPythonを初めて書いたのでとても新鮮でした。
形にはなったものの、イメージ通りに最後まで作り上げることが出来なかったのが心残りです。
ちゃんと1つのcsvで出力して、「とろサーモン」さん優勝おめでとうございます!まで持っていく予定でした。
「こうすれば1つファイルに纏められるよ」という方がいらっしゃいましたら教えていただけると幸いです。
追記(9/18)
気がついたらもうすぐでこの記事を書いてから1年経ってしまいますが、
ファイルをまとめる方法を追記するのを忘れていたので今更ながら追記させて頂きます。
df = df.coalesce(1) dyf = DynamicFrame.fromDF(df, glueContext, 'sample_glue')
coalesce
メソッドを使うことによってパーティションで分散処理されてファイル分割されてしまっていたものが、
パーティション数を1にすることで1つにまとめてファイル出力が可能になる。
お詫び
2017年度のM1ファイナリストを使わせて頂いたのですが、
誤字をしないためにネットから引っ張ってきたら
敗者復活枠のスーパーマラドーナ
さん抜きのテストデータを作ってしまいました。
深くお詫び申し上げます。