LocalStackのS3環境を利用したAWS Glue Jobローカル実行・テスト方法 (original) (raw)

AWS Glue Jobをローカル環境で開発する際、AWS公式が提供してるDocker imageを活用する方法があります。

Developing and testing AWS Glue job scripts locally

Glue Jobを利用する場合、S3からデータを取得・保存するユースケースが多いかと思います。

本記事では、ローカル環境にAWS環境をエミュレートするLocalStackを活用して、実際のAWSリソースへのデータをやり取りを行わずGlue Jobの動作検証・テストを行う方法を書きます。
Overview | Docs

Glue version 4.0のdocker imageであるamazon/aws-glue-libs:glue_libs_4.0.0_image_01 を使用します。

Glue versionごとにdocker imageが異なるので、ご注意ください。

本文中コードgithub.com

ディレクトリ構成

/ ├─ src | └─ glue_job.py ├─ tests │ └─ test_glue_job.py └─ compose.yaml

Glue Jobの実行スクリプト

import sys from typing import Dict

from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext

S3_ENDPOINT_URL = "http://s3.dev:4566" AWS_REGION = "ap-northeast-1" S3_BUCKET = "test-job-bucket"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame: dyf = glue_context.create_dynamic_frame.from_options( format_options={ "quoteChar": '"', "withHeader": True, "separator": ",", }, connection_type="s3", format="csv", connection_options={ "paths": [source_s3_path], "recurse": True, }, ) return dyf

def write_dynamic_frame_to_s3(glue_context: GlueContext, dyf: DynamicFrame, destination_s3_path: str) -> None: glue_context.write_dynamic_frame.from_options( frame=dyf, connection_type="s3", connection_options={"path": destination_s3_path}, format="parquet", format_options={"writeHeader": True}, )

def main(args: Dict[str, str]) -> None: sc = SparkContext() if args["JOB_NAME"] == "test": sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL) sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION) sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") glue_context = GlueContext(sc)

job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/test_data.csv")
write_dynamic_frame_to_s3(glue_context=glue_context, dyf=dyf, destination_s3_path=f"s3://{S3_BUCKET}/output")

job.commit()

if name == "main": args = getResolvedOptions(sys.argv, ["JOB_NAME"]) main(args)

以下の処理を実行するGlue Jobのスクリプトを用意しました

SparkContextにLocalStackでエミュレートしたS3にアクセスする設定を追加しています。

本番環境のGlueJobでは実際のAWSリソースにアクセスするため、以下の設定はローカル開発時のみ追加する必要があります。

実行引数のJOB_NAMEがtestの場合は、LocalStackへアクセスする設定を追加することでリソースの使い分けを行っています。

sc = SparkContext() if args["JOB_NAME"] == "test": sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL) sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION) sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") glue_context = GlueContext(sc)

docker composeの設定

services: glue.dev.s3.local: container_name: s3.dev image: localstack/localstack:3.8.0 environment: - SERVICES=s3 - AWS_DEFAULT_REGION=ap-northeast-1 - AWS_DEFAULT_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test networks: - glue.dev.network glue.dev: container_name: glue.dev image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01 volumes: - ./:/home/glue_user/workspace/ environment: - DISABLE_SSL=true - AWS_REGION=ap-northeast-1 - AWS_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test networks: - glue.dev.network tty: true stdin_open: true networks: glue.dev.network: name: glue.dev.network

LocalStackでエミュレートしたAWS環境にGlue Jobのコンテナがアクセスできるように、Glue Jobのコンテナの環境変数に、LocalStackの起動設定で指定したAWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYを追加します。

compose.yamlはこちらの実装を参考にさせてもらいました。
GitHub - n-yokota/aws_glue_test_concept

docker containerを起動します。

$ docker compose up -d

Glue Jobをローカル環境で実行する

LocalStackのS3 bucket準備

Glue Jobのコンテナ環境に入ります。

$ docker compose exec glue.dev bash

LocalStackのS3に test-job-bucket Bucketを作成します。

$ aws s3 mb s3://test-job-bucket --endpoint-url http://s3.dev:4566

テスト用ファイルをLocalStackのS3 Bucketに追加します。

$ aws s3 mv ./data/test_data.csv s3://test-job-bucket/test_data.csv --endpoint-url http://s3.dev:4566

S3 Bucketにテスト用ファイルが保存されていることを確認できます。

$ aws s3api list-objects-v2 --bucket test-job-bucket --endpoint-url http://s3.dev:4566 { "Contents": [ { "LastModified": "2024-10-08T14:31:52.000Z", "ETag": ""19ee3f2027cea3841e74c3aa3520b5ed"", "StorageClass": "STANDARD", "Key": "test_data.csv", "Size": 100 } ] }

コンテナ環境でGlue Job実行

Glue Jobのスクリプトを通常のpyhonスクリプトとして実行します。

$ python3 src/glue_job.py --JOB_NAME test

対象のS3 Bucketにparquet形式でファイルが保存されていることを確認できます。

$ aws s3api list-objects-v2 --bucket test-job-bucket --endpoint-url http://s3.dev:4566 { "Contents": [ { "LastModified": "2024-10-08T14:32:23.000Z", "ETag": ""fa768a3a4c9659604c161e45a17ec02f"", "StorageClass": "STANDARD", "Key": "output/part-00000-3479d3db-5a89-4bd7-856c-fd714291c2f3-c000.snappy.parquet", "Size": 981 }, { "LastModified": "2024-10-08T14:31:52.000Z", "ETag": ""19ee3f2027cea3841e74c3aa3520b5ed"", "StorageClass": "STANDARD", "Key": "test_data.csv", "Size": 100 } ] }

LocalStackのS3を使用したGlue Jobのテスト実行方法

テスト用GlueContextのfixture作成

@pytest.fixture(scope="session") def glue_context() -> GlueContext: spark = ( SparkSession.builder.master("local[1]")

    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.ui.enabled", "false")
    .config("spark.ui.dagGraph.retainedRootRDD", "1")
    .config("spark.ui.retainedJobs", "1")
    .config("spark.ui.retainedStages", "1")
    .config("spark.ui.retainedTasks", "1")
    .config("spark.sql. ui.retainedExecutions", "1")
    .config("spark.worker.ui.retainedExecutors", "1")
    .config("spark.worker.ui.retainedDrivers", "1")
    .getOrCreate()
)


spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.version.required", "false")

yield GlueContext(spark.sparkContext)
spark.stop()

テスト実行速度を高速化するためにspark設定はこちらの記事を参考にしてます.
AWS GlueのCI/CD環境を作ってみた - KAKEHASHI Tech Blog

pytest実行時にLocalStackのS3へアクセスするための設定はこちらの記事を参考にしてます.
AWS Glueの開発環境の構築(2022) | フューチャー技術ブログ

テスト用S3 Bucketのfixture作成

@pytest.fixture(scope="session") def s3_client(): return boto3.client( "s3", endpoint_url=S3_ENDPOINT_URL, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name=AWS_REGION, )

@pytest.fixture(scope="session") def s3_bucket(s3_client: boto3.client) -> str: bucket_name = "test-s3-bucket"

try:
    s3_client.head_bucket(Bucket=bucket_name)
except Exception:
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
    )

yield bucket_name

try:
    s3_client.delete_bucket(Bucket=bucket_name)
except Exception as e:
    print(f"Failed to clean up test bucket: {e}")

@pytest.fixture(scope="session") def setup_s3_data(s3_client: boto3.client, s3_bucket: str) -> dict[str, str]: key = "test_data.csv" inputs = [ {"col1": "val1", "col2": 1, "col3": "2000/01/01 01:00:00"}, {"col1": "val2", "col2": 2, "col3": "2000/01/02 02:00:00"}, {"col1": "val3", "col2": 3, "col3": "2000/01/03 03:00:00"}, ] input_str = io.StringIO() w = csv.DictWriter(input_str, fieldnames=inputs[0].keys()) w.writeheader() for input in inputs: w.writerow(input)

body = input_str.getvalue()
s3_client.put_object(Bucket=s3_bucket, Key=key, Body=body)

yield {"bucket_name": s3_bucket, "key": key}

try:
    s3_client.delete_object(Bucket=s3_bucket, Key=key)
except Exception as e:
    print(f"Failed to clean up test data: {e}")

@pytest.fixture def get_s3_objects(s3_client): def _get_s3_objects(s3_bucket: str, prefix: str) -> list[str] | None: try: response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=prefix) if "Contents" in response: return [obj["Key"] for obj in response["Contents"]] except Exception: return

return _get_s3_objects

@pytest.fixture(scope="module") def sample_dynamicframe(glue_context: GlueContext) -> DynamicFrame: spark = glue_context.spark_session df = spark.createDataFrame( [ ("val1", 1, "2000/01/01 01:00:00"), ("val2", 2, "2000/01/02 02:00:00"), ("val3", 3, "2000/01/03 03:00:00"), ], ["col1", "col2", "col3"], ) dyf = DynamicFrame.fromDF(df, glue_context, "dyf")

return dyf

各関数の役割

S3アクセスを伴う関数のテスト

def test_get_dynamic_frame_from_s3(glue_context: GlueContext, setup_s3_data: dict[str, str]) -> None: source_s3_path = f"s3://{setup_s3_data['bucket_name']}/{setup_s3_data['key']}" result = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=source_s3_path)

assert isinstance(result, DynamicFrame)
assert result.count() == 3

df = result.toDF()
assert len(df.columns) == 3
assert df.columns == ["col1", "col2", "col3"]

rows = df.collect()
assert rows == [
    Row(col1="val1", col2="1", col3="2000/01/01 01:00:00"),
    Row(col1="val2", col2="2", col3="2000/01/02 02:00:00"),
    Row(col1="val3", col2="3", col3="2000/01/03 03:00:00"),
]

def test_write_dynamic_frame_from_s3( glue_context: GlueContext, s3_bucket, sample_dynamicframe: DynamicFrame, get_s3_objects, ) -> None: file_key = "test_write_data" destination_s3_path = f"s3://{s3_bucket}/{file_key}" write_dynamic_frame_to_s3( glue_context=glue_context, dyf=sample_dynamicframe, destination_s3_path=destination_s3_path, ) actual_s3_objects = get_s3_objects(s3_bucket=s3_bucket, prefix=file_key)

assert len(actual_s3_objects) > 0
assert any([object for object in actual_s3_objects if object.endswith(".parquet")])

LocalStackのS3 Bucketを使用して、S3とデータのやり取りを行う関数をテストします。

Glue Jobのコンテナ内でpytestコマンドを実行します。

$ pytest tests

参考