AWS glue studioを使用してFilterクラスを設置する
目的:AWS glueでFilterクラスを使用して、必要な行を抽出する
glue studio
glue studioに移動して、「Visual with a source and target」選択し、「Create」を押下。
data:image/s3,"s3://crabby-images/62a32/62a3222935f06db6cb64980a34078bdcd66d649c" alt=""
「Data source」をクリックして、事前に設定しているDatabaseとTableを選択。
data:image/s3,"s3://crabby-images/0f064/0f06483a5dce1ae2d9e535bce55b2ba774402df0" alt=""
今回データの書き出しは行わず、ログに出力するので、一番下の「Data target」は削除する。
二番目にある「ApplyMapping」をクリックしておいて、「Transform」タブから「Filter」を選択。
data:image/s3,"s3://crabby-images/98d49/98d49f73361ddef8f62c286997a1858e0a7f08af" alt=""
「Filter」ノードをクリックすると、右部分で抽出条件を設定できるので、今回はKeyに「birthday」を選択し、Python正規表現を設定する。
「birthday」は今回「2000/10/10」等文字列として登録されているので、「20*」として2000年代に生まれた人を対象にする。
data:image/s3,"s3://crabby-images/d9d27/d9d274246573967ce617e33e4d543d8a1bea02a7" alt=""
「Basic properties」の「Name」と「IAM Role」を設定。 「IAM Role」 は予め設定されたものを選択。
data:image/s3,"s3://crabby-images/aecbd/aecbd95db80d8a838d49166b59125b98ce8cf85a" alt=""
続けて、「Basic properties」の 「Requested number of workers」を最小の2に変更。
「Job timeout (minutes)」を5分に変更。
data:image/s3,"s3://crabby-images/8ac4d/8ac4dd2368a3320e3cae1630a3ee5e116d850cd0" alt=""
上部タブの「Script」をクリック。
data:image/s3,"s3://crabby-images/98f65/98f65de23628b8bceefa11c6b669494fb938249e" alt=""
右上にある「Edit script」をクリックするとビジュアルモードから、スクリプトモードに変更され、これはもとに戻せないとの警告が出ます。
今回はこのまま「Confirm」をクリックして進みます。
data:image/s3,"s3://crabby-images/209b8/209b89948da2b39173d643c500da2d08ef8ae4cf" alt=""
今回はログを排出するので、下記赤枠部分「DynamicFrame名.show()」を追記し、右上の「Save」をクリックします。
ジョブの作成が問題なく行えたら、「Run」をクリックします。
data:image/s3,"s3://crabby-images/73959/739591c338d93ca0a2e547f3b62545ccf938ed4b" alt=""
glue左ナビゲーション「ジョブ」をクリックし、ジョブ一覧のページに移動します。
ジョブが完了したら「ログ」をクリック。
data:image/s3,"s3://crabby-images/bcce3/bcce33525622c8601148c8ceb55a4c9a71010bfc" alt=""
古い方のログをクリック。
data:image/s3,"s3://crabby-images/6e5bd/6e5bd71a9220359893ef1760846004d77958dfe0" alt=""
無事に抽出されて表示されているのを確認します。
data:image/s3,"s3://crabby-images/c3fc2/c3fc2d7e9d5bad5ec462ce528c017b18db970caa" alt=""
すべてのソース
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import re
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database="from_csv_to_datacatalog",
table_name="from_csv_to_datacatalog_upload_csv",
transformation_ctx="S3bucket_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=S3bucket_node1,
mappings=[
("id", "long", "id", "long"),
("firstname", "string", "firstname", "string"),
("surname", "string", "surname", "string"),
("firstname_kana", "string", "firstname_kana", "string"),
("surname_kana", "string", "surname_kana", "string"),
("zipcode", "string", "zipcode", "string"),
("prefectures", "string", "prefectures", "string"),
("tel", "string", "tel", "string"),
("email", "string", "email", "string"),
("birthday", "string", "birthday", "string"),
],
transformation_ctx="ApplyMapping_node2",
)
# Script generated for node Filter
Filter_node1636090838500 = Filter.apply(
frame=ApplyMapping_node2,
f=lambda row: (bool(re.match("20*", row["birthday"]))),
transformation_ctx="Filter_node1636090838500",
)
Filter_node1636090838500.show()
job.commit()
参考サイト