Distributed Data Processing with Ray Data

What is Ray Data?

Ray Data is a scalable, framework-agnostic data processing library built on top of Ray, designed for distributed data analytics and machine learning workloads. It provides:

  • Distributed Processing: Parallel data processing across multiple Ray worker nodes
  • Lazy Evaluation: Operations are optimized and executed only when results are needed
  • Rich Data Connectors: Native support for various data sources including S3, databases, and file systems
  • Memory Management: Efficient handling of large datasets that don't fit in memory
  • Integration with ML Libraries: Seamless integration with pandas, NumPy, and PyArrow
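For a feel of the API, here is a minimal sketch (the bucket path and filter condition are placeholders, not part of this blueprint):

import ray

ray.init()

# Lazily read JSON log records from S3; nothing executes yet.
ds = ray.data.read_json("s3://my-bucket/spark-application-logs/")  # placeholder path

# Transformations are planned lazily and run in parallel across Ray workers.
errors = ds.filter(lambda row: "ERROR" in row["log"])

# Materialization happens only when a result is requested.
print(errors.count())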

Why Ray Data? Is this an Alternative Tool to Spark?

Ray Data is complementary to Spark, not a direct replacement. While both are distributed data processing frameworks, they serve different use cases:

Ray Data excels when you need:

  • Python-native data processing with familiar pandas/NumPy APIs
  • Tight integration with machine learning pipelines
  • Real-time or streaming data processing
  • Complex iterative algorithms

Spark remains ideal for:

  • Large-scale ETL operations
  • Complex SQL-based analytics
  • Enterprise data warehouse workloads
  • Cross-language support (Scala, Java, Python, R)

Problem Statement

When Apache Spark applications run on Kubernetes, they generate extensive logs that are captured by Fluent Bit and written to S3. However, these logs present several challenges for data engineers:

  1. Unstructured Format: Spark logs are written as raw text files without a consistent schema
  2. No Query Capability: Engineers cannot easily query logs using SQL-based tools like Amazon Athena
  3. Metadata Enrichment: Fluent Bit adds Kubernetes metadata as JSON, creating mixed formats
  4. Performance Issues: Scanning raw log files for troubleshooting is time-consuming and expensive

Solution: Use Ray Data to periodically process these unstructured logs, apply a consistent schema, and write them to Apache Iceberg tables. This enables:

  • ✅ SQL queries via Amazon Athena
  • ✅ Structured data with defined schema
  • ✅ Efficient columnar storage format
  • ✅ Time-travel and versioning capabilities

Log Snippet in S3 Before Processing

Here's what Spark logs look like when written to S3 by Fluent Bit:

{
  "log": "2024-01-15 14:23:45 INFO SparkContext: Running Spark version 3.5.0\n",
  "stream": "stdout",
  "time": "2024-01-15T14:23:45.123456Z",
  "kubernetes": {
    "pod_name": "spark-driver-abc123",
    "namespace_name": "spark-team-a",
    "pod_id": "12345678-1234-1234-1234-123456789012",
    "labels": {
      "spark-role": "driver",
      "spark-app-id": "spark-application-12345"
    },
    "container_name": "spark-driver",
    "container_image": "spark:3.5.0"
  }
}
{
  "log": "2024-01-15 14:23:46 INFO ResourceUtils: Using Spark's default log4j profile\n",
  "stream": "stdout",
  "time": "2024-01-15T14:23:46.234567Z",
  "kubernetes": {
    "pod_name": "spark-driver-abc123",
    "namespace_name": "spark-team-a",
    "pod_id": "12345678-1234-1234-1234-123456789012",
    "labels": {
      "spark-role": "driver",
      "spark-app-id": "spark-application-12345"
    },
    "container_name": "spark-driver",
    "container_image": "spark:3.5.0"
  }
}

Key Challenges:

  • Each log line is wrapped in JSON with Kubernetes metadata
  • The actual log message is embedded in the log field
  • No structured schema for querying specific log levels or components
  • Redundant metadata repeated for each log line

Fluent Bit Enrichment

Fluent Bit automatically enriches each log line with Kubernetes metadata including pod name, namespace, labels, and container information. This enrichment is configured in the aws-for-fluentbit-values.yaml file. While this metadata is valuable for debugging, it creates a mixed format that's difficult to query efficiently.

📋 Architecture Overview

How Ray Data Transforms Log Processing

Ray Data periodically fetches new logs from S3, processes them in parallel, and writes structured data to Apache Iceberg tables. The solution includes the following capabilities:

Key Features of Ray Data Processing

📊 Schema Extraction and Parsing

Ray Data intelligently extracts structured fields from unstructured logs:

  • 🕐 timestamp - Parsed from the log message
  • 🏷️ log_level - Extracted levels (INFO, WARN, ERROR, DEBUG)
  • 🔧 component - Spark component (SparkContext, ResourceUtils, etc.)
  • 📝 message - The actual log content
  • 🏠 pod_name & namespace - From Kubernetes metadata
  • 👷 spark_role - Driver or Executor identification
  • 🆔 application_id - Unique Spark application identifier
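A flattening step along these lines could be applied to each Fluent Bit record; the regex and helper below are an illustrative sketch, not the blueprint's exact code:

import re

# Illustrative pattern for lines like:
# "2024-01-15 14:23:45 INFO SparkContext: Running Spark version 3.5.0"
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<log_level>INFO|WARN|ERROR|DEBUG)\s+"
    r"(?P<component>\S+):\s+"
    r"(?P<message>.*)$"
)

def parse_record(record: dict) -> dict:
    """Flatten one Fluent Bit record into the structured schema described above."""
    k8s = record.get("kubernetes", {})
    labels = k8s.get("labels", {})
    match = LOG_LINE.match(record.get("log", "").strip())
    parsed = match.groupdict() if match else {
        "timestamp": None,
        "log_level": None,
        "component": None,
        "message": record.get("log", "").strip(),
    }
    return {
        **parsed,
        "pod_name": k8s.get("pod_name"),
        "namespace": k8s.get("namespace_name"),
        "spark_role": labels.get("spark-role"),
        "application_id": labels.get("spark-app-id"),
    }

In a Ray Data pipeline this kind of function would typically run as ds.map(parse_record), so parsing scales out across the workers.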

🔍 Intelligent Filtering and Querying

Once processed, you can easily query logs using SQL:

-- Find all ERROR logs for a specific application
SELECT timestamp, component, message
FROM spark_logs
WHERE log_level = 'ERROR'
AND application_id = 'spark-application-12345'
AND timestamp > '2024-01-15 00:00:00'
ORDER BY timestamp DESC;

-- Analyze log patterns by component
SELECT component, log_level, COUNT(*) as count
FROM spark_logs
WHERE namespace = 'spark-team-a'
GROUP BY component, log_level
ORDER BY count DESC;

-- Track application lifecycle events
SELECT timestamp, message
FROM spark_logs
WHERE component = 'SparkContext'
AND (message LIKE '%Starting%' OR message LIKE '%Stopping%')
ORDER BY timestamp;
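These queries can also be run programmatically. A rough boto3 sketch is below; the region and the Athena results bucket are placeholders, while the database name follows the Glue database created by this blueprint:

import time
import boto3

athena = boto3.client("athena", region_name="us-west-2")  # placeholder region

query = """
SELECT timestamp, component, message
FROM spark_logs
WHERE log_level = 'ERROR'
ORDER BY timestamp DESC
LIMIT 100
"""

# Output location is a placeholder; point it at your own Athena results bucket.
execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "raydata_spark_logs"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results-bucket/"},
)

query_id = execution["QueryExecutionId"]
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows[:10]:
        print(row)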

🎯 Metadata Management

  • Idempotent Processing - Tracks processed folders to avoid reprocessing
  • 📋 Metadata Table - Maintains processing history and state
  • 🔄 Auto-Discovery - Automatically finds new log folders in S3
  • Incremental Updates - Processes only new data for efficiency
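The folder-discovery side of this can be pictured with a short sketch like the one below; the helper and its arguments are illustrative, and the spark-* naming matches the folder structure shown later in this page:

import boto3

def discover_new_folders(bucket: str, prefix: str, already_processed: set) -> list:
    """List spark-* folders under the prefix and drop those already recorded as processed.

    already_processed would be loaded from the Iceberg metadata table that
    tracks processing state, which is what keeps reruns idempotent.
    """
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    folders = set()
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            name = cp["Prefix"].rstrip("/").split("/")[-1]
            if name.startswith("spark-"):
                folders.add(name)
    return sorted(folders - already_processed)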

🚀 Getting Started

Prerequisites

Before deploying this blueprint, ensure you have:

  • S3 bucket with Spark application logs: Follow the Spark Operator blueprint to generate Spark logs. Note: Execute the steps in the "Put sample data in S3" section of the "Execute Sample Spark job with Karpenter" step to populate the S3 bucket with Spark application logs.
  • AWS CLI configured with appropriate permissions
  • ✅ kubectl
  • Terraform installed (>= 1.0)

Generate Spark Logs First

The Ray Data pipeline processes Spark application logs. Make sure you've run the taxi-trip example from the Spark Operator blueprint to populate your S3 bucket with logs.

📁 Spark Logfile Structure in S3:

s3://${S3_BUCKET}/
└── spark-application-logs/
    └── spark-team-a/
        ├── spark-application-1234567890-driver/
        │   └── stdout
        ├── spark-application-1234567890-exec-1/
        │   └── stdout
        └── spark-application-1234567890-exec-2/
            └── stdout

Each stdout file contains JSON-formatted logs with Kubernetes metadata enrichment from Fluent Bit.

Step 1: Enable Ray Data Processing

Deploy the EKS cluster with Ray Data components by enabling the enable_raydata variable. This will install:

  • KubeRay Operator - Manages Ray clusters on Kubernetes
  • Ray Custom Resources - RayJob and RayCluster CRDs
  • AWS Resources - IAM roles, S3 access policies, and Glue database
  • Ray Data Pipeline - Namespace, service accounts, and RBAC

cd analytics/terraform/spark-k8s-operator

# Deploy EKS cluster with Ray Data support enabled
export TF_VAR_enable_raydata=true

terraform init
terraform plan
terraform apply -auto-approve

Deployment Time

The full deployment takes approximately 20-25 minutes to create the EKS cluster, install operators, and configure all Ray Data components.

This deployment creates:

  • 🎯 KubeRay Operator for Ray job orchestration
  • 🔐 Ray Service Account with IRSA (IAM Roles for Service Accounts)
  • 📝 IAM Roles with S3 and Glue permissions
  • 📊 AWS Glue Database for Iceberg catalog
  • 🌐 Kubernetes Namespace (raydata)

Step 2: Verify KubeRay Operator Installation

Confirm that the KubeRay Operator is running successfully:

kubectl get po -n kuberay-operator

Expected output:

NAME                                READY   STATUS    RESTARTS   AGE
kuberay-operator-74fcdcc6bf-gpl5p   1/1     Running   0          10h

Step 3: Configure Ray Job

Navigate to the example directory and update the S3 configuration in the deployment script.

cd examples/raydata-sparklogs-processing-job

Replace the S3_BUCKET, CLUSTER_NAME, and AWS_REGION variables in the execute-rayjob.sh shell script before running it.

Step 4: Deploy the Ray Cluster & Execute Ray Job

# Make script executable
chmod +x execute-rayjob.sh

# Deploy the processing job
./execute-rayjob.sh deploy

📊 Monitoring the RayJob Deployment

Check Job Status

Monitor your Ray job with these commands:

# Monitor job progress in real-time
./execute-rayjob.sh monitor

# Check current status
./execute-rayjob.sh status

# View processing logs
./execute-rayjob.sh logs

Check RayJob Logs

2025-07-27 22:04:46,324 - spark-log-processor - INFO - ✅ Successfully processed 1287 records from spark-fb094270bf654473b372d0f773e86687
2025-07-27 22:04:46,324 - spark-log-processor - INFO - 🎯 Processing Summary:
2025-07-27 22:04:46,324 - spark-log-processor - INFO - 📊 Total records processed: 1287
2025-07-27 22:04:46,324 - spark-log-processor - INFO - ✅ Successful folders: 1
2025-07-27 22:04:46,324 - spark-log-processor - INFO - ❌ Failed folders: 0
2025-07-27 22:04:46,324 - spark-log-processor - INFO - ✅ Successfully processed: ['spark-fb094270bf654473b372d0f773e86687']
2025-07-27 22:04:46,324 - spark-log-processor - INFO - ✅ Metadata-driven incremental processing completed

What's Happening Behind the Scenes?

When you deploy the RayJob, the following automated process occurs:

  1. 🚀 Ray Cluster Initialization - KubeRay Operator creates a Ray cluster with head and worker nodes
  2. 🔍 S3 Discovery - Ray Data scans the configured S3 bucket path for folders matching spark-* pattern
  3. 📊 Metadata Check - Queries the Iceberg metadata table to identify unprocessed folders
  4. 📥 Parallel Processing - Ray workers read JSON log files from S3 in parallel
  5. 🔄 Data Transformation - Extracts structured fields from JSON logs (timestamp, log level, component, etc.)
  6. ✍️ Iceberg Writing - Writes transformed data to Apache Iceberg tables with ACID guarantees
  7. 📝 Metadata Update - Records processing status in metadata table for idempotency
  8. 🎯 Completion - Shuts down Ray cluster after successful processing

The entire process is idempotent - you can safely re-run it without duplicating data, as it only processes new log folders.
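To make steps 4-6 concrete, here is a rough sketch of the read-transform-write path using Ray Data and PyIceberg (the same library the verification script installs). The S3 path is a placeholder and parse_record refers to the flattening function sketched earlier, so treat this as an illustration rather than the blueprint's actual job code:

import ray
import pyarrow as pa
from pyiceberg.catalog import load_catalog

ray.init()

# 4. Ray workers read the raw Fluent Bit JSON records in parallel (placeholder path).
ds = ray.data.read_json("s3://my-bucket/spark-application-logs/spark-team-a/")

# 5. Flatten each record into the structured schema (parse_record as sketched earlier).
structured = ds.map(parse_record)

# 6. Collect the result as an Arrow table and append it to the Iceberg table registered
#    in the AWS Glue catalog; append() commits a new snapshot atomically.
arrow_table = pa.concat_tables(ray.get(structured.to_arrow_refs()))
catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("raydata_spark_logs.spark_logs")
table.append(arrow_table)

Collecting into a single Arrow table is acceptable for log volumes like the ones shown here; for much larger datasets the write would be done per block rather than funneling everything through one node.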

Access Ray Dashboard


✅ Data Verification

S3 Bucket Structure

Ray Data uses the same S3 bucket for both input Spark logs and output Iceberg data, organized in separate paths:

s3://your-spark-logs-bucket/
├── spark-application-logs/              # 📥 Input: Raw Spark logs from Fluent Bit
│   └── spark-team-a/
│       ├── spark-application-1234567890-driver/
│       │   └── stdout                   # JSON logs with Kubernetes metadata
│       ├── spark-application-1234567890-exec-1/
│       │   └── stdout
│       └── spark-application-1234567890-exec-2/
│           └── stdout
│
└── iceberg-warehouse/                   # 📤 Output: Processed Iceberg data
    └── raydata_spark_logs.db/
        └── spark_logs/
            ├── metadata/                # Iceberg metadata files
            │   ├── 00000-xxx.metadata.json
            │   ├── snap-xxx.avro        # Snapshots for time travel
            │   └── version-hint.text
            └── data/                    # Actual data in Parquet format
                ├── 00000-0-xxx.parquet
                ├── 00001-0-xxx.parquet
                └── ...

Same Bucket, Different Paths
  • Input Path: s3://bucket/spark-application-logs/ - Contains raw JSON logs
  • Output Path: s3://bucket/iceberg-warehouse/ - Contains structured Parquet files
  • Storage Format: Iceberg uses efficient columnar Parquet format with metadata for ACID transactions
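Because each commit to the table adds a new snapshot under metadata/, you can inspect the table history and read older versions. A rough PyIceberg sketch (catalog and table names as above) looks like this:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("raydata_spark_logs.spark_logs")

# Each Ray job run that appends data creates a new snapshot.
for snapshot in table.snapshots():
    print(snapshot.snapshot_id, snapshot.timestamp_ms)

# Time travel: read the table as of its first snapshot.
first = table.snapshots()[0]
older = table.scan(snapshot_id=first.snapshot_id).to_pandas()
print(len(older))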

In the AWS S3 Console, it should look like the screenshot below:

s3

Log Snippet After RayData Processing

Here's how the data transformation looks before and after Ray Data processing:

Raw Fluent Bit logs in S3 - Each log line wrapped in JSON with redundant metadata:

{
  "log": "2024-01-15 14:23:45 INFO SparkContext: Running Spark version 3.5.0\n",
  "stream": "stdout",
  "time": "2024-01-15T14:23:45.123456Z",
  "kubernetes": {
    "pod_name": "spark-driver-abc123",
    "namespace_name": "spark-team-a",
    "pod_id": "12345678-1234-1234-1234-123456789012",
    "labels": {
      "spark-role": "driver",
      "spark-app-id": "spark-application-12345"
    },
    "container_name": "spark-driver",
    "container_image": "spark:3.5.0"
  }
}
{
  "log": "2024-01-15 14:23:46 ERROR TaskSchedulerImpl: Lost executor 1: Container killed\n",
  "stream": "stderr",
  "time": "2024-01-15T14:23:46.234567Z",
  "kubernetes": {
    "pod_name": "spark-executor-def456",
    "namespace_name": "spark-team-a",
    "labels": {
      "spark-role": "executor",
      "spark-app-id": "spark-application-12345"
    }
  }
}

Option 1: Query Iceberg Tables

Use the built-in data verification script provided in the blueprint. It automatically sets up a Python virtual environment with all required dependencies and runs directly from your environment.

# Make script executable
chmod +x verify-iceberg-data.sh

Replace the S3_BUCKET and AWS_REGION variables in the verify-iceberg-data.sh shell script before running it.

./verify-iceberg-data.sh

The script automatically...

  • ✅ Creates an isolated Python virtual environment
  • ✅ Installs PyIceberg and all dependencies (pyiceberg[glue,s3fs]==0.7.0)
  • ✅ Connects to AWS Glue catalog and Iceberg tables
  • ✅ Performs comprehensive data validation
  • ✅ Cleans up temporary files and environment after completion
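If you would rather poke at the table from a Python shell than run the wrapper script, the PyIceberg calls it builds on look roughly like this (a sketch; the database and table names match the Glue resources created in Step 1):

from pyiceberg.catalog import load_catalog

# Credentials and region are picked up from the standard AWS environment.
catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("raydata_spark_logs.spark_logs")

# The structured schema written by the Ray job.
print(table.schema())

# Pull a filtered sample into pandas as a quick sanity check.
sample = table.scan(
    row_filter="log_level == 'ERROR'",
    selected_fields=("timestamp", "component", "message"),
).to_pandas()
print(sample.head(20))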

📋 Sample Script Output


Option 2: Use AWS CLI

Check table metadata without querying data:

# View Iceberg table in Glue catalog
aws glue get-table \
--database-name raydata_spark_logs \
--name spark_logs \
--query 'Table.StorageDescriptor.Location'

🧹 Cleanup

To clean up resources:

# Remove Ray job only (preserve infrastructure)
./execute-rayjob.sh cleanup

# Remove all infrastructure
cd analytics/terraform/spark-k8s-operator
terraform destroy -var="enable_raydata=true"

🌟 Scale Your Data Pipeline

  • Scale Processing: Adjust Ray worker counts in rayjob.yaml for larger workloads
  • Add Analytics: Create dashboards using Amazon QuickSight or Grafana
  • Automate: Schedule regular processing with Kubernetes CronJobs
  • Extend: Process other data types like metrics, events, or application data

This blueprint demonstrates how Ray Data and Apache Iceberg can work together to build scalable, reliable data processing pipelines on Amazon EKS. The combination provides a modern data lake architecture with distributed processing capabilities, ACID transactions, and intelligent metadata management.