AWS glue studioを使用してFilterクラスを設置する

目的:AWS glueでFilterクラスを使用して、必要な行を抽出する

glue studio

glue studioに移動して、「Visual with a source and target」選択し、「Create」を押下。

「Data source」をクリックして、事前に設定しているDatabaseとTableを選択。

今回データの書き出しは行わず、ログに出力するので、一番下の「Data target」は削除する。
二番目にある「ApplyMapping」をクリックしておいて、「Transform」タブから「Filter」を選択。

「Filter」ノードをクリックすると、右部分で抽出条件を設定できるので、今回はKeyに「birthday」を選択し、Python正規表現を設定する。

「birthday」は今回「2000/10/10」等文字列として登録されているので、「20*」として2000年代に生まれた人を対象にする。

「Basic properties」の「Name」と「IAM Role」を設定。 「IAM Role」 は予め設定されたものを選択。

続けて、「Basic properties」の 「Requested number of workers」を最小の2に変更。
「Job timeout (minutes)」を5分に変更。

上部タブの「Script」をクリック。

右上にある「Edit script」をクリックするとビジュアルモードから、スクリプトモードに変更され、これはもとに戻せないとの警告が出ます。

今回はこのまま「Confirm」をクリックして進みます。

今回はログを排出するので、下記赤枠部分「DynamicFrame名.show()」を追記し、右上の「Save」をクリックします。

ジョブの作成が問題なく行えたら、「Run」をクリックします。

glue左ナビゲーション「ジョブ」をクリックし、ジョブ一覧のページに移動します。

ジョブが完了したら「ログ」をクリック。

古い方のログをクリック。

無事に抽出されて表示されているのを確認します。

すべてのソース

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import re

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="from_csv_to_datacatalog",
    table_name="from_csv_to_datacatalog_upload_csv",
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("id", "long", "id", "long"),
        ("firstname", "string", "firstname", "string"),
        ("surname", "string", "surname", "string"),
        ("firstname_kana", "string", "firstname_kana", "string"),
        ("surname_kana", "string", "surname_kana", "string"),
        ("zipcode", "string", "zipcode", "string"),
        ("prefectures", "string", "prefectures", "string"),
        ("tel", "string", "tel", "string"),
        ("email", "string", "email", "string"),
        ("birthday", "string", "birthday", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node Filter
Filter_node1636090838500 = Filter.apply(
    frame=ApplyMapping_node2,
    f=lambda row: (bool(re.match("20*", row["birthday"]))),
    transformation_ctx="Filter_node1636090838500",
)

Filter_node1636090838500.show()

job.commit()

参考サイト