Deploying Trino on EKS

Introduction

Trino is an open-source, fast, distributed query engine designed to run SQL queries for big data analytics over a multitude of data sources, including Amazon S3, relational databases, distributed data stores, and data warehouses.

When Trino executes a query, it breaks the execution into a hierarchy of stages, which are implemented as a series of tasks distributed over a network of Trino workers. A Trino cluster consists of a coordinator and many workers for parallel processing, which can be deployed as Kubernetes pods on an EKS cluster. The coordinator and the workers collaborate to access connected data sources, with schemas and references stored in a catalog. To access a data source, you use one of the many connectors provided by Trino, such as Hive, Iceberg, and Kafka. More details about the Trino project can be found in the official Trino documentation.

Blueprint Solution

This blueprint deploys Trino on an EKS cluster (Kubernetes version 1.29) with nodes provisioned by Karpenter (v0.34.0). To optimize cost and performance, Karpenter provisions On-Demand nodes for the Trino coordinator and EC2 Spot instances for the Trino workers. Because Trino publishes multi-architecture container images, the Karpenter NodePool can provision nodes with EC2 instances of different CPU architectures, including AWS Graviton-based instances. Trino is deployed using the official Helm chart, with custom values provided so users can leverage the Hive and Iceberg connectors. The examples use AWS Glue and Iceberg tables as the backend data source, with S3 as the storage.
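Once the cluster is deployed (steps below), you can review the custom values the blueprint applied to the Helm release; a quick check, assuming you have Helm installed locally and that the release is named trino, as the pod labels later in this guide suggest:

# Show the user-supplied values of the Trino Helm release (connector and coordinator/worker settings)
helm get values trino -n trino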

Deploying the Solution

Let's go through the deployment steps.

Prerequisites

Ensure that you have installed the following tools on your machine.

  1. aws cli
  2. kubectl
  3. terraform
  4. Trino CLI client
To install the Trino CLI:
wget https://repo1.maven.org/maven2/io/trino/trino-cli/427/trino-cli-427-executable.jar
mv trino-cli-427-executable.jar trino
chmod +x trino
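
To confirm the CLI works, you can print its version:

./trino --version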

Deploy the EKS Cluster with Trino

First, clone the repository

git clone https://github.com/awslabs/data-on-eks.git

Navigate into distributed-databases/trino and run the install.sh script. When prompted, enter the AWS region where you want to provision the resources (for example, us-west-2).

cd data-on-eks/distributed-databases/trino

./install.sh

Verify Deployment

Verify the Amazon EKS Cluster

#Select your own region where you deployed resources
aws eks describe-cluster --name trino-on-eks --region us-west-2
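
If you only want to confirm that the cluster is up, you can narrow the output to the cluster status:

aws eks describe-cluster --name trino-on-eks --region us-west-2 --query 'cluster.status' --output text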

Update your local kubeconfig so you can access the Kubernetes cluster (you can also get this command from the Terraform output configure_kubectl):

aws eks update-kubeconfig --name trino-on-eks --region us-west-2

First, let's verify that Karpenter has provisioned worker nodes in the cluster, along with their availability zone and capacity type (on-demand or spot):

kubectl get nodes --selector=karpenter.sh/nodepool=trino-karpenter -L topology.kubernetes.io/zone -L karpenter.sh/capacity-type -L node.kubernetes.io/instance-type

Output

NAME                                        STATUS   ROLES    AGE   VERSION               ZONE         CAPACITY-TYPE   INSTANCE-TYPE
ip-10-1-11-131.us-west-2.compute.internal   Ready    <none>   23m   v1.29.0-eks-5e0fdde   us-west-2b   spot            is4gen.2xlarge
ip-10-1-11-49.us-west-2.compute.internal    Ready    <none>   24m   v1.29.0-eks-5e0fdde   us-west-2b   on-demand       t4g.medium

We can see above that Karpenter provisioned an on-demand node for running the Trino coordinator and a spot node for running the Trino workers, both in the same availability zone.

info

For a distributed big data query engine like Trino, which runs as a massively parallel processing cluster, it is recommended to deploy the cluster in a single availability zone to avoid incurring high inter-AZ data transfer costs. That's why the Karpenter NodePool has been configured to launch EKS nodes in the same AZ.
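
You can check these constraints yourself by inspecting the NodePool; a minimal check, assuming the NodePool keeps the trino-karpenter name used in the node selector above:

# Inspect the NodePool's requirements (capacity type, CPU architecture, availability zone)
kubectl get nodepools.karpenter.sh trino-karpenter -o yaml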

Now, let's verify the coordinator and worker pods running in the trino namespace:

kubectl get pods --namespace=trino

Output

NAME                                 READY   STATUS    RESTARTS   AGE
trino-coordinator-5cfd685c8f-mchff   1/1     Running   0          37m
trino-worker-6c896d96d6-kr895        1/1     Running   0          37m
trino-worker-6c896d96d6-llp9j        1/1     Running   0          37m
trino-worker-6c896d96d6-zpglv        1/1     Running   0          37m

Next, we will retrieve the DNS name of the Application Load Balancer fronting the Trino service from the Ingress object:

export TRINO_UI_DNS=$(kubectl describe ingress --namespace=trino | grep Address: | awk '{ print "http://"$2 }')
echo $TRINO_UI_DNS

Output

http://k8s-trino-trinocoo-f64c9587b5-1488329710.us-west-2.elb.amazonaws.com
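
Equivalently, if you prefer jsonpath over grep/awk, the following should return the same hostname:

kubectl get ingress -n trino -o jsonpath='{.items[0].status.loadBalancer.ingress[0].hostname}'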

Now, let's access the Trino UI by pasting the above Load Balancer DNS name into a web browser and logging in with the username admin, as shown below:

Trino UI Login

We can see the Trino Web UI showing 3 active workers:

Trino UI

Using Trino for query execution

Example #1: Using the Hive Connector

In this example, we will set up a Hive metastore using AWS Glue, with the source data stored in S3 and a crawler that infers the schema from it to build a Glue table.

Using Trino on EKS with the Hive connector backed by Glue, we will use the Trino CLI to run sample SQL queries to retrieve the data.

Setup

Run the hive-setup.sh script from the examples directory to set up the blueprint S3 bucket with the 2022 NYC Taxi dataset (in Parquet format) and build the Glue metastore:

cd examples/
./hive-setup.sh

You will see output showing progress and, if successful, the name of the Glue table that stores the metadata: hive.
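
If you want to confirm the Glue resources outside of Trino, you can check them with the AWS CLI; a quick check, assuming the script created the taxi_hive_database database shown later in this example (adjust the region if needed):

# List the tables registered in the Glue database created by the setup script
aws glue get-tables --database-name taxi_hive_database --region us-west-2 --query 'TableList[].Name'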

Running the queries

You should have the Trino CLI installed as part of the prerequisites. The blueprint has the Hive connector configured with the bucket we set up in the previous section, so you should be able to query the data source without additional settings.

First, port-forward your trino coordinator pod to access it locally:

COORDINATOR_POD=$(kubectl get pods -l "app=trino,release=trino,component=coordinator" -o name -n trino)
kubectl port-forward $COORDINATOR_POD -n trino 8080:8080

While the port-forward is running, open another terminal tab where you have the Trino CLI and run the following command to access the coordinator:

./trino http://127.0.0.1:8080 --user admin

Once connected, you will get a prompt to execute commands. You can use the help command to see a list of supported commands.

For example:

To show the list of catalogs, run SHOW CATALOGS; and you will see the hive and iceberg catalogs, among others configured by the blueprint:

Output

 Catalog
---------
hive
iceberg
system
tpcds
tpch
(5 rows)

Query 20240215_200117_00003_6jdxw, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.49 [0 rows, 0B] [0 rows/s, 0B/s]

To see the schemas (databases) in the hive catalog, run SHOW SCHEMAS FROM hive;:

Output

  Schema
--------------------
information_schema
taxi_hive_database
(2 rows)

Let's use taxi_hive_database and show the tables in this database:

USE hive.taxi_hive_database;
SHOW TABLES;

Output

Table
-------
hive
(1 row)

Finally, run a simple query to list a few rows - SELECT * FROM hive LIMIT 5;

Output

vendorid |  tpep_pickup_datetime   |  tpep_dropoff_datetime  | passenger_count | trip_distance | ratecodeid | store_and_fwd_flag | pulocationid | dolocation>
----------+-------------------------+-------------------------+-----------------+---------------+------------+--------------------+--------------+----------->
1 | 2022-09-01 00:28:12.000 | 2022-09-01 00:36:22.000 | 1.0 | 2.1 | 1.0 | N | 100 | 2>
1 | 2022-11-01 00:24:49.000 | 2022-11-01 00:31:04.000 | 2.0 | 1.0 | 1.0 | N | 158 | 1>
1 | 2022-11-01 00:37:32.000 | 2022-11-01 00:42:23.000 | 2.0 | 0.8 | 1.0 | N | 249 | 1>
2 | 2022-09-01 00:02:24.000 | 2022-09-01 00:09:39.000 | 1.0 | 1.32 | 1.0 | N | 238 | 1>
2 | 2022-09-01 00:47:25.000 | 2022-09-01 00:56:09.000 | 1.0 | 2.94 | 1.0 | N |
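
As a slightly richer example, you can run an ad-hoc aggregate over the same table from another shell (while the port-forward is still active) using the CLI's --execute flag; the columns referenced below are the ones shown in the output above:

./trino http://127.0.0.1:8080 --user admin --execute "SELECT vendorid, count(*) AS trips, round(avg(trip_distance), 2) AS avg_distance FROM hive.taxi_hive_database.hive GROUP BY vendorid ORDER BY trips DESC"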

Cleaning Up Hive Resources

  1. Exit from the Trino CLI with the exit command.

  2. Run the cleanup script from the examples directory to delete all the resources created by the hive setup script:

cd data-on-eks/distributed-databases/trino/examples
./hive-cleanup.sh

Example #2: Using the Iceberg Connector

In this example, we will set up Apache Iceberg with AWS Glue as the catalog type, and store the data in Amazon S3 in PARQUET format.

Using Trino on EKS with the Iceberg connector, we will use the Trino CLI to create the above resources and run sample SQL queries to insert and retrieve data.

Running the queries

  • Let's find out the S3 data bucket created by the blueprint. We will use this bucket to store the data for the Iceberg tables in PARQUET format.
cd data-on-eks/distributed-databases/trino
export BUCKET=$(terraform output --state="./terraform.tfstate" --raw data_bucket)
echo $BUCKET

Output

trino-data-bucket-20240215180855515400000001
  • Let's now create an Iceberg schema with tables populated with data from the sf10000 schema tables of TPCDS, using CREATE TABLE AS SELECT (CTAS) statements. Open a new terminal where you have the Trino CLI installed and create a SQL file named trino_sf10000_tpcds_to_iceberg.sql by copying the SQL statements below (don't forget to replace the S3 bucket name in LOCATION with your bucket from the above command):
use tpcds.sf10000;
select * from tpcds.sf10000.item limit 10;
select * from tpcds.sf10000.warehouse limit 10;

/* Drop tables & schema */

drop schema iceberg.iceberg_schema;
drop table iceberg.iceberg_schema.warehouse;
drop table iceberg.iceberg_schema.item;
drop table iceberg.iceberg_schema.inventory;
drop table iceberg.iceberg_schema.date_dim;

/* Iceberg schema creation */

create schema if not exists iceberg.iceberg_schema
with (LOCATION = 's3://trino-data-bucket-20240215180855515400000001/iceberg/');

/* Iceberg Table Creation with CTAS from tpcds tables */

create table if not exists iceberg.iceberg_schema.inventory
with (FORMAT = 'PARQUET')
as select *
from tpcds.sf10000.inventory;

create table if not exists iceberg.iceberg_schema.date_dim
with (FORMAT = 'PARQUET')
as select d_date_sk,
cast(d_date_id as varchar(16)) as d_date_id,
d_date,
d_month_seq,
d_week_seq,
d_quarter_seq,
d_year,
d_dow,
d_moy,
d_dom,
d_qoy,
d_fy_year,
d_fy_quarter_seq,
d_fy_week_seq,
cast(d_day_name as varchar(9)) as d_day_name,
cast(d_quarter_name as varchar(6)) as d_quarter_name,
cast(d_holiday as varchar(1)) as d_holiday,
cast(d_weekend as varchar(1)) as d_weekend,
cast(d_following_holiday as varchar(1)) as d_following_holiday,
d_first_dom,
d_last_dom,
d_same_day_ly,
d_same_day_lq,
cast(d_current_day as varchar(1)) as d_current_day,
cast(d_current_week as varchar(1)) as d_current_week,
cast(d_current_month as varchar(1)) as d_current_month,
cast(d_current_quarter as varchar(1)) as d_current_quarter
from tpcds.sf10000.date_dim;

create table if not exists iceberg.iceberg_schema.warehouse
with (FORMAT = 'PARQUET')
as select
w_warehouse_sk,
cast(w_warehouse_id as varchar(16)) as w_warehouse_id,
w_warehouse_name,
w_warehouse_sq_ft,
cast(w_street_number as varchar(10)) as w_street_number,
w_street_name,
cast(w_street_type as varchar(15)) as w_street_type,
cast(w_suite_number as varchar(10)) as w_suite_number,
w_city,
w_county,
cast(w_state as varchar(2)) as w_state,
cast(w_zip as varchar(10)) as w_zip,
w_country,
w_gmt_offset
from tpcds.sf10000.warehouse;

create table if not exists iceberg.iceberg_schema.item
with (FORMAT = 'PARQUET')
as select
i_item_sk,
cast(i_item_id as varchar(16)) as i_item_id,
i_rec_start_date,
i_rec_end_date,
i_item_desc,
i_current_price,
i_wholesale_cost,
i_brand_id,
cast(i_brand as varchar(50)) as i_brand,
i_class_id,
cast(i_class as varchar(50)) as i_class,
i_category_id,
cast(i_category as varchar(50)) as i_category,
i_manufact_id,
cast(i_manufact as varchar(50)) as i_manufact,
cast(i_size as varchar(50)) as i_size,
cast(i_formulation as varchar(20)) as i_formulation,
cast(i_color as varchar(20)) as i_color,
cast(i_units as varchar(10)) as i_units,
cast(i_container as varchar(10)) as i_container,
i_manager_id,
cast(i_product_name as varchar(50)) as i_product_name
from tpcds.sf10000.item;


/* Select from Iceberg table */

select * from iceberg.iceberg_schema.date_dim limit 10;
select * from iceberg.iceberg_schema.item limit 10;
select * from iceberg.iceberg_schema.inventory limit 10;

/* Running query from Iceberg table */

with inv as
(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stdev,mean, case mean when 0 then null else stdev/mean end cov
from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean
from iceberg.iceberg_schema.inventory
,iceberg.iceberg_schema.item
,iceberg.iceberg_schema.warehouse
,iceberg.iceberg_schema.date_dim
where inv_item_sk = i_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_year =1999
group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo
where case mean when 0 then 0 else stdev/mean end > 1)
select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov
,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov
from inv inv1,inv inv2
where inv1.i_item_sk = inv2.i_item_sk
and inv1.w_warehouse_sk = inv2.w_warehouse_sk
and inv1.d_moy=4
and inv2.d_moy=4+1
and inv1.cov > 1.5
order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov,inv2.d_moy,inv2.mean, inv2.cov;
  • The above SQL commands will perform the following actions:

    • Create an Iceberg schema named iceberg_schema
    • Create 4 Iceberg tables - warehouse, item, inventory, and date_dim - populated with data from the corresponding tpcds tables
    • Query data from above Iceberg tables
  • Let's now execute the above SQL commands using the Trino CLI:

export TRINO_UI_DNS=$(kubectl describe ingress --namespace=trino | grep Address: | awk '{ print "http://"$2 }')
./trino --file 'trino_sf10000_tpcds_to_iceberg.sql' --server ${TRINO_UI_DNS} --user admin --ignore-errors
  • You can see completed and running SQL queries in the Trino UI web monitor, as shown below:

Trino Queries

  • Let's open another terminal and see how the Horizontal Pod Autoscaler (HPA) scales the Trino worker pods while the above SQL commands are running:
kubectl get hpa -n trino -w

Output

NAME           REFERENCE                 TARGETS    MINPODS   MAXPODS   REPLICAS   AGE
trino-worker   Deployment/trino-worker   0%/70%     3         20        3          3d2h
trino-worker   Deployment/trino-worker   0%/70%     3         20        3          3d2h
trino-worker   Deployment/trino-worker   170%/70%   3         20        3          3d2h
trino-worker   Deployment/trino-worker   196%/70%   3         20        6          3d2h
trino-worker   Deployment/trino-worker   197%/70%   3         20        9          3d2h
trino-worker   Deployment/trino-worker   197%/70%   3         20        9          3d2h
trino-worker   Deployment/trino-worker   197%/70%   3         20        9          3d2h
trino-worker   Deployment/trino-worker   125%/70%   3         20        9          3d2h
trino-worker   Deployment/trino-worker   43%/70%    3         20        9          3d2h
trino-worker   Deployment/trino-worker   152%/70%   3         20        9          3d2h
trino-worker   Deployment/trino-worker   179%/70%   3         20        9          3d2h

You can see the HPA scaling from the initial 3 workers to 9 workers in the Trino UI as the query load and the average CPU utilization of the workers increase:

Trino Scaling
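
You can also watch the worker pods themselves being added; the label selector below mirrors the coordinator selector used earlier and assumes the chart labels the workers with component=worker:

kubectl get pods -n trino -l "app=trino,release=trino,component=worker" -w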

Example #3 (Optional): Fault-tolerant execution in Trino

Fault-tolerant execution is an opt-in mechanism in Trino that was implemented as part of Project Tardigrade. Without a fault-tolerant configuration, a Trino query fails whenever any of its component tasks fails for any reason (for example, a worker node failure or termination). These failed queries have to be restarted from scratch, resulting in longer execution times, wasted compute, and extra spend, especially for long-running queries.

When fault-tolerant execution is configured in Trino with a retry policy, intermediate exchange data is spooled to external storage such as Amazon S3 or HDFS using an exchange manager. Trino then retries the failed query (if the retry policy is configured as "QUERY") or the failed tasks (if the retry policy is configured as "TASK"). Trino's remaining workers reuse the exchange manager data to retry and complete the query in the event of a worker outage or other fault during query execution.

info

A QUERY retry policy instructs Trino to retry the whole query when an error occurs on a worker node. This policy is recommended when the majority of the Trino cluster's workload consists of many small queries.

A TASK retry policy instructs Trino to retry individual tasks in the event of failure. This policy is recommended when Trino executes large batch queries, since the cluster can more efficiently retry smaller tasks within the query rather than retrying the whole query.

  • This blueprint deploys the Trino cluster with fault-tolerant execution configured with a TASK retry policy in the config.properties file of the coordinator and worker pods. Let's verify that by opening a bash shell inside the coordinator pod:
COORDINATOR_POD=$(kubectl get pods -l "app=trino,release=trino,component=coordinator" -o name -n trino)
kubectl exec --stdin --tty $COORDINATOR_POD -n trino -- /bin/bash
cat /etc/trino/config.properties

Output

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
query.max-memory=40GB
query.max-memory-per-node=4GB
discovery.uri=http://localhost:8080
retry-policy=TASK
exchange.compression-enabled=true
query.low-memory-killer.delay=0s
query.remote-task.max-error-duration=1m
query.hash-partition-count=50
  • The blueprint has also configured the exchange manager to use an Amazon S3 bucket in the exchange-manager.properties file of the coordinator and worker pods. Let's verify that inside the coordinator pod as well:
cat /etc/trino/exchange-manager.properties

Output

exchange-manager.name=filesystem
exchange.base-directories=s3://trino-exchange-bucket-20240215180855570800000004
exchange.s3.region=us-west-2
exchange.s3.iam-role=arn:aws:iam::xxxxxxxxxx:role/trino-sa-role

Note down the exchange manager S3 bucket name from the output above. You can explore the contents of this S3 bucket in the AWS Console; it will be empty when no query is running.
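
If you prefer the CLI to the console, you can list the spooled objects directly while a query is running (replace the bucket name with the one from your exchange-manager.properties output):

aws s3 ls s3://trino-exchange-bucket-20240215180855570800000004/ --recursive --human-readable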

  • Now, let's exit the bash shell of the coordinator pod:
exit

With the steps below, we will now test fault-tolerant execution by running a SELECT query and terminating a few Trino workers while the query is still running.

  • Let's create a trino_select_query_iceberg.sql file in the location where the Trino CLI is installed and paste the SQL commands below into it:
with inv as
(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stdev,mean, case mean when 0 then null else stdev/mean end cov
from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean
from iceberg.iceberg_schema.inventory
,iceberg.iceberg_schema.item
,iceberg.iceberg_schema.warehouse
,iceberg.iceberg_schema.date_dim
where inv_item_sk = i_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_year =1999
group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo
where case mean when 0 then 0 else stdev/mean end > 1)
select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov
,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov
from inv inv1,inv inv2
where inv1.i_item_sk = inv2.i_item_sk
and inv1.w_warehouse_sk = inv2.w_warehouse_sk
and inv1.d_moy=4
and inv2.d_moy=4+1
and inv1.cov > 1.5
order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov,inv2.d_moy,inv2.mean, inv2.cov;
  • Let's now run the SELECT query first:
export TRINO_UI_DNS=$(kubectl describe ingress --namespace=trino | grep Address: | awk '{ print "http://"$2 }')
./trino --file 'trino_select_query_iceberg.sql' --server ${TRINO_UI_DNS} --user admin --ignore-errors
  • Immediately after the above command, while the query is still running, open another terminal and scale the worker pods down to just 1 worker, terminating all the other workers with the command below:
kubectl scale deployment trino-worker -n trino --replicas=1
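
You can watch the other worker pods terminating with:

kubectl get pods -n trino -w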

Check the Trino Web UI in your browser; there is now only 1 active worker running, as the other workers have been terminated:

Terminated workers

  • Go to the Amazon S3 console and verify that intermediate exchange data is being spooled to the exchange manager S3 bucket (its name starts with trino-exchange-bucket).

Exchange manager

  • Let's now check the Trino Web UI monitor again to verify that the query completed despite the 6 failed tasks caused by the terminated workers (circled in red in the screenshot below).
info

Please note, the number of failed tasks could be different in your Trino Web UI, depending on how many tasks were running on the workers that were terminated.

You may also see a different number of active workers, depending on how many worker pods the Horizontal Pod Autoscaler (HPA) has scaled out based on the CPU utilization metric.

Trino query completion

Cleaning Up Iceberg Resources

  1. Let's open the Trino CLI:
export TRINO_UI_DNS=$(kubectl describe ingress --namespace=trino | grep Address: | awk '{ print "http://"$2 }')
./trino --server ${TRINO_UI_DNS} --user admin
  2. Now, let's delete the Iceberg tables and schema by running the SQL commands below in the Trino CLI:
drop table iceberg.iceberg_schema.warehouse;
drop table iceberg.iceberg_schema.item;
drop table iceberg.iceberg_schema.inventory;
drop table iceberg.iceberg_schema.date_dim;
drop schema iceberg.iceberg_schema;
  3. Exit from the Trino CLI with the exit command.

Cleanup 🧹

To delete all the components provisioned as part of this blueprint, use the following command to destroy all the resources:

cd data-on-eks/distributed-databases/trino
./cleanup.sh
caution

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment, e.g. the S3 buckets used by the Trino exchange manager.
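
If any blueprint buckets are left behind after the cleanup script finishes, you can find and remove them with the AWS CLI; a rough sketch that assumes the bucket names keep the trino- prefixes seen earlier in this guide (replace <bucket-name> before running):

# List buckets created for the blueprint (data and exchange buckets)
aws s3api list-buckets --query "Buckets[?starts_with(Name, 'trino-')].Name" --output text
# Empty and then delete a leftover bucket
aws s3 rm s3://<bucket-name> --recursive
aws s3 rb s3://<bucket-name>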

Conclusion

Trino is a tool for quickly querying vast amounts of data from your data sources. In this post, we shared a Terraform-based blueprint that deploys Trino with a fault-tolerant configuration on Amazon EKS, along with the add-ons necessary to build a complete EKS cluster (namely Karpenter for node autoscaling, the Metrics Server and HPA for Trino worker pod autoscaling, and monitoring with the Prometheus/Grafana stack). Among many features, we highlighted a couple of examples of creating an Iceberg or Hive data store using Amazon S3 as storage and running simple Trino queries against it. We also deployed and scaled Trino workers on Spot instances for cost optimization, and demonstrated Trino's fault-tolerant execution feature, which makes Spot instances a good fit for saving costs on long-running batch queries.