Optimizing data lakes with Amazon S3 Tables and Apache Spark on Amazon EKS

This blog post was authored by Aritra Gupta (Senior Product Manager – S3), Vara Bonthu (Principal, Open Source Software Specialist Solutions Architect), Ratnopam Chakrabarti (Senior Solutions Architect – Containers & Open Source Software), and Manabu McCloskey (Senior Open Source Engineer).

Introduction

Managing business data has become increasingly challenging as companies collect more information than ever before. Apache Iceberg has emerged as a popular solution, helping companies organize and analyze their expanding data collections efficiently. Much like a well-organized library system, Iceberg helps businesses keep track of their data, make updates when needed, and make sure that nothing gets lost or duplicated in the process.

Although Apache Iceberg on Amazon S3 has become a widely adopted format for building data lakehouses, managing Iceberg tables at scale comes with operational challenges. Users must handle table optimizations, metadata management, compaction, and transaction consistency manually, which can introduce performance bottlenecks and administrative overhead. Furthermore, sustaining high-frequency transactions and fast query performance requires continuous tuning.

To address these complexities, Amazon S3 Tables delivers a fully managed table storage service with built-in Apache Iceberg support. Unlike unmanaged Iceberg tables on Amazon S3, S3 Tables automates table optimizations, delivering up to three times faster query performance and supporting up to ten times higher transactions per second compared to Iceberg tables stored in general purpose S3 buckets. These optimizations improve query performance, automate table maintenance, and streamline security, allowing users to focus on analytics rather than infrastructure management. Native integration with Amazon Web Services (AWS) analytics services through Amazon SageMaker Lakehouse allows S3 Tables to provide a high-performance, cost-optimized, and streamlined approach to managing Iceberg tables.

For organizations running Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS) with Iceberg tables in general purpose S3 buckets, S3 Tables streamlines management and improves performance. Users can build Iceberg-backed data lakes on Amazon S3 with Apache Spark on Amazon EKS, using S3 Tables for seamless scalability and built-in maintenance.

This post walks through how to integrate S3 Tables with Apache Spark on Amazon EKS, demonstrating how users can use this managed table service for scalable and high-performance data analytics on Amazon EKS.

Architecture

The following diagram shows how Apache Spark on Amazon EKS writes data to S3 Tables using the Spark Operator. The Spark Operator deploys and manages Spark applications within the EKS cluster, enabling scalable data processing. Spark jobs interact with S3 Tables, using built-in Iceberg support for efficient table storage and metadata management. IAM Roles for Service Accounts (IRSA) provide secure access to S3 Tables, handling authentication and permission control for the Spark pods.

In this post, we demonstrate how to get started with S3 Tables using Apache Spark on Amazon EKS. The following figure shows the output of three Iceberg metadata queries. The first query retrieves data file details, such as Parquet format, storage location, record count, and file size. The second query explores table history, capturing snapshot IDs, parent relationships, and commit timestamps to track schema changes over time. The third query lists Iceberg snapshots, detailing committed changes, operations (for example, append), and metadata for time-travel queries.

Iceberg Output

Solution deployment

In this section, we walk through deploying Apache Spark on Amazon EKS and integrating it with S3 Tables for scalable and efficient data processing. We provision an EKS cluster using the Data on EKS Blueprints, deploy the open source Kubeflow Spark Operator, configure a table bucket, and set up Spark jobs to write and query Iceberg tables.

S3 Tables now supports Apache Iceberg’s REST catalog interface, enabling a consistent and cloud native way to manage Iceberg tables directly over REST APIs. You can connect your Iceberg REST client to the S3 Tables Iceberg REST endpoint and make REST API calls to create, update, or query tables in S3 table buckets. The endpoint implements a set of standardized Iceberg REST APIs specified in the Apache Iceberg REST Catalog Open API specification. The endpoint works by translating Iceberg REST API operations into corresponding S3 Tables operations.
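As a rough illustration of what connecting a Spark Iceberg client to this endpoint could involve, the following sketch builds the Spark properties for a REST catalog. The catalog name `s3tables`, the example account ID, and the bucket name are placeholders, and the property names reflect our understanding of the Iceberg REST catalog settings (SigV4 signing with the `s3tables` signing name). Treat them as assumptions and confirm them against the S3 Tables documentation before use.

```python
def s3tables_rest_catalog_conf(catalog, region, table_bucket_arn):
    """Return Spark properties for an Iceberg REST catalog backed by S3 Tables.

    The property names are assumptions based on the Iceberg REST catalog
    settings; verify them against the current S3 Tables documentation.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables Iceberg SQL extensions such as time travel and procedures.
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        # Regional S3 Tables Iceberg REST endpoint.
        f"{prefix}.uri": f"https://s3tables.{region}.amazonaws.com/iceberg",
        # The warehouse is the table bucket ARN.
        f"{prefix}.warehouse": table_bucket_arn,
        # Requests are signed with SigV4 for the s3tables service.
        f"{prefix}.rest.sigv4-enabled": "true",
        f"{prefix}.rest.signing-name": "s3tables",
        f"{prefix}.rest.signing-region": region,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }


# Placeholder values for illustration only.
conf = s3tables_rest_catalog_conf(
    "s3tables",
    "us-west-2",
    "arn:aws:s3tables:us-west-2:111122223333:bucket/example-table-bucket",
)
for key, value in sorted(conf.items()):
    print(f"--conf {key}={value}")
```

Each printed line could be passed to `spark-submit` or copied into the `sparkConf` section of a SparkApplication manifest.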

In this post, we use open source Apache Spark. However, you can also use the AWS Glue Iceberg REST endpoint to perform the same tasks.

Prerequisites

Make sure that you have installed the following tools on your machine:

  1. AWS Command Line Interface (AWS CLI)
  2. kubectl
  3. Terraform

Step 1. Create EKS cluster

To streamline deployment, use the Data on EKS Terraform-based blueprint. This blueprint automates the provisioning of the following components:

  • VPC and subnets: Networking infrastructure for Amazon EKS.
  • EKS cluster: Kubernetes control plane for running Spark workloads.
  • Karpenter: An autoscaler for dynamically provisioning compute nodes.
  • Spark Operator: Manages Spark applications on Kubernetes.
  • Prometheus and Grafana: For monitoring and metrics visualization.
  • FluentBit: For log aggregation and forwarding.

Clone the repository.

git clone https://github.com/awslabs/data-on-eks.git
cd data-on-eks
export DOEKS_HOME=$(pwd)

Navigate to the example directory and run the install.sh script.

cd ${DOEKS_HOME}/analytics/terraform/spark-k8s-operator
chmod +x install.sh
./install.sh

The install.sh script takes approximately 15 minutes to finish execution. When it’s completed, you should see an output similar to the following:

cluster_arn = "arn:aws:eks:us-west-2:<account-id>:cluster/spark-operator-doeks"
s3_bucket_id_spark_history_server = "spark-operator-doeks-spark-logs-xxxxxxx"
s3_bucket_region_spark_history_server = "<region>"

Note the S3 bucket ID. Create an environment variable S3_BUCKET that holds the name of the bucket created during the install. This bucket is used later to store sample data.

export S3_BUCKET=$(terraform output -raw s3_bucket_id_spark_history_server)

Step 2. Set up S3 Tables

In this step, you create a table bucket that stores Iceberg tables. This bucket is used by our PySpark job running on Amazon EKS to read and write data.

Run the following command to create a table bucket. Replace <TABLE_BUCKET_NAME> with your desired bucket name and <REGION> with your AWS Region.

aws s3tables create-table-bucket \
    --region "<REGION>" \
    --name "<TABLE_BUCKET_NAME>"

When the command executes, it generates a table bucket Amazon Resource Name (ARN). Note this ARN, as it is necessary for Spark job configurations.
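Because the ARN is pasted into several later commands, a small check can catch copy errors early. The sketch below parses an ARN of the form `arn:aws:s3tables:<REGION>:<ACCOUNT_ID>:bucket/<NAME>`, which is the shape used elsewhere in this post. The helper name, the restriction to the `aws` partition, and the bucket-name character rule are assumptions made for illustration.

```python
import re
from typing import NamedTuple


class TableBucketArn(NamedTuple):
    region: str
    account_id: str
    bucket_name: str


# Assumption: standard "aws" partition and lowercase/digit/hyphen bucket names.
_ARN_PATTERN = re.compile(
    r"^arn:aws:s3tables:(?P<region>[a-z0-9-]+):(?P<account_id>\d{12}):"
    r"bucket/(?P<bucket_name>[a-z0-9-]{3,63})$"
)


def parse_table_bucket_arn(arn):
    """Split a table bucket ARN into region, account ID, and bucket name."""
    match = _ARN_PATTERN.match(arn.strip())
    if match is None:
        raise ValueError(f"Not a table bucket ARN: {arn!r}")
    return TableBucketArn(**match.groupdict())


parsed = parse_table_bucket_arn(
    "arn:aws:s3tables:us-west-2:111122223333:bucket/doeks-spark-on-eks-s3table"
)
print(parsed.region, parsed.bucket_name)
```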

Step 3. Create test data for the Spark Job

Before running the Spark job on Amazon EKS, you need sample data to process. In this step, you generate a test dataset that is written to S3 Tables.

cd ${DOEKS_HOME}/analytics/terraform/spark-k8s-operator/examples/s3-tables
./input-data-gen.sh

This script creates a file named employee_data.csv in your current directory. By default, it generates 100 records.

If you need to adjust the number of records, then you can modify the input-data-gen.sh script. Look for the loop that generates the data and change the iteration count as needed.
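If you prefer to generate the data in Python rather than edit the shell script, the following sketch produces a comparable file. It is not the logic of input-data-gen.sh: the column names are taken from the query output shown later in this post, while the header row, the set of levels, and the salary range are assumptions.

```python
import csv
import random

# Assumption: these are the level values; only these three appear in the sample output.
LEVELS = ["Junior", "Senior", "Exec"]


def generate_employee_rows(num_records, seed=42):
    """Return (id, name, level, salary) tuples, reproducible for a given seed."""
    rng = random.Random(seed)
    rows = []
    for employee_id in range(1, num_records + 1):
        # Salaries are multiples of 500 between 50,000 and 160,000 (assumed range).
        salary = float(rng.randrange(50000, 160001, 500))
        rows.append((employee_id, f"Employee_{employee_id}", rng.choice(LEVELS), salary))
    return rows


def write_employee_csv(path, num_records=100):
    """Write the generated rows to a CSV file with a header row (assumed layout)."""
    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["id", "name", "level", "salary"])
        writer.writerows(generate_employee_rows(num_records))
```

For example, `write_employee_csv("employee_data.csv", 1000)` would write 1,000 records instead of the default 100.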

Step 4. Upload test data to S3 bucket

Replace <S3_BUCKET> with the name of the S3 bucket created by your blueprint and run the following command.

aws s3 cp employee_data.csv s3://<S3_BUCKET>/s3table-example/input/

This command uploads the CSV file to your S3 bucket. The Spark job later references this path to read the input data. Make sure that you have the necessary permissions to write to this bucket before executing the command.

Step 5. Upload PySpark script to S3 bucket

We created a PySpark script (s3table-iceberg-pyspark.py) to configure Apache Spark for data processing using S3 Tables. It reads an input CSV file from a general purpose S3 bucket, writes the processed data to an S3 Tables bucket as an Iceberg table, and queries it to verify the data.
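The read, write, and verify flow described above can be sketched as follows. This is not the content of s3table-iceberg-pyspark.py. It is a minimal illustration that assumes `spark` is a SparkSession already configured with an Iceberg catalog for S3 Tables, that the CSV file has a header row, and that the function name is hypothetical.

```python
def load_csv_into_iceberg(spark, input_path, namespace, table_name):
    """Read a CSV file, write it to an Iceberg table, and return the row count.

    `spark` is expected to be a SparkSession whose default catalog points
    at the table bucket (an assumption of this sketch).
    """
    full_name = f"{namespace}.{table_name}"

    # Assumption: the CSV has a header row; column types are inferred.
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(input_path)
    )

    # Create the namespace if needed, then create or replace the Iceberg table.
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace}")
    df.writeTo(full_name).using("iceberg").createOrReplace()

    # Read the table back to verify that the write succeeded.
    return spark.table(full_name).count()
```

Inside a Spark job, a call such as `load_csv_into_iceberg(spark, "s3a://<S3_BUCKET>/s3table-example/input/employee_data.csv", "doeks_namespace", "employee_s3_table")` would follow the same steps as the paragraph above.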

Run the following command, replacing <S3_BUCKET> with your S3 bucket name created in previous steps. This makes sure that the script is available for execution within the Spark job running on Amazon EKS.

aws s3 cp s3table-iceberg-pyspark.py s3://<S3_BUCKET>/s3table-example/scripts/

Step 6. Update Spark operator manifest

Update the s3table-spark-operator.yaml file to configure the Spark job:

  • Replace <S3_BUCKET> with the S3 bucket name from Terraform outputs.
  • Replace <S3TABLE_ARN> with the table bucket ARN captured in Step 2.

This makes sure that the Spark job reads data from Amazon S3, writes to S3 Tables, and runs on Amazon EKS with the correct configurations.
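If you would rather script the substitution than edit the file by hand, the following sketch shows one way to do it. The template fragment is purely illustrative and is not the actual content of s3table-spark-operator.yaml. Only the two placeholder names come from this post.

```python
import re


def render_manifest(template, s3_bucket, table_bucket_arn):
    """Replace the <S3_BUCKET> and <S3TABLE_ARN> placeholders in a manifest."""
    rendered = template.replace("<S3_BUCKET>", s3_bucket)
    rendered = rendered.replace("<S3TABLE_ARN>", table_bucket_arn)

    # Fail loudly if any <UPPER_CASE> placeholder was left unreplaced.
    leftover = re.search(r"<[A-Z0-9_]+>", rendered)
    if leftover:
        raise ValueError(f"Unreplaced placeholder: {leftover.group(0)}")
    return rendered


# Illustrative fragment only; the real manifest contains many more fields.
TEMPLATE = """\
mainApplicationFile: "s3a://<S3_BUCKET>/s3table-example/scripts/s3table-iceberg-pyspark.py"
arguments:
  - "s3a://<S3_BUCKET>/s3table-example/input/employee_data.csv"
  - "<S3TABLE_ARN>"
"""

rendered = render_manifest(
    TEMPLATE,
    "spark-operator-doeks-spark-logs-example",
    "arn:aws:s3tables:us-west-2:111122223333:bucket/doeks-spark-on-eks-s3table",
)
print(rendered)
```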

Step 7. Execute Spark Job

Before running a Spark job that reads and writes to S3 Tables, you need a Spark Docker Image for S3 Tables (Dockerfile-S3Table) with the necessary dependencies.

To communicate with S3 Tables using Spark, the Docker image must include the following:

  1. Hadoop AWS Connector and Dependencies: Enables S3A filesystem support for Spark.
  2. Apache Iceberg Runtime: Provides Iceberg table management capabilities.
  3. AWS SDK Bundle: Necessary for interacting with AWS.

Customizing and using the image

You can build your own Docker image with these dependencies and push it to a container registry, for example, Amazon Elastic Container Registry (Amazon ECR). However, to streamline this process, we have prebuilt and published a container image, which is referenced in the Spark Operator YAML file.

Run the Spark job

Apply the updated Spark Operator YAML to submit and execute the Spark job:

cd ${DOEKS_HOME}/analytics/terraform/spark-k8s-operator/examples/s3-tables
kubectl apply -f s3table-spark-operator.yaml

This schedules the Spark job on the EKS cluster. Spark Operator handles submitting the job to the Kubernetes API Server. Kubernetes schedules the Spark driver and executor pods to run on separate worker nodes. Karpenter automatically provisions new nodes if needed, based on the nodepool configuration. When the Spark Job is submitted, it creates the Driver and Executor Pods to do the processing. You can check the status of the Spark Pods as follows:

❯ kubectl get po -n spark-team-a
NAME                     READY   STATUS    RESTARTS   AGE
s3table-example          1/1     Running   0          69s
s3table-example-exec-1   1/1     Running   0          13s

When the executor Pod successfully finishes the processing, it is terminated and eventually the Driver Pod status becomes Completed.

❯ kubectl get po -n spark-team-a
NAME              READY   STATUS      RESTARTS   AGE
s3table-example   0/1     Completed   0          3m33s
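If you want to check the job state from a script instead of reading the table by eye, the output above can be parsed with a few lines of Python. This is a sketch that assumes the default column layout of `kubectl get po` shown in this post. The function names are hypothetical.

```python
def parse_pod_statuses(kubectl_output):
    """Map pod name to STATUS from the default `kubectl get po` table."""
    lines = [line for line in kubectl_output.splitlines() if line.strip()]
    if not lines:
        return {}
    header = lines[0].split()
    name_idx = header.index("NAME")
    status_idx = header.index("STATUS")
    statuses = {}
    for line in lines[1:]:
        fields = line.split()
        statuses[fields[name_idx]] = fields[status_idx]
    return statuses


def driver_completed(statuses, driver_name):
    """Return True when the driver pod reports the Completed status."""
    return statuses.get(driver_name) == "Completed"


SAMPLE = """\
NAME                     READY   STATUS    RESTARTS   AGE
s3table-example          1/1     Running   0          69s
s3table-example-exec-1   1/1     Running   0          13s
"""

statuses = parse_pod_statuses(SAMPLE)
print(statuses, driver_completed(statuses, "s3table-example"))
```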

Step 8. Check the Spark Driver logs

List the pods running under the spark-team-a namespace:

kubectl get pods -n spark-team-a

Next, check the Spark driver logs to see the full output of the Spark job. The job reads the CSV data from the S3 bucket and writes it to the table bucket in Iceberg format. It also counts the number of records processed and displays the first 10 records:

kubectl logs <spark-driver-pod-name> -n spark-team-a

Step 9. Check the S3 Tables using S3 Tables API

Confirm that the Iceberg table was successfully created using the S3 Tables API. Replace <ACCOUNT_ID> and <REGION> with your details and run the following:

aws s3tables get-table --table-bucket-arn arn:aws:s3tables:<REGION>:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table \
--namespace doeks_namespace \
--name employee_s3_table

Under the hood, this command invokes the GetTable API. For detailed information, refer to the GetTable and GetTableBucket API Reference.

You should see the following output:

{
    "name": "employee_s3_table",
    "type": "customer",
    "tableARN": "arn:aws:s3tables:us-west-2:<ACCOUNT_ID>:bucket/doeks-spark-on-eks-s3table/table/55511111-7a03-4513-b921-e372b0030daf",
    "namespace": [
        "doeks_namespace"
    ],
    "versionToken": "aafc39ddd462690d2a0c",
    "metadataLocation": "s3://55511111-7a03-4513-asdfsafdsfdsf--table-s3/metadata/00004-62cc4be3-59b5-4647-a78d-1cdf69ec5ed8.metadata.json",
    "warehouseLocation": "s3://55511111-7a03-4513-asdfsafdsfdsf--table-s3",
    "createdAt": "2025-01-07T22:14:48.689581+00:00",
    "createdBy": "<ACCOUNT_ID>",
    "modifiedAt": "2025-01-09T00:06:09.222917+00:00",
    "ownerAccountId": "<ACCOUNT_ID>",
    "format": "ICEBERG"
}

This confirms that the Spark job successfully wrote data to S3 Tables in Iceberg format.
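The same check can be automated by parsing the JSON response. The sketch below relies only on field names that appear in the output above. The function name and the shortened sample values are placeholders.

```python
import json


def summarize_table(get_table_output):
    """Validate a `get-table` response and return the fields of interest."""
    table = json.loads(get_table_output)
    if table.get("format") != "ICEBERG":
        raise ValueError(f"Unexpected table format: {table.get('format')!r}")
    return {
        # The namespace is returned as a list, so join it with the table name.
        "name": ".".join(table["namespace"] + [table["name"]]),
        "metadata_location": table["metadataLocation"],
        "modified_at": table["modifiedAt"],
    }


# Shortened sample with placeholder values.
SAMPLE_RESPONSE = """
{
    "name": "employee_s3_table",
    "namespace": ["doeks_namespace"],
    "metadataLocation": "s3://example--table-s3/metadata/00004-example.metadata.json",
    "modifiedAt": "2025-01-09T00:06:09.222917+00:00",
    "format": "ICEBERG"
}
"""

summary = summarize_table(SAMPLE_RESPONSE)
print(summary["name"], summary["metadata_location"])
```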

JupyterHub setup and execution

If you’d like to interactively work with S3 Tables, then the blueprint includes a way to enable JupyterHub in a single user configuration within the cluster. To enable it, create a Terraform variable file and set the enable_jupyterhub value to true.

⚠️ Warning: This configuration is intended for testing purposes only.

Configure and access JupyterHub web interface

  1. Enable JupyterHub by creating a Terraform variable file:
cd ${DOEKS_HOME}/analytics/terraform/spark-k8s-operator

echo 'enable_jupyterhub = true' >> spark-operator.tfvars
terraform apply -var-file spark-operator.tfvars
  2. Verify JupyterHub deployment is ready. This command should return 1 when ready.
kubectl get deployment hub -n jupyterhub -o jsonpath='{.status.readyReplicas}'
  3. Make JupyterHub web interface available locally.
kubectl port-forward svc/proxy-public 8888:80 -n jupyterhub

Accessing JupyterHub

  1. Navigate to http://localhost:8888.
  2. Enter any username and leave the password field empty.
  3. Choose Sign in.

Choose your environment:

  • Option 1: PySpark image with S3 Tables support.
  • Option 2: Base PySpark image (needs more library installation: go to this Dockerfile).

Choose Start. It takes a few minutes for the server to be ready.

Configuring Spark for S3 Tables

To configure Spark for S3 Tables, follow the documentation.

In Amazon EKS environments, we recommend using WebIdentityTokenFileCredentialsProvider (IRSA) or ContainerCredentialsProvider (Pod Identity).
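To see which of the two mechanisms a pod is using, you can inspect the environment variables that Amazon EKS injects. The following sketch maps them to a provider class name. The fully qualified class names assume the AWS SDK for Java 2.x, and the function name is hypothetical.

```python
import os

# Assumption: AWS SDK for Java 2.x package names.
IRSA_PROVIDER = "software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider"
POD_IDENTITY_PROVIDER = "software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider"


def detect_credentials_provider(env=None):
    """Return the provider class matching the credentials injected into the pod.

    IRSA injects AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE, whereas
    EKS Pod Identity injects AWS_CONTAINER_CREDENTIALS_FULL_URI.
    Returns None when neither mechanism is detected.
    """
    env = os.environ if env is None else env
    if env.get("AWS_WEB_IDENTITY_TOKEN_FILE") and env.get("AWS_ROLE_ARN"):
        return IRSA_PROVIDER
    if env.get("AWS_CONTAINER_CREDENTIALS_FULL_URI"):
        return POD_IDENTITY_PROVIDER
    return None


print(detect_credentials_provider())
```

Run inside the notebook pod, this prints the provider that matches the pod's configuration, or None if neither set of variables is present.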

You can explore the example JupyterLab notebook s3table-iceberg-pyspark.ipynb to interactively run Spark queries on S3 Tables.

Perform Iceberg queries on S3 Tables

Now that you have loaded data into the table, add new columns to the table and perform a time travel to go back to the previous state of the table.

Step 1. Alter the employee_s3_table in the doeks_namespace and add a new column named is_manager.

# Alter table and add a new column
import pyspark.sql.utils
try:
    spark.sql(f"ALTER TABLE {namespace}.{table_name} ADD COLUMNS is_manager STRING")
except pyspark.sql.utils.AnalysisException:
    print("Column already exists")

Step 2. Fetch the first five rows of the table to verify that the table was altered.

spark.sql("select * from doeks_namespace.employee_s3_table").show(5)

The output should look like the following, which shows that the new column is_manager has been added in the table (with NULL values).

+---+----------+------+--------+----------+
| id|      name| level|  salary|is_manager|
+---+----------+------+--------+----------+
|  1|Employee_1|  Exec| 90500.0|      NULL|
|  2|Employee_2|Junior| 68000.0|      NULL|
|  3|Employee_3|Senior|155000.0|      NULL|
|  4|Employee_4|Senior|113000.0|      NULL|
|  5|Employee_5|  Exec|159500.0|      NULL|
+---+----------+------+--------+----------+

Step 3. Update all the rows with the value Y for the newly added column. Then, retrieve the rows to verify the outcome of the update operation.

from pyspark.sql import functions as sf
employee_df.withColumn("is_manager",sf.lit("Y")).writeTo("doeks_namespace.employee_s3_table").createOrReplace()

# fetch the updated records
spark.sql("select * from doeks_namespace.employee_s3_table").show(5)

The output is as follows, which confirms that the rows have been updated with value Y for the is_manager column.

+---+-----------+------+--------+----------+
| id|       name| level|  salary|is_manager|
+---+-----------+------+--------+----------+
|  1| Employee_1|  Exec| 90500.0|         Y|
|  2| Employee_2|Junior| 68000.0|         Y|
|  3| Employee_3|Senior|155000.0|         Y|
|  4| Employee_4|Senior|113000.0|         Y|
|  5| Employee_5|  Exec|159500.0|         Y|
+---+-----------+------+--------+----------+

Step 4. Test the time travel feature. Query the table from a previous snapshot. This snapshot doesn’t have the column you newly created while altering the table.

First, check the snapshot history:

# check snapshot history
spark.sql(f"SELECT * FROM {namespace}.{table_name}.history LIMIT 10").show()

+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2025-01-24 18:31:...|7415823990595480314|     NULL|              false|
|2025-01-26 22:51:...|6259991690916370439|     NULL|              false|
|2025-01-26 22:51:...|6556793190588413349|     NULL|               true|
+--------------------+-------------------+---------+-------------------+

Go back to a state before the ALTER TABLE operation. The output shows no is_manager column.

# check the snapshot BEFORE table alteration had happened
# replace <snapshot-id-before-alter> with a snapshot_id from the history output
spark.sql(f"SELECT * FROM {namespace}.{table_name} FOR SYSTEM_VERSION AS OF <snapshot-id-before-alter> LIMIT 5").show()

Output:
+---+----------+------+--------+
| id|      name| level|  salary|
+---+----------+------+--------+
|  1|Employee_1|  Exec| 90500.0|
|  2|Employee_2|Junior| 68000.0|
|  3|Employee_3|Senior|155000.0|
|  4|Employee_4|Senior|113000.0|
|  5|Employee_5|  Exec|159500.0|
+---+----------+------+--------+

Query the latest snapshot, where the newly added column is_manager is available.

# replace <snapshot-id-after-alter> with the latest snapshot_id from the history output
spark.sql(f"SELECT * FROM {namespace}.{table_name} FOR SYSTEM_VERSION AS OF <snapshot-id-after-alter> LIMIT 5").show()

Output:
+---+----------+------+--------+----------+
| id|      name| level|  salary|is_manager|
+---+----------+------+--------+----------+
|  1|Employee_1|  Exec| 90500.0|         Y|
|  2|Employee_2|Junior| 68000.0|         Y|
|  3|Employee_3|Senior|155000.0|         Y|
|  4|Employee_4|Senior|113000.0|         Y|
|  5|Employee_5|  Exec|159500.0|         Y|
+---+----------+------+--------+----------+
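The two time travel queries differ only in the snapshot ID, so the SQL text can be produced by a small helper. The sketch below is one possible way to do that. The function name is hypothetical, and the snapshot ID is taken from the sample history output in this post. The resulting string would be passed to spark.sql(...).

```python
def time_travel_query(namespace, table_name, snapshot_id, limit=5):
    """Build a Spark SQL query that reads a table as of a given snapshot ID."""
    # Converting to int rejects anything that is not a numeric snapshot ID.
    snapshot_id = int(snapshot_id)
    limit = int(limit)
    if limit <= 0:
        raise ValueError("limit must be a positive integer")
    return (
        f"SELECT * FROM {namespace}.{table_name} "
        f"FOR SYSTEM_VERSION AS OF {snapshot_id} LIMIT {limit}"
    )


query = time_travel_query("doeks_namespace", "employee_s3_table", 7415823990595480314)
print(query)
```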

Cleaning up

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment.

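Delete the table. The following commands assume that the S3TABLE_ARN environment variable holds the table bucket ARN noted in Step 2: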

aws s3tables delete-table \
  --namespace doeks_namespace \
  --table-bucket-arn ${S3TABLE_ARN} \
  --name employee_s3_table

Delete the namespace:

aws s3tables delete-namespace \
  --namespace doeks_namespace \
  --table-bucket-arn ${S3TABLE_ARN}

Delete the table bucket:

aws s3tables delete-table-bucket \
  --region "<REGION>" \
  --table-bucket-arn ${S3TABLE_ARN}

Delete the EKS cluster with all infrastructure:

cd ${DOEKS_HOME}/analytics/terraform/spark-k8s-operator && chmod +x cleanup.sh
./cleanup.sh
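The S3 Tables deletions above run from the innermost resource outward: table, then namespace, then table bucket. As a sketch, the same sequence can be assembled in Python. The function name is hypothetical, and the flags are the ones used in the commands above. The sketch only prints the commands. It does not run them.

```python
import shlex


def s3tables_cleanup_commands(table_bucket_arn, namespace, table_name, region):
    """Return the AWS CLI cleanup commands in the order they should run."""
    return [
        ["aws", "s3tables", "delete-table",
         "--namespace", namespace,
         "--table-bucket-arn", table_bucket_arn,
         "--name", table_name],
        ["aws", "s3tables", "delete-namespace",
         "--namespace", namespace,
         "--table-bucket-arn", table_bucket_arn],
        ["aws", "s3tables", "delete-table-bucket",
         "--region", region,
         "--table-bucket-arn", table_bucket_arn],
    ]


commands = s3tables_cleanup_commands(
    "arn:aws:s3tables:us-west-2:111122223333:bucket/doeks-spark-on-eks-s3table",
    "doeks_namespace",
    "employee_s3_table",
    "us-west-2",
)
for argv in commands:
    # shlex.quote keeps each argument safe to paste into a shell.
    print(" ".join(shlex.quote(arg) for arg in argv))
```

Each list could also be passed to subprocess.run(argv, check=True) so that a failed deletion stops the sequence before the next step.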

Conclusion

In this post, we demonstrated how to integrate Amazon S3 Tables with Apache Spark running on Amazon EKS. We walked through the workflow from cluster setup to executing Spark jobs that use S3 Tables capabilities. Using the Iceberg REST Catalog endpoints, we showed how to perform schema evolution and time travel queries, demonstrating the powerful features available when combining S3 Tables with Apache Spark on Amazon EKS.

This integration enables data teams to build scalable and efficient analytics workflows while benefiting from improved query performance and cost optimization. Whether you’re processing batch data or performing interactive analysis through JupyterHub, the combination of S3 Tables with Apache Spark on Amazon EKS provides a robust foundation for modern data processing needs.

Get started today!

Explore Amazon S3 Tables and how they can enhance your analytics workloads.

Follow the Data on EKS Blueprints to deploy your own Apache Spark on Amazon EKS with S3 Tables setup.