AWS glue studio ETL実行時間を取得して、日付、時間ごとのカラムを追加する

目的:AWS glue で ETLジョブの実行時間(日本時間)を取得して、年、月、日、時間、分、秒ごとのカラムを追加する。

glue studio

最終的なノードの設定は下記になります。
S3 bucket > ApplyMapping > Custom transform > Select From Collection > Amazon S3

glue studioに移動して、「Visual with a source and target」選択し、「Create」を押下。

Custom transform の設置

ApplyMapping をクリックした後、Transform タブから、 Custom transform を選択。

右に表示される、入力フォームにソースコードを書いていきます。

Custom transform にソースコードを書いていく

言語は Python となります。PySparkが使用できます。

ソースコードは下記となります。一つ一つ解説していきます。

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    import datetime
    dt_now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
    dt_year = dt_now.strftime('%Y')
    dt_month = dt_now.strftime('%m')
    dt_day = dt_now.strftime('%d')
    dt_hour = dt_now.strftime('%H')
    dt_minits = dt_now.strftime('%M')
    dt_seconds = dt_now.strftime('%S')
    
    from pyspark.sql.functions import lit
    df = dfc.select(list(dfc.keys())[0]).toDF()
    df = df.withColumn("etl_year",lit(dt_year))
    df = df.withColumn("etl_month",lit(dt_month))
    df = df.withColumn("etl_day",lit(dt_day))
    df = df.withColumn("etl_hour",lit(dt_hour))
    df = df.withColumn("etl_minits",lit(dt_minits))
    df = df.withColumn("etl_seconds",lit(dt_seconds))
    output_df = DynamicFrame.fromDF(df, glueContext, "output_df")
    return DynamicFrameCollection({"output_df": output_df}, glueContext)
    

一行目は Custom transform を作成したときから生成され、削除できませんので、そのまま、その下を書いて行きます。

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:

下記ソースコードの1行目、datetime モジュールを読み込んでいます。
2行目、AWS上では東京リージョンでもUTCなので、9をたして東京時間にします。
datetime.timezone() はローカル時刻と UTC の差分を表す datetime.timedelta() を引数に指定しなければなりません。
https://docs.python.org/ja/3/library/datetime.html#timezone-objects

3行目以降は strftime() を使用して「年」から「秒」までを変数にいれています。

    import datetime # datetime モジュールを読み込んでいます
    dt_now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))) #UTCから9時間たして東京時間にします
    dt_year = dt_now.strftime('%Y')
    dt_month = dt_now.strftime('%m')
    dt_day = dt_now.strftime('%d')
    dt_hour = dt_now.strftime('%H')
    dt_minits = dt_now.strftime('%M')
    dt_seconds = dt_now.strftime('%S')

DataFrame を扱う場合、そのまま変数を書くとエラー( AssertionError: col should be Column )になってしまうので、 lit() を使用して、リテラル値を返す必要があります。
ですので、下記ソースコードの 1行目で lit モジュールを読み込んでいます。実際に3行目以降で、 withColumn() を使用してそれぞれカラムを追加する際に lit() を使用しています(モジュールを読み込んでいないと NameError: name ‘lit’ is not defined というエラーが出ます)。

2行目は辞書型である DynamicFrameCollection を DataFrame に変換しています。

3行目以降は 「年」から「秒」までを withColumn() で追加していきます。

from pyspark.sql.functions import lit #忘れないように literal を import します。
    df = dfc.select(list(dfc.keys())[0]).toDF() #辞書型である DynamicFrameCollection を DataFrame に変換しています
    df = df.withColumn("etl_year",lit(dt_year))
    df = df.withColumn("etl_month",lit(dt_month))
    df = df.withColumn("etl_day",lit(dt_day))
    df = df.withColumn("etl_hour",lit(dt_hour))
    df = df.withColumn("etl_minits",lit(dt_minits))
    df = df.withColumn("etl_seconds",lit(dt_seconds))

最後に下記ソースコードの 1行目で、DataFrame から DynamicFrame に変換し、
2行目で返り値の DynamicFrameCollection を返します。

    output_df = DynamicFrame.fromDF(df, glueContext, "output_df")
    return DynamicFrameCollection({"output_df": output_df}, glueContext)

パーティションを分けてparquetファイルを書き出す場合は下記の partitionKeys に書きだすキーを追記する。

# Script generated for node Amazon S3
AmazonS3_node1636787298846 = glueContext.write_dynamic_frame.from_options(
    frame=SelectFromCollection_node1636787889574,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://datalake-test-datacatalog-s3/",
        "partitionKeys": ['etl_year','etl_month','etl_day','etl_hour'],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="AmazonS3_node1636787298846",
)

RUN

Job details の Number of retries が 3 に設定されているので、適宜変更。今回はテストなので 0 に変更。

parquet形式で書き出してS3selectにて確認

無事追加されていました(データはテストデータです、実在しない人物です)。