Sample Workload — Hits Dataset
This walkthrough deploys a sharded, replicated ClickHouseCluster (managed
by the operator that deploy.sh already installed), builds a distributed
table on top of it, loads the public hits dataset, runs analytical queries
with EXPLAIN to inspect index usage, and demonstrates replica failover by
deleting a pod.
Prerequisites
- ClickHouse on EKS infrastructure deployed: You must have completed the
Infrastructure Setup Guide. That sets up the EKS cluster,
Karpenter, ArgoCD, and the ClickHouse Kubernetes operator, but no
ClickHouseCluster yet. This guide deploys one.
- Local tools:
  - kubectl configured against the EKS cluster (the deploy script writes kubeconfig.yaml into the stack directory; run export KUBECONFIG=$(pwd)/kubeconfig.yaml).
  - aws-cli with credentials configured.
Architecture
The example manifest at
data-stacks/clickhouse-on-eks/examples/clickhouse-cluster.yaml defines two
custom resources that the operator reconciles into running pods: a
ClickHouseCluster (3 shards × 3 replicas) and a KeeperCluster (3-node
Raft ensemble). Together they form the data plane this walkthrough exercises.
[Diagram: ClickHouseCluster, 3 shards × 3 replicas, m6g.8xlarge · 30 CPU · 110Gi RAM · 500Gi storage; KeeperCluster, 3 nodes, Raft consensus]
- ClickHouse Operator: Already installed by deploy.sh. Watches ClickHouseCluster and KeeperCluster custom resources and reconciles them into pods, services, and PersistentVolumeClaims.
- ClickHouseCluster: A sharded and replicated deployment (3 shards × 3 replicas) running on Graviton m6g.8xlarge nodes with 500Gi EBS volumes per replica. Karpenter provisions the underlying EC2 instances on demand.
- KeeperCluster: A 3-node ClickHouse Keeper ensemble providing Raft-based coordination for replication, distributed DDL, and leader election.
Deploy the ClickHouse cluster
These steps apply the example manifest and bring up the data plane shown
above. They produce 3 Keeper pods plus 9 ClickHouse pods named
tests-clickhouse-<shard>-<replica>-0.
1. Create the namespace
The example manifest places everything in the clickhouse-tests namespace.
Create it first:
kubectl create namespace clickhouse-tests
2. Create the password secret
The ClickHouseCluster resource references a Kubernetes secret for the
default user's password (see defaultUserPassword.secret in the cluster
manifest). Create it before applying the cluster manifest so the operator
can bootstrap the user when the pods come up.
kubectl create secret generic clickhouse-password \
--namespace clickhouse-tests \
--from-literal=password="$(LC_ALL=C tr -dc 'A-Za-z0-9' < /dev/urandom | head -c 32)"
The tr and /dev/urandom pipeline generates a 32-character alphanumeric
password; LC_ALL=C makes tr treat its input as raw bytes, so it does not
choke on multi-byte locales.
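As a quick sanity check before creating the secret, the same pipeline can be captured in a variable and inspected. This is only a sketch: gen_password and pw are illustrative names, not part of the stack.

```shell
# Same tr and /dev/urandom pipeline as above, wrapped in a hypothetical helper.
gen_password() {
  LC_ALL=C tr -dc 'A-Za-z0-9' < /dev/urandom | head -c 32
}

pw="$(gen_password)"

# Report the length and flag any character outside A-Za-z0-9.
case "$pw" in
  *[!A-Za-z0-9]*) echo "unexpected character in password" >&2 ;;
  *)              echo "ok: ${#pw} alphanumeric characters" ;;
esac
```

If the check passes, the value could be handed to the secret with --from-literal=password="$pw" instead of regenerating it inline.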
3. Apply the cluster manifest
export CLICKHOUSE_DIR="$(git rev-parse --show-toplevel)/data-stacks/clickhouse-on-eks"
kubectl apply -f "$CLICKHOUSE_DIR/examples/clickhouse-cluster.yaml"
The operator reads the new KeeperCluster and ClickHouseCluster resources
and starts reconciling them. Because the ClickHouseCluster podTemplate
sets a nodeSelector for m6g.8xlarge on-demand instances, Karpenter
provisions matching EC2 nodes from scratch the first time you apply this.
4. Wait for the pods to be Ready
kubectl get pods -n clickhouse-tests -w
Expected steady state — 3 Keeper pods and 9 ClickHouse pods, all Running /
Ready:
NAME READY STATUS RESTARTS AGE
test-keeper-0 1/1 Running 0 5m
test-keeper-1 1/1 Running 0 5m
test-keeper-2 1/1 Running 0 5m
tests-clickhouse-0-0-0 1/1 Running 0 4m
tests-clickhouse-0-1-0 1/1 Running 0 4m
tests-clickhouse-0-2-0 1/1 Running 0 4m
tests-clickhouse-1-0-0 1/1 Running 0 4m
tests-clickhouse-1-1-0 1/1 Running 0 4m
tests-clickhouse-1-2-0 1/1 Running 0 4m
tests-clickhouse-2-0-0 1/1 Running 0 4m
tests-clickhouse-2-1-0 1/1 Running 0 4m
tests-clickhouse-2-2-0 1/1 Running 0 4m
First-time pod scheduling can take ~5–10 minutes while Karpenter provisions
fresh m6g.8xlarge nodes and EBS volumes. Subsequent restarts are much
faster.
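The pod naming scheme above can be enumerated for any shards × replicas layout, which is handy when checking that nothing is missing. A minimal sketch, assuming the tests-clickhouse-<shard>-<replica>-0 pattern shown in the listing; expected_pods is a made-up helper name.

```shell
# Print the ClickHouse pod names expected for a given layout.
# Usage: expected_pods <shards> <replicas>
expected_pods() {
  shards="$1"
  replicas="$2"
  s=0
  while [ "$s" -lt "$shards" ]; do
    r=0
    while [ "$r" -lt "$replicas" ]; do
      echo "tests-clickhouse-$s-$r-0"
      r=$((r + 1))
    done
    s=$((s + 1))
  done
}

# The walkthrough's layout: 3 shards x 3 replicas = 9 pods.
expected_pods 3 3
```

The list could then be compared by eye or with diff against the output of kubectl get pods -n clickhouse-tests -o name, which prints each name with a pod/ prefix.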
Tutorial: Build a sharded, replicated table
This section creates a database, a replicated local table, and a distributed
"router" table on top of the 3 × 3 ClickHouse cluster, then verifies the layout
end-to-end. It uses the public hits.parquet dataset from
clickhouse.com/datasets for the schema.
1. Open a SQL session
Pull the password back out of the secret and exec into any ClickHouse pod to
get an interactive clickhouse-client session. Every pod in the cluster sees
the same logical database, so shard/replica 0-0-0 is fine.
CH_PW=$(kubectl get secret -n clickhouse-tests clickhouse-password \
-o jsonpath='{.data.password}' | base64 -d)
kubectl exec -it -n clickhouse-tests tests-clickhouse-0-0-0 -- \
clickhouse-client --password "$CH_PW"
Run the SQL in the following steps from this session.
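Kubernetes stores Secret values base64-encoded, which is why the command above pipes the jsonpath output through base64 -d. The round trip looks roughly like this with a placeholder value (not the real password):

```shell
# 'example-password' is a placeholder used only for illustration.
encoded="$(printf '%s' 'example-password' | base64)"
decoded="$(printf '%s' "$encoded" | base64 -d)"

echo "stored in the Secret: $encoded"
echo "after base64 -d:      $decoded"
```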
2. Create the database on every node
CREATE DATABASE IF NOT EXISTS demo ON CLUSTER 'default';
ON CLUSTER 'default' turns this into a distributed DDL: the coordinator
writes the statement to ClickHouse Keeper, and every node in the default
cluster picks it up and executes it locally. That's why one CREATE DATABASE
materializes the database on all nine pods.
3. Create the replicated local table
CREATE TABLE demo.hits_local ON CLUSTER 'default'
ENGINE = ReplicatedMergeTree
ORDER BY (CounterID, EventDate, UserID, EventTime, WatchID)
EMPTY AS SELECT * FROM s3(
'https://datasets.clickhouse.com/hits_compatible/hits.parquet',
'Parquet'
);
A few things are happening here:
- ReplicatedMergeTree is the actual storage engine. Every replica inside a shard keeps an identical copy of the data, with replication coordinated through Keeper.
- ORDER BY (...) is both the sort key and the primary key: it defines the on-disk layout and the sparse index ClickHouse uses to skip reading irrelevant granules.
- EMPTY AS SELECT * FROM s3(...) is a schema-cloning trick. The s3() table function reads only the Parquet metadata to infer column names and types, and EMPTY AS builds a table with that schema while loading zero rows. It's a fast way to copy a schema without the data.
Because of ON CLUSTER, the table is created on all 9 replicas (3 shards × 3
replicas).
4. Create the distributed router table
CREATE TABLE demo.hits ON CLUSTER 'default' AS demo.hits_local
ENGINE = Distributed('default', demo, hits_local, cityHash64(UserID));
demo.hits is a virtual table that stores no data of its own. It's a
proxy/router whose Distributed engine knows:
- which cluster to talk to ('default'),
- which database and underlying table on each shard (demo.hits_local),
- and how to shard on writes (cityHash64(UserID)).
Reads against demo.hits fan out to every shard, execute against the local
hits_local table, and the coordinator merges the per-shard results. Writes
against demo.hits are routed to a single shard based on the hash of the
UserID, which is how rows get spread evenly across shards. (Writes against
demo.hits_local directly bypass this routing: they land on the shard of
whichever pod you're connected to and replicate only to that shard's other
replicas.)
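The routing rule for writes boils down to "hash the sharding key, take the remainder by the number of shards". The sketch below illustrates that idea only: it uses POSIX cksum (CRC-32) as a stand-in because cityHash64 is not available in the shell, so the shard numbers it prints will not match what ClickHouse picks. It also assumes all shards have equal weight. route_shard is a hypothetical helper.

```shell
# Map a sharding-key value to a shard index in [0, shards).
# NOTE: cksum is a stand-in hash, NOT cityHash64.
route_shard() {
  user_id="$1"
  shards="$2"
  h="$(printf '%s' "$user_id" | cksum | cut -d ' ' -f 1)"
  echo $((h % shards))
}

# The same UserID always maps to the same shard; different IDs spread out.
for id in 1001 1002 1003 1004 1005 1006; do
  echo "UserID $id -> shard $(route_shard "$id" 3)"
done
```

Because the mapping is deterministic, all rows for one UserID end up on the same shard, which keeps per-user aggregations local to a shard.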
5. Verify the layout
SELECT hostName(), name, engine
FROM clusterAllReplicas('default', system.tables)
WHERE database = 'demo'
ORDER BY hostName(), name;
┌─hostName()─────────────┬─name───────┬─engine──────────────┐
1. │ tests-clickhouse-0-0-0 │ hits │ Distributed │
2. │ tests-clickhouse-0-0-0 │ hits_local │ ReplicatedMergeTree │
3. │ tests-clickhouse-0-1-0 │ hits │ Distributed │
4. │ tests-clickhouse-0-1-0 │ hits_local │ ReplicatedMergeTree │
5. │ tests-clickhouse-0-2-0 │ hits │ Distributed │
6. │ tests-clickhouse-0-2-0 │ hits_local │ ReplicatedMergeTree │
7. │ tests-clickhouse-1-0-0 │ hits │ Distributed │
8. │ tests-clickhouse-1-0-0 │ hits_local │ ReplicatedMergeTree │
9. │ tests-clickhouse-1-1-0 │ hits │ Distributed │
10. │ tests-clickhouse-1-1-0 │ hits_local │ ReplicatedMergeTree │
11. │ tests-clickhouse-1-2-0 │ hits │ Distributed │
12. │ tests-clickhouse-1-2-0 │ hits_local │ ReplicatedMergeTree │
13. │ tests-clickhouse-2-0-0 │ hits │ Distributed │
14. │ tests-clickhouse-2-0-0 │ hits_local │ ReplicatedMergeTree │
15. │ tests-clickhouse-2-1-0 │ hits │ Distributed │
16. │ tests-clickhouse-2-1-0 │ hits_local │ ReplicatedMergeTree │
17. │ tests-clickhouse-2-2-0 │ hits │ Distributed │
18. │ tests-clickhouse-2-2-0 │ hits_local │ ReplicatedMergeTree │
└────────────────────────┴────────────┴─────────────────────┘
clusterAllReplicas('default', system.tables) runs the query on every
replica in the cluster (not just one per shard the way cluster() would).
Each pod reports back its own copy of system.tables, so the result has
3 shards × 3 replicas × 2 tables = 18 rows. The pod naming
tests-clickhouse-<shard>-<replica>-0 is set by the operator and matches the
hostnames returned by hostName().
Recap of what each table is for:
- hits_local: ReplicatedMergeTree, holds the actual rows on each replica.
- hits: Distributed, the cluster-wide query entry point that fans out to the hits_local tables.
6. Inspect the primary key and sort key
SELECT
name,
primary_key,
sorting_key
FROM system.tables
WHERE name = 'hits_local' AND database = 'demo';
┌─name───────┬─primary_key──────────────────────────────────────┬─sorting_key──────────────────────────────────────┐
│ hits_local │ CounterID, EventDate, UserID, EventTime, WatchID │ CounterID, EventDate, UserID, EventTime, WatchID │
└────────────┴──────────────────────────────────────────────────┴──────────────────────────────────────────────────┘
In ClickHouse the primary key defaults to the sorting key (the ORDER BY),
and they're shown here side by side. Note this is not a uniqueness
constraint — it's a sparse index that lets ClickHouse skip whole granules
when a query filters on a prefix of those columns. Choosing the right
ORDER BY is the single biggest performance lever in MergeTree.
7. View the materialized table definition
SHOW CREATE TABLE demo.hits_local;
┌─statement─────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE demo.hits_local │
│( │
│ `WatchID` Int64, │
│ `JavaEnable` Int16, │
│ `Title` String, │
│ `EventTime` Int64, │
│ `EventDate` UInt16, │
│ `ClientIP` Int32, │
│ `RegionID` Int32, │
│ `UserID` Int64, │
│ `CounterClass` Int16, │
│ `OS` Int16, │
│ `UserAgent` Int16, │
│ `URL` String, │
│) │
│ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/demo/hits_local', '{replica}')│
│ORDER BY (CounterID, EventDate, UserID, EventTime, WatchID) │
│SETTINGS index_granularity = 8192 │
└───────────────────────────────────────────────────────────────────────────────────────┘
A few things to notice:
- The columns and types are exactly what s3() inferred from the Parquet file; no schema was authored by hand.
- ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/demo/hits_local', '{replica}') uses the {shard} and {replica} macros. The operator wires these per pod (e.g. shard 0, replica 1 on tests-clickhouse-0-1-0), so all replicas of the same shard share a Keeper path and find each other for replication, while different shards stay isolated.
- index_granularity = 8192 is the default granule size for the sparse index: ClickHouse stores one index entry per 8,192 rows.
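To make the macro substitution concrete, the sketch below derives the Keeper path and replica name from a pod name. It assumes the macro values are simply the shard and replica indices embedded in the pod name, as in the example above; the operator's actual macro values may be formatted differently. keeper_path_for_pod is a hypothetical helper.

```shell
# Expand '/clickhouse/tables/{shard}/demo/hits_local' and '{replica}'
# for a pod named tests-clickhouse-<shard>-<replica>-0.
keeper_path_for_pod() {
  pod="$1"
  rest="${pod#tests-clickhouse-}"   # e.g. "0-1-0"
  shard="${rest%%-*}"               # text before the first dash
  rest="${rest#*-}"                 # e.g. "1-0"
  replica="${rest%%-*}"
  echo "/clickhouse/tables/$shard/demo/hits_local replica=$replica"
}

keeper_path_for_pod tests-clickhouse-0-0-0
keeper_path_for_pod tests-clickhouse-0-1-0   # same path, different replica
keeper_path_for_pod tests-clickhouse-2-1-0   # different shard, different path
```

Pods in the same shard resolve to the same path and differ only in the replica name, which is what lets them find each other in Keeper.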
At this point you have an empty, sharded, replicated table, ready to load data
into via INSERT INTO demo.hits SELECT * FROM s3(...) or the parallel
load.sh script in data-stacks/clickhouse-on-eks/examples/.
Query performance: choosing the right sort key
The remaining exercises assume the demo.hits table has been populated — for
example, by running data-stacks/clickhouse-on-eks/examples/load.sh, which
ships data from a local Parquet file in parallel into each shard. Once it's
loaded, the next two sections show how the sort key you picked in
Step 3 drives query performance.
Recall the sort key:
ORDER BY (CounterID, EventDate, UserID, EventTime, WatchID)
Data on disk is physically sorted in that order, and ClickHouse builds a
sparse primary-key index with one entry per 8,192-row granule (the
index_granularity setting). Filters that line up with this layout get to
skip granules instead of scanning them.
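A back-of-the-envelope way to see how small the sparse index is: divide the row count by the granule size and round up. This is a rough sketch with a hypothetical helper name; real granule counts differ somewhat because each part is rounded separately and adaptive granularity can produce smaller granules.

```shell
# Approximate number of index entries (granules) for a row count.
# Usage: granules_for_rows <rows> [index_granularity]
granules_for_rows() {
  rows="$1"
  granularity="${2:-8192}"
  # Integer ceiling division.
  echo $(( (rows + granularity - 1) / granularity ))
}

granules_for_rows 8192        # prints 1
granules_for_rows 8193        # prints 2
granules_for_rows 100000000   # prints 12208
```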
Pick interesting filter values
Find the most popular EventDate and CounterID so you have realistic values
to filter on:
SELECT EventDate, count() AS hits
FROM demo.hits
GROUP BY EventDate
ORDER BY hits DESC
LIMIT 1;
SELECT CounterID, count() AS hits
FROM demo.hits
GROUP BY CounterID
ORDER BY hits DESC
LIMIT 1;
For the dataset used here, CounterID = 3922 and EventDate = 15907 are good
picks.
Fast: filter on the leading column of the sort key
CounterID is the leftmost column of ORDER BY, so the sparse index can
do a binary search directly on it.
EXPLAIN indexes = 1
SELECT
count(),
uniq(UserID) AS unique_users
FROM demo.hits
WHERE CounterID = '3922';
┌─explain────────────────────────────────────────────────────────────────────┐
1. │ Expression ((Project names + Projection)) │
2. │ MergingAggregated │
3. │ Union │
4. │ Aggregating │
5. │ Expression (Before GROUP BY) │
6. │ Expression ((WHERE + Change column names to column identifiers)) │
7. │ ReadFromMergeTree (demo.hits_local) │
8. │ Indexes: │
9. │ PrimaryKey │
10. │ Keys: │
11. │ CounterID │
12. │ Condition: (CounterID in [3922, 3922]) │
13. │ Parts: 8/8 │
14. │ Granules: 354/4109 │
15. │ Search Algorithm: binary search │
16. │ Ranges: 8 │
17. │ ReadFromRemote (Read from remote replica) │
└────────────────────────────────────────────────────────────────────────────┘
Two things to notice in the plan:
- Search Algorithm: binary search. The engine can pinpoint the granules holding CounterID = 3922 directly in the sparse index.
- Granules: 354/4109. Only ~9% of the table's granules need to be read. The other ~91% are skipped entirely.
Slow: filter on a non-leading column
EventDate is the second column in the sort key. Within each CounterID,
rows are sorted by EventDate, but across the table as a whole, EventDate
values are interleaved between many CounterID ranges.
EXPLAIN indexes = 1
SELECT
count(),
uniq(UserID) AS unique_users
FROM demo.hits
WHERE EventDate = '15907';
┌─explain────────────────────────────────────────────────────────────────┐
1. │ Expression ((Project names + Projection)) │
2. │ MergingAggregated │
3. │ Union │
4. │ AggregatingProjection │
5. │ Expression (Before GROUP BY) │
6. │ Filter ((WHERE + Change column names to column identifiers)) │
7. │ ReadFromMergeTree (demo.hits_local) │
8. │ Indexes: │
9. │ PrimaryKey │
10. │ Keys: │
11. │ EventDate │
12. │ Condition: (EventDate in [15907, 15907]) │
13. │ Parts: 8/8 │
14. │ Granules: 1542/4109 │
15. │ Search Algorithm: generic exclusion search │
16. │ Ranges: 446 │
17. │ ReadFromPreparedSource (_exact_count_projection) │
18. │ ReadFromRemote (Read from remote replica) │
└────────────────────────────────────────────────────────────────────────┘
Compare with the previous plan:
- Search Algorithm: generic exclusion search. The engine can't binary search; it walks the index marks and skips only the ranges where the mark values prove EventDate = 15907 cannot exist.
- Granules: 1542/4109. ~37% of granules read, roughly 4× more work than the CounterID filter.
Putting the row counts side by side:
CounterID = '3922': 8.68 million rows, 121.31 MB (3.87 GB/sec)
EventDate = '15907': 31.16 million rows, 345.54 MB (2.89 GB/sec)
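The percentages and ratios quoted for the two plans can be reproduced from the numbers in the EXPLAIN output and the row counts above. The helper names pct and ratio are made up for this sketch.

```shell
# Percentage of a out of b, and the plain ratio a / b, to one decimal place.
pct()   { awk -v a="$1" -v b="$2" 'BEGIN { printf "%.1f\n", 100 * a / b }'; }
ratio() { awk -v a="$1" -v b="$2" 'BEGIN { printf "%.1f\n", a / b }'; }

pct 354 4109       # granules read, CounterID filter: 8.6
pct 1542 4109      # granules read, EventDate filter: 37.5
ratio 1542 354     # granules, EventDate vs CounterID: 4.4
ratio 31.16 8.68   # million rows, EventDate vs CounterID: 3.6
```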
The data is identical — only the filter column changed. The takeaway:
The order of columns in ORDER BY is the single biggest performance lever in MergeTree. Put the columns you most commonly filter on first, in descending order of filter selectivity. Secondary filter patterns can be accelerated with projections or skip indexes; note the _exact_count_projection step that already appeared in the slow plan.
Resilience: surviving a replica failure
The cluster runs 3 replicas per shard. With ReplicatedMergeTree, every
replica holds an identical copy of the data, kept in sync via Keeper. The
Distributed table in front of them only needs one healthy replica per
shard to answer a query, so we can lose pods without downtime.
See where the data lives on each replica
-- Diagnostic: ask every replica directly. With skip_unavailable_shards = 1
-- the query still completes if a replica is down.
SELECT
hostName() AS pod,
getMacro('shard') AS shard,
getMacro('replica') AS replica,
count()
FROM clusterAllReplicas('default', demo.hits_local)
WHERE CounterID = 62
GROUP BY 1, 2, 3
ORDER BY 2, 3
SETTINGS skip_unavailable_shards = 1;
This bypasses the Distributed table and queries each hits_local replica
directly. You'll see 9 rows (3 shards × 3 replicas) — and the per-shard
counts will be identical across replicas, since replicas hold the same data.
The getMacro('shard') / getMacro('replica') functions read the per-pod
macros that the operator injects into each ClickHouse config, which is also
how ReplicatedMergeTree('/clickhouse/tables/{shard}/...', '{replica}')
resolves at startup.
Make replica selection deterministic
By default the Distributed engine load-balances across replicas. For
demonstration purposes, force it to always pick the first reachable replica so
you can see the failover happen:
SET load_balancing = 'in_order';
SELECT
hostName() AS pod,
getMacro('shard') AS shard,
getMacro('replica') AS replica,
count() AS rows
FROM demo.hits
WHERE CounterID = 62
GROUP BY pod, shard, replica
ORDER BY shard ASC;
This time the query goes through demo.hits (the Distributed table), so it
picks one replica per shard — three rows, typically
tests-clickhouse-{0,1,2}-0-0. Note the response time (usually ~200 ms).
Kill a pod and re-run
Open a second terminal and delete the replica that just served shard 2:
kubectl delete pod tests-clickhouse-2-0-0 -n clickhouse-tests
Re-run the same SELECT ... FROM demo.hits query. Two things should happen:
- The query still returns in ~200 ms: the Distributed table fails over to the next replica in order, e.g. tests-clickhouse-2-1-0. The pod column in the result confirms that shard 2 is now being served by a different replica.
- In the other terminal, the operator's StatefulSet is already recreating the deleted pod. Watch it:
kubectl get pods -n clickhouse-tests -w
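The in_order behaviour seen above amounts to "walk the replica list in its configured order and take the first one that is reachable". A toy model of that selection, with a hypothetical pick_replica helper and a hand-maintained list of down pods standing in for real connection attempts:

```shell
# Usage: pick_replica "<space-separated down pods>" <replica>...
# Prints the first replica, in the order given, that is not marked down.
pick_replica() {
  down="$1"
  shift
  for r in "$@"; do
    case " $down " in
      *" $r "*) continue ;;   # marked down, try the next replica
    esac
    echo "$r"
    return 0
  done
  return 1                    # no replica of this shard is reachable
}

# All replicas of shard 2 healthy: the first one serves the query.
pick_replica "" tests-clickhouse-2-0-0 tests-clickhouse-2-1-0 tests-clickhouse-2-2-0
# After deleting tests-clickhouse-2-0-0: the next one in order takes over.
pick_replica "tests-clickhouse-2-0-0" tests-clickhouse-2-0-0 tests-clickhouse-2-1-0 tests-clickhouse-2-2-0
```

This is why the demonstration sets load_balancing = 'in_order': with a fixed order, the change of serving replica is predictable rather than random.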
Watch the replacement replica catch up
Once the pod is back, it has to replay any writes it missed via Keeper. You
can see this catch-up in system.replicas:
SELECT
replica_name,
absolute_delay,
queue_size
FROM clusterAllReplicas('default', system.replicas)
WHERE table = 'hits_local';
- queue_size is the number of replication tasks still pending.
- absolute_delay is the replica's replication lag in seconds.
For a freshly restarted replica with no writes happening, both should drop to zero quickly. While a heavy write workload is in flight, you'd see them shrink as the replica catches up.
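If you want a yes/no answer rather than reading the table, the query result can be summarized with a small filter. The sketch assumes the rows arrive as tab-separated replica_name, absolute_delay, queue_size (for example from clickhouse-client with --format TSV); sync_status and the sample rows are made up for illustration.

```shell
# Read TSV rows (replica_name, absolute_delay, queue_size) on stdin and
# print "caught up" only if every replica reports zero for both metrics.
sync_status() {
  awk -F '\t' '
    $2 + 0 != 0 || $3 + 0 != 0 { lagging = 1 }
    END { if (lagging) print "lagging"; else print "caught up" }
  '
}

# Sample input: one replica still replaying its queue.
printf 'r0\t0\t0\nr1\t12\t3\n' | sync_status   # prints "lagging"
# Sample input: everything drained.
printf 'r0\t0\t0\nr1\t0\t0\n' | sync_status    # prints "caught up"
```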
Cleanup
This walkthrough deployed a ClickHouseCluster, a KeeperCluster, a
namespace, and a Kubernetes secret on top of the existing platform — but no
new AWS resources directly (Karpenter-provisioned nodes get torn down
automatically once the pods are gone). To remove everything this guide
created:
kubectl delete -f $CLICKHOUSE_DIR/examples/clickhouse-cluster.yaml
kubectl delete secret clickhouse-password -n clickhouse-tests
kubectl delete namespace clickhouse-tests
If you are completely finished with ClickHouse, you can destroy all the infrastructure (EKS cluster, operator, Karpenter, ArgoCD, etc.) by following the Cleanup section of the infrastructure guide.