AWS glue studioを使用してFilterクラスを設置する
目的:AWS glueでFilterクラスを使用して、必要な行を抽出する
glue studio
glue studioに移動して、「Visual with a source and target」選択し、「Create」を押下。
「Data source」をクリックして、事前に設定しているDatabaseとTableを選択。
今回データの書き出しは行わず、ログに出力するので、一番下の「Data target」は削除する。
二番目にある「ApplyMapping」をクリックしておいて、「Transform」タブから「Filter」を選択。
「Filter」ノードをクリックすると、右部分で抽出条件を設定できるので、今回はKeyに「birthday」を選択し、Python正規表現を設定する。
「birthday」は今回「2000/10/10」等文字列として登録されているので、「20*」として2000年代に生まれた人を対象にする。
「Basic properties」の「Name」と「IAM Role」を設定。 「IAM Role」 は予め設定されたものを選択。
続けて、「Basic properties」の 「Requested number of workers」を最小の2に変更。
「Job timeout (minutes)」を5分に変更。
上部タブの「Script」をクリック。
右上にある「Edit script」をクリックするとビジュアルモードから、スクリプトモードに変更され、これはもとに戻せないとの警告が出ます。
今回はこのまま「Confirm」をクリックして進みます。
今回はログを排出するので、下記赤枠部分「DynamicFrame名.show()」を追記し、右上の「Save」をクリックします。
ジョブの作成が問題なく行えたら、「Run」をクリックします。
glue左ナビゲーション「ジョブ」をクリックし、ジョブ一覧のページに移動します。
ジョブが完了したら「ログ」をクリック。
古い方のログをクリック。
無事に抽出されて表示されているのを確認します。
すべてのソース
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import re
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database="from_csv_to_datacatalog",
table_name="from_csv_to_datacatalog_upload_csv",
transformation_ctx="S3bucket_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=S3bucket_node1,
mappings=[
("id", "long", "id", "long"),
("firstname", "string", "firstname", "string"),
("surname", "string", "surname", "string"),
("firstname_kana", "string", "firstname_kana", "string"),
("surname_kana", "string", "surname_kana", "string"),
("zipcode", "string", "zipcode", "string"),
("prefectures", "string", "prefectures", "string"),
("tel", "string", "tel", "string"),
("email", "string", "email", "string"),
("birthday", "string", "birthday", "string"),
],
transformation_ctx="ApplyMapping_node2",
)
# Script generated for node Filter
Filter_node1636090838500 = Filter.apply(
frame=ApplyMapping_node2,
f=lambda row: (bool(re.match("20*", row["birthday"]))),
transformation_ctx="Filter_node1636090838500",
)
Filter_node1636090838500.show()
job.commit()
参考サイト