AWS glue studio ETL実行時間を取得して、日付、時間ごとのカラムを追加する
目的:AWS glue で ETLジョブの実行時間(日本時間)を取得して、年、月、日、時間、分、秒ごとのカラムを追加する。
glue studio
最終的なノードの設定は下記になります。
S3 bucket > ApplyMapping > Custom transform > Select From Collection > Amazon S3
glue studioに移動して、「Visual with a source and target」選択し、「Create」を押下。
Custom transform の設置
ApplyMapping をクリックした後、Transform タブから、 Custom transform を選択。
右に表示される、入力フォームにソースコードを書いていきます。
Custom transform にソースコードを書いていく
言語は Python となります。PySparkが使用できます。
ソースコードは下記となります。一つ一つ解説していきます。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
import datetime
dt_now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
dt_year = dt_now.strftime('%Y')
dt_month = dt_now.strftime('%m')
dt_day = dt_now.strftime('%d')
dt_hour = dt_now.strftime('%H')
dt_minits = dt_now.strftime('%M')
dt_seconds = dt_now.strftime('%S')
from pyspark.sql.functions import lit
df = dfc.select(list(dfc.keys())[0]).toDF()
df = df.withColumn("etl_year",lit(dt_year))
df = df.withColumn("etl_month",lit(dt_month))
df = df.withColumn("etl_day",lit(dt_day))
df = df.withColumn("etl_hour",lit(dt_hour))
df = df.withColumn("etl_minits",lit(dt_minits))
df = df.withColumn("etl_seconds",lit(dt_seconds))
output_df = DynamicFrame.fromDF(df, glueContext, "output_df")
return DynamicFrameCollection({"output_df": output_df}, glueContext)
一行目は Custom transform を作成したときから生成され、削除できませんので、そのまま、その下を書いて行きます。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
下記ソースコードの1行目、datetime モジュールを読み込んでいます。
2行目、AWS上では東京リージョンでもUTCなので、9をたして東京時間にします。
datetime.timezone() はローカル時刻と UTC の差分を表す datetime.timedelta() を引数に指定しなければなりません。
https://docs.python.org/ja/3/library/datetime.html#timezone-objects
3行目以降は strftime() を使用して「年」から「秒」までを変数にいれています。
import datetime # datetime モジュールを読み込んでいます
dt_now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))) #UTCから9時間たして東京時間にします
dt_year = dt_now.strftime('%Y')
dt_month = dt_now.strftime('%m')
dt_day = dt_now.strftime('%d')
dt_hour = dt_now.strftime('%H')
dt_minits = dt_now.strftime('%M')
dt_seconds = dt_now.strftime('%S')
DataFrame を扱う場合、そのまま変数を書くとエラー( AssertionError: col should be Column )になってしまうので、 lit() を使用して、リテラル値を返す必要があります。
ですので、下記ソースコードの 1行目で lit モジュールを読み込んでいます。実際に3行目以降で、 withColumn() を使用してそれぞれカラムを追加する際に lit() を使用しています(モジュールを読み込んでいないと NameError: name ‘lit’ is not defined というエラーが出ます)。
2行目は辞書型である DynamicFrameCollection を DataFrame に変換しています。
3行目以降は 「年」から「秒」までを withColumn() で追加していきます。
from pyspark.sql.functions import lit #忘れないように literal を import します。
df = dfc.select(list(dfc.keys())[0]).toDF() #辞書型である DynamicFrameCollection を DataFrame に変換しています
df = df.withColumn("etl_year",lit(dt_year))
df = df.withColumn("etl_month",lit(dt_month))
df = df.withColumn("etl_day",lit(dt_day))
df = df.withColumn("etl_hour",lit(dt_hour))
df = df.withColumn("etl_minits",lit(dt_minits))
df = df.withColumn("etl_seconds",lit(dt_seconds))
最後に下記ソースコードの 1行目で、DataFrame から DynamicFrame に変換し、
2行目で返り値の DynamicFrameCollection を返します。
output_df = DynamicFrame.fromDF(df, glueContext, "output_df")
return DynamicFrameCollection({"output_df": output_df}, glueContext)
パーティションを分けてparquetファイルを書き出す場合は下記の partitionKeys に書きだすキーを追記する。
# Script generated for node Amazon S3
AmazonS3_node1636787298846 = glueContext.write_dynamic_frame.from_options(
frame=SelectFromCollection_node1636787889574,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://datalake-test-datacatalog-s3/",
"partitionKeys": ['etl_year','etl_month','etl_day','etl_hour'],
},
format_options={"compression": "snappy"},
transformation_ctx="AmazonS3_node1636787298846",
)
RUN
Job details の Number of retries が 3 に設定されているので、適宜変更。今回はテストなので 0 に変更。
parquet形式で書き出してS3selectにて確認
無事追加されていました(データはテストデータです、実在しない人物です)。