Coverage for cli/nodepools.py: 96%
126 statements
coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2NodePool management utilities for GCO CLI.
4Provides functionality to create and manage Karpenter NodePools with
5support for On-Demand Capacity Reservations (ODCRs) and Capacity Blocks.
7Key Features:
8- Generate NodePool manifests for ODCR-backed capacity
9- List and describe NodePools in EKS clusters
10- Support for fallback to on-demand when ODCR is exhausted
12See: https://karpenter.sh/docs/tasks/odcrs/
13"""
15import base64
16import logging
17from dataclasses import dataclass
18from typing import Any
20import boto3
21import yaml
22from kubernetes.client import CustomObjectsApi
24logger = logging.getLogger(__name__)
26# Default vCPU count when instance type lookup fails (conservative estimate)
27DEFAULT_VCPUS_PER_NODE = 96
def get_vcpus_for_instance_type(instance_type: str, region: str = "us-east-1") -> int:
    """
    Get the vCPU count for an instance type from the EC2 API.

    Args:
        instance_type: EC2 instance type (e.g., "p4d.24xlarge")
        region: AWS region for API calls

    Returns:
        Number of vCPUs for the instance type, or DEFAULT_VCPUS_PER_NODE if lookup fails
    """
    try:
        ec2 = boto3.client("ec2", region_name=region)
        response = ec2.describe_instance_types(InstanceTypes=[instance_type])
        if response["InstanceTypes"]:
            return int(response["InstanceTypes"][0]["VCpuInfo"]["DefaultVCpus"])
    except Exception as e:
        logger.debug("Failed to get vCPU count for %s: %s", instance_type, e)

    return DEFAULT_VCPUS_PER_NODE
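
# Illustrative lookup (a sketch, assuming AWS credentials with the
# ec2:DescribeInstanceTypes permission; 96 matches AWS's published spec
# for p4d.24xlarge):
#
#     get_vcpus_for_instance_type("p4d.24xlarge", region="us-east-1")  # -> 96
#     get_vcpus_for_instance_type("not-a-type")  # -> 96 (falls back to DEFAULT_VCPUS_PER_NODE)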
def calculate_cpu_limit(
    instance_types: list[str] | None, max_nodes: int, region: str = "us-east-1"
) -> int:
    """
    Calculate the CPU limit for a NodePool based on instance types.

    If multiple instance types are specified, uses the maximum vCPU count
    to ensure the limit can accommodate the largest instances.

    Args:
        instance_types: List of instance types (None means any)
        max_nodes: Maximum number of nodes in the pool
        region: AWS region for API calls

    Returns:
        Total CPU limit (max_nodes * max_vcpus_per_instance)
    """
    if not instance_types:
        # No specific instance types - use conservative default
        return max_nodes * DEFAULT_VCPUS_PER_NODE

    # Get vCPU count for each instance type and use the maximum
    vcpu_counts = [get_vcpus_for_instance_type(it, region) for it in instance_types]
    max_vcpus = max(vcpu_counts) if vcpu_counts else DEFAULT_VCPUS_PER_NODE

    return max_nodes * max_vcpus
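
# Worked example (illustrative; 192 vCPUs is AWS's published figure for
# p5.48xlarge and 96 for p4d.24xlarge, so the maximum, 192, is used):
#
#     calculate_cpu_limit(["p4d.24xlarge", "p5.48xlarge"], max_nodes=4)  # 4 * 192 = 768
#     calculate_cpu_limit(None, max_nodes=4)  # 4 * 96 = 384 (conservative default)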
@dataclass
class NodePoolInfo:
    """Information about a Karpenter NodePool."""

    name: str
    capacity_type: str  # "on-demand", "spot", "reserved"
    instance_types: list[str]
    max_nodes: int | None
    status: str
    node_count: int
    capacity_reservation_id: str | None = None
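
# Example record with hypothetical values, e.g. for an ODCR-backed pool:
#
#     NodePoolInfo(
#         name="training",
#         capacity_type="reserved",
#         instance_types=["p5.48xlarge"],
#         max_nodes=4,
#         status="Ready",
#         node_count=2,
#         capacity_reservation_id="cr-0123456789abcdef0",
#     )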
def generate_odcr_nodepool_manifest(
    name: str,
    region: str,
    capacity_reservation_id: str,
    instance_types: list[str] | None = None,
    max_nodes: int = 100,
    fallback_on_demand: bool = False,
    efa: bool = False,
) -> str:
    """
    Generate a Karpenter NodePool manifest for ODCR-backed capacity.

    Args:
        name: Name for the NodePool
        region: AWS region
        capacity_reservation_id: EC2 Capacity Reservation ID (cr-xxx) or ODCR group ARN
        instance_types: List of instance types (if None, uses ODCR's instance type)
        max_nodes: Maximum number of nodes
        fallback_on_demand: Whether to fall back to on-demand if ODCR is exhausted
        efa: Whether to enable EFA support (adds EFA taint and labels)

    Returns:
        YAML manifest string for the NodePool and EC2NodeClass
    """
    # Determine capacity types based on fallback setting
    capacity_types = ["reserved", "on-demand"] if fallback_on_demand else ["reserved"]

    # Build the EC2NodeClass with capacity reservation selector
    ec2_node_class = {
        "apiVersion": "karpenter.k8s.aws/v1",
        "kind": "EC2NodeClass",
        "metadata": {
            "name": f"{name}-nodeclass",
            "labels": {
                "app.kubernetes.io/part-of": "gco",
                "gco.io/nodepool": name,
            },
        },
        "spec": {
            "role": "KarpenterNodeRole-gco",
            "subnetSelectorTerms": [{"tags": {"karpenter.sh/discovery": f"gco-{region}"}}],
            "securityGroupSelectorTerms": [{"tags": {"karpenter.sh/discovery": f"gco-{region}"}}],
            "capacityReservationSelectorTerms": [{"id": capacity_reservation_id}],
            "tags": {
                "Name": f"gco-{name}",
                "gco.io/nodepool": name,
                "gco.io/capacity-reservation": capacity_reservation_id,
            },
        },
    }

    # Build requirements list
    requirements: list[dict[str, Any]] = [
        {
            "key": "karpenter.sh/capacity-type",
            "operator": "In",
            "values": capacity_types,
        },
        {
            "key": "kubernetes.io/arch",
            "operator": "In",
            "values": ["amd64"],
        },
    ]

    # Build the NodePool
    nodepool: dict[str, Any] = {
        "apiVersion": "karpenter.sh/v1",
        "kind": "NodePool",
        "metadata": {
            "name": name,
            "labels": {
                "app.kubernetes.io/part-of": "gco",
            },
        },
        "spec": {
            "template": {
                "metadata": {
                    "labels": {
                        "workload-type": "reserved-capacity",
                        "project": "gco",
                        "gco.io/capacity-reservation": capacity_reservation_id,
                    },
                },
                "spec": {
                    "nodeClassRef": {
                        "group": "karpenter.k8s.aws",
                        "kind": "EC2NodeClass",
                        "name": f"{name}-nodeclass",
                    },
                    "requirements": requirements,
                },
            },
            "limits": {
                "cpu": str(calculate_cpu_limit(instance_types, max_nodes, region)),
            },
            "disruption": {
                "consolidationPolicy": "WhenEmptyOrUnderutilized",
                "consolidateAfter": "30s",
                "budgets": [{"nodes": "10%"}],
            },
        },
    }

    # Add instance type requirements if specified. `requirements` is the same
    # list object referenced by the NodePool template above, so appending here
    # is reflected in the rendered manifest.
    if instance_types:
        requirements.append(
            {
                "key": "node.kubernetes.io/instance-type",
                "operator": "In",
                "values": instance_types,
            }
        )

    # Add GPU taints for GPU instances
    gpu_families = ["p3", "p4", "p5", "g4", "g5", "g6"]
    if instance_types and any(
        any(it.startswith(fam) for fam in gpu_families) for it in instance_types
    ):
        taints = [
            {
                "key": "nvidia.com/gpu",
                "value": "true",
                "effect": "NoSchedule",
            }
        ]
        if efa:
            taints.append(
                {
                    "key": "vpc.amazonaws.com/efa",
                    "value": "true",
                    "effect": "NoSchedule",
                }
            )
        nodepool["spec"]["template"]["spec"]["taints"] = taints

    # Add EFA labels
    if efa:
        nodepool["spec"]["template"]["metadata"]["labels"]["efa"] = "true"
        nodepool["spec"]["template"]["metadata"]["labels"]["workload-type"] = "gpu-efa"
        # Use WhenEmpty consolidation for EFA workloads to avoid disrupting
        # long-running distributed training jobs
        nodepool["spec"]["disruption"] = {
            "consolidationPolicy": "WhenEmpty",
            "consolidateAfter": "300s",
            "budgets": [{"nodes": "10%"}],
        }

    # Generate YAML output
    output = []
    output.append("# ODCR-backed NodePool for GCO")
    output.append(f"# Capacity Reservation: {capacity_reservation_id}")
    output.append(f"# Region: {region}")
    if fallback_on_demand:
        output.append("# Fallback: on-demand (when ODCR exhausted)")
    output.append("#")
    output.append("# Apply with: kubectl apply -f <this-file>.yaml")
    output.append("# See: https://karpenter.sh/docs/tasks/odcrs/")
    output.append("---")
    output.append(yaml.dump(ec2_node_class, default_flow_style=False, sort_keys=False))
    output.append("---")
    output.append(yaml.dump(nodepool, default_flow_style=False, sort_keys=False))

    return "\n".join(output)
def get_eks_token(cluster_name: str, region: str) -> str:
    """Generate an EKS authentication token using an STS presigned URL."""
    from botocore.signers import RequestSigner

    session = boto3.Session()
    sts_client = session.client("sts", region_name=region)
    service_id = sts_client.meta.service_model.service_id

    signer = RequestSigner(
        service_id, region, "sts", "v4", session.get_credentials(), session.events
    )

    params = {
        "method": "GET",
        "url": f"https://sts.{region}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15",
        "body": {},
        "headers": {"x-k8s-aws-id": cluster_name},
        "context": {},
    }

    url = signer.generate_presigned_url(
        params, region_name=region, expires_in=60, operation_name=""
    )

    token_b64 = base64.urlsafe_b64encode(url.encode("utf-8")).decode("utf-8").rstrip("=")
    return f"k8s-aws-v1.{token_b64}"
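
# Sketch of checking the token format (illustrative cluster name): the bearer
# token is "k8s-aws-v1." plus the base64url-encoded presigned STS URL, the
# same scheme produced by `aws eks get-token --cluster-name <name>`:
#
#     token = get_eks_token("my-cluster", "us-east-1")
#     assert token.startswith("k8s-aws-v1.")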
def get_k8s_client(cluster_name: str, region: str) -> CustomObjectsApi:
    """Get a configured Kubernetes client for an EKS cluster."""
    from kubernetes import client

    eks = boto3.client("eks", region_name=region)
    cluster_info = eks.describe_cluster(name=cluster_name)
    cluster = cluster_info["cluster"]

    configuration = client.Configuration()
    configuration.host = cluster["endpoint"]
    configuration.verify_ssl = True

    # Decode the cluster CA certificate and write it to a secure tempfile
    ca_cert = base64.b64decode(cluster["certificateAuthority"]["data"])
    import os
    import tempfile
    fd, ca_cert_path = tempfile.mkstemp(suffix=".crt")
    try:
        # os.fdopen takes ownership of fd and closes it when the with-block
        # exits, so no separate os.close is needed
        with os.fdopen(fd, "wb") as ca_file:
            ca_file.write(ca_cert)
        configuration.ssl_ca_cert = ca_cert_path
    except Exception:
        # fd was closed by the context manager; just remove the temp file
        os.unlink(ca_cert_path)
        raise
    # Generate EKS token
    eks_token = get_eks_token(cluster_name, region)
    configuration.api_key = {"authorization": f"Bearer {eks_token}"}

    # Create the API client with this configuration explicitly
    api_client = client.ApiClient(configuration)
    return client.CustomObjectsApi(api_client)
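
# Example usage (illustrative cluster name; assumes the caller's AWS identity
# is mapped to a Kubernetes RBAC identity in the cluster):
#
#     api = get_k8s_client("my-cluster", "us-east-1")
#     pools = api.list_cluster_custom_object(
#         group="karpenter.sh", version="v1", plural="nodepools"
#     )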
def list_cluster_nodepools(cluster_name: str, region: str) -> list[dict[str, Any]]:
    """
    List NodePools in an EKS cluster.

    Args:
        cluster_name: EKS cluster name
        region: AWS region

    Returns:
        List of NodePool information dictionaries
    """
    try:
        custom_api = get_k8s_client(cluster_name, region)

        nodepools = custom_api.list_cluster_custom_object(
            group="karpenter.sh",
            version="v1",
            plural="nodepools",
        )

        result = []
        for np in nodepools.get("items", []):
            spec = np.get("spec", {})
            template = spec.get("template", {}).get("spec", {})
            requirements = template.get("requirements", [])

            # Extract capacity types and instance types from the requirements
            capacity_types = []
            instance_types = []
            for req in requirements:
                if req.get("key") == "karpenter.sh/capacity-type":
                    capacity_types = req.get("values", [])
                elif req.get("key") == "node.kubernetes.io/instance-type":
                    instance_types = req.get("values", [])

            # Get readiness status
            status = np.get("status", {})
            conditions = status.get("conditions", [])
            ready_condition: dict[str, Any] = next(
                (c for c in conditions if c.get("type") == "Ready"), {}
            )

            # Show at most three instance types, with an ellipsis if truncated
            instance_summary = ", ".join(instance_types[:3])
            if len(instance_types) > 3:
                instance_summary += "..."

            result.append(
                {
                    "name": np["metadata"]["name"],
                    "capacity_types": ", ".join(capacity_types) or "on-demand",
                    "instance_types": instance_summary or "any",
                    "status": "Ready" if ready_condition.get("status") == "True" else "NotReady",
                    "limits": spec.get("limits", {}),
                }
            )

        return result

    except Exception as e:
        raise RuntimeError(f"Failed to list NodePools: {e}") from e
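
# Example usage (illustrative):
#
#     for pool in list_cluster_nodepools("my-cluster", "us-east-1"):
#         print(f"{pool['name']}: {pool['status']} ({pool['capacity_types']})")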
def describe_cluster_nodepool(
    cluster_name: str, region: str, nodepool_name: str
) -> dict[str, Any] | None:
    """
    Describe a specific NodePool in an EKS cluster.

    Args:
        cluster_name: EKS cluster name
        region: AWS region
        nodepool_name: Name of the NodePool

    Returns:
        NodePool details, or None if not found
    """
    try:
        custom_api = get_k8s_client(cluster_name, region)

        nodepool = custom_api.get_cluster_custom_object(
            group="karpenter.sh",
            version="v1",
            plural="nodepools",
            name=nodepool_name,
        )

        if isinstance(nodepool, dict):
            return nodepool
        return None

    except Exception as e:
        if "404" in str(e):
            return None
        raise RuntimeError(f"Failed to describe NodePool: {e}") from e
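
# Example usage (illustrative; NodePool name is hypothetical):
#
#     np = describe_cluster_nodepool("my-cluster", "us-east-1", "training")
#     if np is None:
#         print("NodePool not found")
#     else:
#         print(yaml.dump(np, default_flow_style=False))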