EMR Spark with NVMe SSD Storage

This example demonstrates running Spark jobs on EMR on EKS using NVMe instance store SSDs for shuffle storage. Local instance store delivers the highest I/O performance, with sub-millisecond latency, making it ideal for shuffle-intensive workloads on Graviton instances.

What You'll Learn

  • How to configure Spark to use NVMe SSD instance stores for shuffle operations
  • How Karpenter provisions nodes with NVMe SSDs and configures RAID0
  • When to use NVMe storage vs. EBS hostpath or dynamic PVCs
  • How to verify NVMe storage is mounted and being used

When to Use This Example

Best for:

  • ✅ Shuffle-intensive workloads (large joins, aggregations, sorts)
  • ✅ Ultra-low latency requirements (<1ms)
  • ✅ High-throughput data processing (2.5 GB/s per drive)
  • ✅ Cost-performance optimization with Graviton instances

Not recommended for:

  • ❌ Workloads requiring data persistence (NVMe is ephemeral)
  • ❌ Long-running jobs sensitive to node failures
  • ❌ Regions without NVMe SSD instance availability
  • ❌ Workloads with minimal shuffle operations

Architecture: Local NVMe SSDs

Key Benefits:

  • 🔥 Ultra-High Performance: Up to 2.5 GB/s throughput per drive
  • Low Latency: Sub-millisecond latency for I/O operations
  • 💰 Cost Effective: Included with instance price, no additional storage cost
  • 🚀 Graviton Performance: ARM64 processors with superior price-performance

Trade-offs:

  • ⚠️ Ephemeral Storage: Data lost when instance terminates
  • 🔄 Limited Availability: Only on SSD instance types (c6gd, c7gd, m6gd, r6gd, etc.)
  • 📊 Node-Bound: Data tied to specific node

Prerequisites

  • Deploy EMR on EKS infrastructure: Infrastructure Setup
  • Karpenter configured with NVMe SSD instance types
  • Graviton-compatible EMR runtime (ARM64)

What is Shuffle Storage in Spark?

Shuffle storage holds intermediate data during Spark operations like groupBy, join, and reduceByKey. When data is redistributed across executors, it's temporarily stored before being read by subsequent stages.
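
For example, the wide transformation below forces a full shuffle; with spark.local.dir pointed at /data1 (the path the pod template later maps to the NVMe RAID0 mount), the intermediate shuffle files land on local SSD rather than network-attached storage. This is a minimal sketch, not the bundled pyspark-taxi-trip.py, and the S3 output path is a placeholder:

# Minimal PySpark sketch of a shuffle-heavy job. The groupBy repartitions
# data by key across executors, and the intermediate shuffle files are
# written under spark.local.dir (/data1, the NVMe-backed directory).
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .appName("shuffle-demo")
    .config("spark.local.dir", "/data1")   # NVMe-backed shuffle directory
    .getOrCreate()
)

df = spark.range(0, 100_000_000)
# Wide transformation -> shuffle: rows are redistributed by bucket across executors
result = df.groupBy((F.col("id") % 1000).alias("bucket")).count()
result.write.mode("overwrite").parquet("s3://YOUR_BUCKET/shuffle-demo/")  # placeholder bucket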

Spark Shuffle Storage Comparison

| Storage Type | Performance | Cost | Latency | Use Case |
|---|---|---|---|---|
| NVMe SSD | 🔥 Very High | 💰 Medium | ⚡ <1ms | Maximum performance |
| EBS Dynamic PVC | ⚡ High | 💰 Medium | 📊 1-3ms | Production isolation |
| EBS Hostpath | 📊 Medium | 💵 Low | 📊 1-3ms | Cost-optimized |

When to Use NVMe SSD

  • ✅ Shuffle-intensive workloads (joins, aggregations)
  • ✅ Low-latency requirements
  • ✅ High-throughput data processing
  • ✅ Cost-performance optimization with Graviton

When to Avoid

  • ❌ Workloads requiring data persistence
  • ❌ Long-running jobs with node failures
  • ❌ Regions without SSD instance availability

NVMe Instance Types

Graviton (ARM64)

| Instance Type | vCPUs | Memory | NVMe Storage | Throughput |
|---|---|---|---|---|
| c6gd.xlarge | 4 | 8 GiB | 237 GB | 2.5 GB/s |
| c6gd.2xlarge | 8 | 16 GiB | 474 GB | 2.5 GB/s |
| c6gd.4xlarge | 16 | 32 GiB | 950 GB | 2.5 GB/s |
| c7gd.4xlarge | 16 | 32 GiB | 950 GB | 2.5 GB/s |
| m6gd.4xlarge | 16 | 64 GiB | 950 GB | 2.5 GB/s |
| r6gd.4xlarge | 16 | 128 GiB | 950 GB | 2.5 GB/s |

x86 (Intel/AMD)

| Instance Type | vCPUs | Memory | NVMe Storage | Throughput |
|---|---|---|---|---|
| c5d.4xlarge | 16 | 32 GiB | 400 GB | 2.0 GB/s |
| c6id.4xlarge | 16 | 32 GiB | 950 GB | 2.5 GB/s |
| m5d.4xlarge | 16 | 64 GiB | 600 GB | 2.0 GB/s |

Graviton Advantage

Graviton instances offer up to 40% better price-performance compared to x86 instances.

Example Configuration

Pod Template

The executor pod template configures NVMe hostpath storage:

# EMR on EKS Executor Pod Template - NVMe SSD Storage (Graviton)
# Uses Graviton instances WITH NVMe SSDs for high-performance shuffle
apiVersion: v1
kind: Pod
metadata:
  name: emr-executor
  namespace: emr-data-team-a
spec:
  volumes:
    # NVMe SSD storage - Karpenter configures RAID0 under /mnt/k8s-disks/0
    - name: spark-local-dir-1
      hostPath:
        path: /mnt/k8s-disks/0
        type: DirectoryOrCreate

  nodeSelector:
    # Use memory-optimized Graviton nodepool with SSD
    NodeGroupType: SparkGravitonMemoryOptimized
    node.kubernetes.io/arch: arm64

  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              # Only Graviton SSD instance families (with 'd' suffix)
              - key: karpenter.k8s.aws/instance-family
                operator: In
                values: ["c6gd", "c7gd", "c8gd", "m6gd", "m7gd", "m8gd", "r6gd", "r7gd", "r8gd"]

  tolerations:
    - key: spark-executor
      operator: Equal
      value: "true"
      effect: NoSchedule

  initContainers:
    - name: volume-permission
      image: public.ecr.aws/docker/library/busybox
      # Grant volume access to the hadoop user
      command: ['sh', '-c', 'mkdir -p /data1; chown -R 999:1000 /data1']
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /data1

  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
        - name: spark-local-dir-1
          mountPath: /data1
          readOnly: false

Spark Configuration

Key Spark properties for NVMe storage:

{
  "spark.local.dir": "/data1",
  "spark.driver.cores": "2",
  "spark.executor.cores": "4",
  "spark.driver.memory": "8g",
  "spark.executor.memory": "16g",
  "spark.dynamicAllocation.enabled": "true",
  "spark.dynamicAllocation.shuffleTracking.enabled": "true",
  "spark.dynamicAllocation.minExecutors": "2",
  "spark.dynamicAllocation.maxExecutors": "10",
  "spark.sql.adaptive.enabled": "true",
  "spark.sql.adaptive.coalescePartitions.enabled": "true",
  "spark.sql.adaptive.skewJoin.enabled": "true"
}

Running the Example

1. Configure kubectl Access

First, ensure you have kubectl access to your EMR on EKS cluster:

# Navigate to the terraform directory
cd data-stacks/emr-on-eks/terraform/_local

# Get the kubectl configuration command
terraform output configure_kubectl

# Run the output command (example):
aws eks --region us-west-2 update-kubeconfig --name emr-on-eks

# Verify access
kubectl get nodes

2. Navigate to Example Directory

cd ../../examples/nvme-ssd

3. Review the Configuration

The example includes:

  • execute_emr_eks_job.sh - Job submission script
  • driver-pod-template.yaml - Driver pod configuration
  • executor-pod-template.yaml - Executor pod configuration with NVMe affinity
  • pyspark-taxi-trip.py - Sample PySpark application analyzing NYC taxi data

Key configuration in executor pod template:

  • Node selector for Graviton instances with SSD
  • Affinity rules for NVMe instance families (c6gd, c7gd, m6gd, r6gd, etc.)
  • HostPath volume mount to /mnt/k8s-disks/0 (Karpenter RAID0 array)

4. Submit the Spark Job

./execute_emr_eks_job.sh

This script will:

  1. Read Terraform outputs for EMR virtual cluster details
  2. Upload pod templates and PySpark script to S3
  3. Download NYC taxi dataset (11 parquet files, ~500MB)
  4. Submit EMR Spark job with NVMe SSD configuration

Expected output:

Starting EMR on EKS job submission...
Virtual Cluster ID: hclg71zute4fm4fpm3m2cobv0
Job submitted successfully!
Job ID: 000000036udo0ghs1mq
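
Under the hood, step 4 submits the job through the EMR on EKS StartJobRun API. Below is a rough boto3 sketch of that call; the cluster ID, role ARN, S3 paths, and release label are placeholders, and the actual script uses the AWS CLI with values read from Terraform outputs:

# Hypothetical boto3 sketch of the submission performed by execute_emr_eks_job.sh.
# All identifiers below are placeholders.
import boto3

emr = boto3.client("emr-containers", region_name="us-west-2")

response = emr.start_job_run(
    virtualClusterId="YOUR_VIRTUAL_CLUSTER_ID",
    name="taxidata-nvme-ssd",
    executionRoleArn="arn:aws:iam::111122223333:role/emr-eks-data-team-a",
    releaseLabel="emr-6.15.0-latest",  # example release label
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://YOUR_BUCKET/scripts/pyspark-taxi-trip.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2",
        }
    },
    configurationOverrides={
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.local.dir": "/data1",
                    "spark.kubernetes.driver.podTemplateFile": "s3://YOUR_BUCKET/templates/driver-pod-template.yaml",
                    "spark.kubernetes.executor.podTemplateFile": "s3://YOUR_BUCKET/templates/executor-pod-template.yaml",
                },
            }
        ]
    },
)
print(response["id"])  # job run ID, e.g. shown as "Job ID" above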

5. Monitor the Job

# Watch pods in real-time
kubectl get pods -n emr-data-team-a -w

# In another terminal, check job status
aws emr-containers list-job-runs \
--virtual-cluster-id $EMR_VIRTUAL_CLUSTER_ID_TEAM_A \
--region us-west-2

# View job logs
kubectl logs -f <driver-pod-name> -n emr-data-team-a

6. Verify NVMe Instance Provisioning

Check that Karpenter provisioned NVMe SSD instances:

# Describe an executor pod
kubectl describe pod taxidata-nvme-ssd-exec-1 -n emr-data-team-a | grep -A10 "Node-Selectors"

# Expected output:
# Node-Selectors: NodeGroupType=SparkGravitonMemoryOptimized
# node.kubernetes.io/arch=arm64
# Affinity:
# karpenter.k8s.aws/instance-family In [c6gd c7gd c8gd m6gd m7gd m8gd r6gd r7gd r8gd]

7. Check Node Instance Type

Verify the node is using an NVMe SSD instance:

# Get node name where executor is running
NODE=$(kubectl get pod taxidata-nvme-ssd-exec-1 -n emr-data-team-a \
-o jsonpath='{.spec.nodeName}')

# Check instance type (should have 'd' suffix for SSD)
kubectl get node $NODE -o jsonpath='{.metadata.labels.node\.kubernetes\.io/instance-type}'

# Expected output: c7gd.4xlarge, m6gd.4xlarge, r6gd.4xlarge, etc.

8. Verify NVMe RAID0 Mount

Check that Karpenter configured the NVMe storage as RAID0:

# Check RAID configuration
kubectl debug node/$NODE -it --image=ubuntu -- cat /proc/mdstat

# Expected output showing RAID0:
# md0 : active raid0 nvme1n1[1] nvme0n1[0]
# 1875385344 blocks super 1.2 512k chunks

# Verify the mount point (kubectl debug mounts the node's root filesystem at /host)
kubectl debug node/$NODE -it --image=ubuntu -- df -h /host/mnt/k8s-disks/0

# Expected output:
# Filesystem Size Used Avail Use% Mounted on
# /dev/md0 900G 10G 890G 2% /host/mnt/k8s-disks/0
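
You can also confirm from inside a running job that Spark picked up the NVMe-backed directory. A quick check on the driver, for example added temporarily to the PySpark script, might look like this:

# Quick in-job sanity check that spark.local.dir points at the NVMe mount.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
local_dir = spark.sparkContext.getConf().get("spark.local.dir", "not set")
print(f"spark.local.dir = {local_dir}")   # expect /data1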

Performance Characteristics

Throughput

  • Sequential Read: Up to 2.5 GB/s per drive
  • Sequential Write: Up to 2.5 GB/s per drive
  • Random Read: Up to 400,000 IOPS
  • Random Write: Up to 400,000 IOPS

Latency

  • Average: <1ms
  • P99: <2ms
  • P99.9: <5ms
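
As a rough sanity check of these figures, you can time a large sequential write against the mount from any pod that has /data1 mounted. This is a simple sketch, not a benchmark; tools like fio give more rigorous numbers:

# Rough sequential-write check against the NVMe mount (/data1).
# Run inside a pod with the hostPath volume mounted.
import os
import time

path = "/data1/throughput-test.bin"
block = b"\0" * (8 * 1024 * 1024)      # 8 MiB per write
total_bytes = 4 * 1024**3              # write 4 GiB in total

start = time.monotonic()
with open(path, "wb") as f:
    written = 0
    while written < total_bytes:
        f.write(block)
        written += len(block)
    f.flush()
    os.fsync(f.fileno())               # include flush-to-disk in the timing
elapsed = time.monotonic() - start

print(f"Sequential write: {total_bytes / elapsed / 1e9:.2f} GB/s")
os.remove(path)                        # clean up the test file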

Cost Analysis

Example: 10 executors (4 cores each, packed onto 3 c7gd.4xlarge nodes) running for 1 hour:

| Component | Instance Type | Cost/Hour | Count | Total Cost |
|---|---|---|---|---|
| Compute + NVMe | c7gd.4xlarge | $0.69 | 3 nodes | $2.07 |
| EBS (for comparison) | c7g.4xlarge + EBS | $0.58 + $0.40 | 3 nodes | $2.94 |
| Savings | - | - | - | 30% |

Cost Calculation

NVMe storage is included in instance price. Savings come from avoiding separate EBS volumes and better performance.
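
The savings figure above works out as follows; this small sketch uses the illustrative hourly rates from the table, not live pricing:

# Reproduces the illustrative cost comparison above (figures from the table,
# not current AWS pricing). NVMe instance store is bundled into the instance price.
nodes = 3
hours = 1

nvme_rate = 0.69                 # c7gd.4xlarge, $/hour (example on-demand rate)
ebs_rate = 0.58 + 0.40           # c7g.4xlarge + separate EBS volume

nvme_cost = nvme_rate * nodes * hours
ebs_cost = ebs_rate * nodes * hours
savings = (ebs_cost - nvme_cost) / ebs_cost

print(f"NVMe: ${nvme_cost:.2f}  EBS: ${ebs_cost:.2f}  Savings: {savings:.0%}")
# -> NVMe: $2.07  EBS: $2.94  Savings: 30%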

Karpenter Configuration

EC2NodeClass for NVMe

Karpenter automatically configures RAID0 for NVMe drives:

apiVersion: karpenter.k8s.aws/v1
kind: EC2NodeClass
metadata:
  name: ephemeral-nvme-local-provisioner
spec:
  amiFamily: AL2023
  role: karpenter-node-role

  # RAID0 configuration for NVMe instance store drives
  instanceStorePolicy: RAID0

  # User data creates udev symlinks so instance store disks can be discovered
  userData: |
    #!/bin/bash
    cat <<EOF > /etc/udev/rules.d/90-kubernetes-discovery.rules
    # Discover Instance Storage disks
    KERNEL=="nvme[0-9]*n[0-9]*", ENV{DEVTYPE}=="disk", \
    ATTRS{model}=="Amazon EC2 NVMe Instance Storage", \
    ATTRS{serial}=="?*", \
    SYMLINK+="disk/kubernetes/nvme-\$attr{model}_\$attr{serial}"
    EOF
    udevadm control --reload && udevadm trigger

NodePool for NVMe Instances

apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
  name: nvme-ssd-graviton
spec:
  template:
    spec:
      nodeClassRef:
        # group and kind are required by the karpenter.sh/v1 API
        group: karpenter.k8s.aws
        kind: EC2NodeClass
        name: ephemeral-nvme-local-provisioner
      requirements:
        - key: karpenter.k8s.aws/instance-family
          operator: In
          values: ["c6gd", "c7gd", "m6gd", "r6gd"]
        - key: kubernetes.io/arch
          operator: In
          values: ["arm64"]

Performance Tuning

1. Optimize Spark Shuffle

{
  "spark.shuffle.file.buffer": "1m",
  "spark.shuffle.unsafe.file.output.buffer": "5m",
  "spark.io.compression.codec": "lz4",
  "spark.shuffle.compress": "true"
}

2. Increase Parallelism

{
  "spark.sql.shuffle.partitions": "200",
  "spark.default.parallelism": "200"
}

3. Enable Adaptive Query Execution

{
  "spark.sql.adaptive.enabled": "true",
  "spark.sql.adaptive.coalescePartitions.enabled": "true",
  "spark.sql.adaptive.skewJoin.enabled": "true"
}

Troubleshooting

Pods Stuck in Pending

Check if NVMe instance types are available:

kubectl get nodeclaims
kubectl describe nodeclaim <nodeclaim-name>

Common issues:

  • No NVMe instances available in AZ
  • Instance type not in Karpenter NodePool
  • Insufficient capacity

NVMe Not Mounted

Check Karpenter logs:

kubectl logs -n karpenter -l app.kubernetes.io/name=karpenter --tail=100

Verify RAID0 configuration:

kubectl debug node/$NODE -it --image=ubuntu -- \
cat /proc/mdstat

Permission Denied Errors

Verify init container permissions:

kubectl logs taxidata-nvme-ssd-exec-1 -n emr-data-team-a -c volume-permission

Instance Type Not Supported

Update pod template to include more instance families:

affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: karpenter.k8s.aws/instance-family
              operator: In
              values: ["c6gd", "c7gd", "c8gd", "m6gd", "m7gd", "r6gd", "r7gd"]

Best Practices

1. Use Graviton Instances

Graviton instances offer better price-performance:

nodeSelector:
  node.kubernetes.io/arch: arm64

2. Enable Dynamic Allocation

{
  "spark.dynamicAllocation.enabled": "true",
  "spark.dynamicAllocation.shuffleTracking.enabled": "true"
}

3. Monitor NVMe Health

Set up a CloudWatch alarm on disk usage (disk_used_percent is published by the CloudWatch agent under the CWAgent namespace):

aws cloudwatch put-metric-alarm \
  --alarm-name nvme-disk-usage \
  --namespace CWAgent \
  --metric-name disk_used_percent \
  --statistic Average \
  --period 300 \
  --evaluation-periods 1 \
  --comparison-operator GreaterThanThreshold \
  --threshold 80

4. Handle Node Failures Gracefully

Enable shuffle tracking for dynamic allocation:

{
  "spark.dynamicAllocation.shuffleTracking.enabled": "true",
  "spark.dynamicAllocation.shuffleTracking.timeout": "60s"
}

5. Clean Up Shuffle Data

NVMe data is ephemeral, but clean up between jobs:

# Clean up leftover shuffle data on the node
# (the debug pod mounts the node's root filesystem at /host; the shell is needed for glob expansion)
kubectl debug node/$NODE -it --image=ubuntu -- \
  sh -c 'rm -rf /host/mnt/k8s-disks/0/spark-*'

Comparison with Other Storage Options

vs. EBS Dynamic PVC

| Feature | NVMe SSD | EBS PVC |
|---|---|---|
| Performance | 🔥 Very High | ⚡ High |
| Latency | ⚡ <1ms | 📊 1-3ms |
| Cost | ✅ Included | 💰 Additional |
| Durability | ⚠️ Ephemeral | ✅ Persistent |

vs. EBS Hostpath

| Feature | NVMe SSD | EBS Hostpath |
|---|---|---|
| Performance | 🔥 Very High | 📊 Medium |
| Throughput | 2.5 GB/s | 250 MB/s |
| IOPS | 400K | 3K-16K |
| Availability | ⚠️ SSD instances | ✅ All instances |

Next Steps

Additional Resources