Coverage for cli / nodepools.py: 96%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2NodePool management utilities for GCO CLI. 

3 

4Provides functionality to create and manage Karpenter NodePools with 

5support for On-Demand Capacity Reservations (ODCRs) and Capacity Blocks. 

6 

7Key Features: 

8- Generate NodePool manifests for ODCR-backed capacity 

9- List and describe NodePools in EKS clusters 

10- Support for fallback to on-demand when ODCR is exhausted 

11 

12See: https://karpenter.sh/docs/tasks/odcrs/ 

13""" 

14 

15import base64 

16import logging 

17from dataclasses import dataclass 

18from typing import Any 

19 

20import boto3 

21import yaml 

22from kubernetes.client import CustomObjectsApi 

23 

24logger = logging.getLogger(__name__) 

25 

26# Default vCPU count when instance type lookup fails (conservative estimate) 

27DEFAULT_VCPUS_PER_NODE = 96 

28 

29 

def get_vcpus_for_instance_type(instance_type: str, region: str = "us-east-1") -> int:
    """
    Look up the vCPU count of an EC2 instance type via the EC2 API.

    Args:
        instance_type: EC2 instance type (e.g., "p4d.24xlarge")
        region: AWS region for API calls

    Returns:
        Number of vCPUs for the instance type, or DEFAULT_VCPUS_PER_NODE if lookup fails
    """
    try:
        client = boto3.client("ec2", region_name=region)
        described = client.describe_instance_types(InstanceTypes=[instance_type])
        matches = described["InstanceTypes"]
        if matches:
            return int(matches[0]["VCpuInfo"]["DefaultVCpus"])
    except Exception as exc:
        # Best-effort lookup: any API/auth/network failure falls through
        # to the conservative default below.
        logger.debug("Failed to get vCPU count for %s: %s", instance_type, exc)

    return DEFAULT_VCPUS_PER_NODE

50 

51 

def calculate_cpu_limit(
    instance_types: list[str] | None, max_nodes: int, region: str = "us-east-1"
) -> int:
    """
    Calculate the CPU limit for a NodePool based on instance types.

    If multiple instance types are specified, uses the maximum vCPU count
    to ensure the limit can accommodate the largest instances.

    Args:
        instance_types: List of instance types (None means any)
        max_nodes: Maximum number of nodes in the pool
        region: AWS region for API calls

    Returns:
        Total CPU limit (max_nodes * max_vcpus_per_instance)
    """
    if not instance_types:
        # No specific instance types - use conservative default
        return max_nodes * DEFAULT_VCPUS_PER_NODE

    # Size the limit for the largest instance type in the pool.
    largest = max(
        (get_vcpus_for_instance_type(it, region) for it in instance_types),
        default=DEFAULT_VCPUS_PER_NODE,
    )
    return max_nodes * largest

78 

79 

@dataclass
class NodePoolInfo:
    """Information about a Karpenter NodePool.

    Plain data holder for NodePool listing/describe output; not every
    call site populates every field.
    """

    # NodePool metadata.name
    name: str
    capacity_type: str  # "on-demand", "spot", "reserved"
    # Allowed EC2 instance types (empty list = any)
    instance_types: list[str]
    # Maximum node count, or None when unbounded
    max_nodes: int | None
    # Readiness summary (e.g. "Ready"/"NotReady")
    status: str
    # Current number of nodes provisioned by this pool
    node_count: int
    # EC2 Capacity Reservation ID (cr-xxx) when ODCR-backed
    capacity_reservation_id: str | None = None

91 

92 

def generate_odcr_nodepool_manifest(
    name: str,
    region: str,
    capacity_reservation_id: str,
    instance_types: list[str] | None = None,
    max_nodes: int = 100,
    fallback_on_demand: bool = False,
    efa: bool = False,
) -> str:
    """
    Generate a Karpenter NodePool manifest for ODCR-backed capacity.

    Produces two YAML documents (an EC2NodeClass pinning the capacity
    reservation, and a NodePool referencing it) preceded by a commented
    header, joined with `---` document separators.

    Args:
        name: Name for the NodePool
        region: AWS region
        capacity_reservation_id: EC2 Capacity Reservation ID (cr-xxx) or ODCR group ARN
        instance_types: List of instance types (if None, uses ODCR's instance type)
        max_nodes: Maximum number of nodes
        fallback_on_demand: Whether to fall back to on-demand if ODCR exhausted
        efa: Whether to enable EFA support (adds EFA taint and labels)

    Returns:
        YAML manifest string for the NodePool and EC2NodeClass
    """
    # "reserved" listed first so ODCR capacity is preferred over on-demand.
    capacity_types = ["reserved", "on-demand"] if fallback_on_demand else ["reserved"]

    ec2_node_class = _build_ec2_node_class(name, region, capacity_reservation_id)
    requirements = _build_requirements(capacity_types, instance_types)
    nodepool = _build_nodepool(
        name, region, capacity_reservation_id, requirements, instance_types, max_nodes
    )
    _apply_gpu_efa_tweaks(nodepool, instance_types, efa)

    # Render the commented header followed by both YAML documents.
    output = [
        "# ODCR-backed NodePool for GCO",
        f"# Capacity Reservation: {capacity_reservation_id}",
        f"# Region: {region}",
    ]
    if fallback_on_demand:
        output.append("# Fallback: on-demand (when ODCR exhausted)")
    output += [
        "#",
        "# Apply with: kubectl apply -f <this-file>.yaml",
        "# See: https://karpenter.sh/docs/tasks/odcrs/",
        "---",
        yaml.dump(ec2_node_class, default_flow_style=False, sort_keys=False),
        "---",
        yaml.dump(nodepool, default_flow_style=False, sort_keys=False),
    ]
    return "\n".join(output)


def _build_ec2_node_class(
    name: str, region: str, capacity_reservation_id: str
) -> dict[str, Any]:
    """Build the EC2NodeClass that selects the capacity reservation."""
    return {
        "apiVersion": "karpenter.k8s.aws/v1",
        "kind": "EC2NodeClass",
        "metadata": {
            "name": f"{name}-nodeclass",
            "labels": {
                "app.kubernetes.io/part-of": "gco",
                "gco.io/nodepool": name,
            },
        },
        "spec": {
            "role": "KarpenterNodeRole-gco",
            "subnetSelectorTerms": [{"tags": {"karpenter.sh/discovery": f"gco-{region}"}}],
            "securityGroupSelectorTerms": [{"tags": {"karpenter.sh/discovery": f"gco-{region}"}}],
            "capacityReservationSelectorTerms": [{"id": capacity_reservation_id}],
            "tags": {
                "Name": f"gco-{name}",
                "gco.io/nodepool": name,
                "gco.io/capacity-reservation": capacity_reservation_id,
            },
        },
    }


def _build_requirements(
    capacity_types: list[str], instance_types: list[str] | None
) -> list[dict[str, Any]]:
    """Build the scheduling requirements list for the NodePool template."""
    requirements: list[dict[str, Any]] = [
        {
            "key": "karpenter.sh/capacity-type",
            "operator": "In",
            "values": capacity_types,
        },
        {
            "key": "kubernetes.io/arch",
            "operator": "In",
            "values": ["amd64"],
        },
    ]
    # Pin instance types only when the caller specified them; otherwise the
    # ODCR itself constrains the instance type.
    if instance_types:
        requirements.append(
            {
                "key": "node.kubernetes.io/instance-type",
                "operator": "In",
                "values": instance_types,
            }
        )
    return requirements


def _build_nodepool(
    name: str,
    region: str,
    capacity_reservation_id: str,
    requirements: list[dict[str, Any]],
    instance_types: list[str] | None,
    max_nodes: int,
) -> dict[str, Any]:
    """Build the NodePool resource referencing the generated EC2NodeClass."""
    return {
        "apiVersion": "karpenter.sh/v1",
        "kind": "NodePool",
        "metadata": {
            "name": name,
            "labels": {
                "app.kubernetes.io/part-of": "gco",
            },
        },
        "spec": {
            "template": {
                "metadata": {
                    "labels": {
                        "workload-type": "reserved-capacity",
                        "project": "gco",
                        # NOTE(review): an ODCR *group ARN* contains ':' and
                        # '/' and is not a valid Kubernetes label value —
                        # confirm callers only pass cr-xxx IDs here.
                        "gco.io/capacity-reservation": capacity_reservation_id,
                    },
                },
                "spec": {
                    "nodeClassRef": {
                        "group": "karpenter.k8s.aws",
                        "kind": "EC2NodeClass",
                        "name": f"{name}-nodeclass",
                    },
                    "requirements": requirements,
                },
            },
            "limits": {
                # Karpenter limits are resource totals, so cap total vCPUs
                # sized for max_nodes of the largest instance type.
                "cpu": str(calculate_cpu_limit(instance_types, max_nodes, region)),
            },
            "disruption": {
                "consolidationPolicy": "WhenEmptyOrUnderutilized",
                "consolidateAfter": "30s",
                "budgets": [{"nodes": "10%"}],
            },
        },
    }


def _apply_gpu_efa_tweaks(
    nodepool: dict[str, Any], instance_types: list[str] | None, efa: bool
) -> None:
    """Mutate *nodepool* in place: GPU taints, EFA labels, EFA disruption policy."""
    # Add GPU taints for GPU instance families.
    gpu_families = ["p3", "p4", "p5", "g4", "g5", "g6"]
    if instance_types and any(
        any(it.startswith(fam) for fam in gpu_families) for it in instance_types
    ):
        taints = [
            {
                "key": "nvidia.com/gpu",
                "value": "true",
                "effect": "NoSchedule",
            }
        ]
        if efa:
            taints.append(
                {
                    "key": "vpc.amazonaws.com/efa",
                    "value": "true",
                    "effect": "NoSchedule",
                }
            )
        nodepool["spec"]["template"]["spec"]["taints"] = taints

    # EFA labels and a gentler consolidation policy apply regardless of the
    # GPU-taint branch above.
    if efa:
        labels = nodepool["spec"]["template"]["metadata"]["labels"]
        labels["efa"] = "true"
        labels["workload-type"] = "gpu-efa"
        # Use WhenEmpty consolidation for EFA workloads to avoid disrupting
        # long-running distributed training jobs.
        nodepool["spec"]["disruption"] = {
            "consolidationPolicy": "WhenEmpty",
            "consolidateAfter": "300s",
            "budgets": [{"nodes": "10%"}],
        }

257 

258 

def get_eks_token(cluster_name: str, region: str) -> str:
    """Generate EKS authentication token using STS presigned URL.

    Presigns an STS ``GetCallerIdentity`` request bound to the cluster via
    the ``x-k8s-aws-id`` header and base64url-encodes the resulting URL
    into a bearer token.

    Args:
        cluster_name: EKS cluster name the token is scoped to.
        region: AWS region of the STS endpoint used for signing.

    Returns:
        Token of the form ``k8s-aws-v1.<base64url>``; the embedded
        presigned URL expires in 60 seconds.
    """
    # Imported lazily so module import does not pull botocore internals.
    from botocore.signers import RequestSigner

    session = boto3.Session()
    sts_client = session.client("sts", region_name=region)
    service_id = sts_client.meta.service_model.service_id

    signer = RequestSigner(
        service_id, region, "sts", "v4", session.get_credentials(), session.events
    )

    # The x-k8s-aws-id header ties the signature to this cluster; it must
    # be part of the signed request.
    params = {
        "method": "GET",
        "url": f"https://sts.{region}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15",
        "body": {},
        "headers": {"x-k8s-aws-id": cluster_name},
        "context": {},
    }

    # operation_name is empty because we sign a raw request dict, not a
    # modeled API operation.
    url = signer.generate_presigned_url(
        params, region_name=region, expires_in=60, operation_name=""
    )

    # Strip base64 padding: the token format uses unpadded base64url.
    token_b64 = base64.urlsafe_b64encode(url.encode("utf-8")).decode("utf-8").rstrip("=")
    return f"k8s-aws-v1.{token_b64}"

285 

286 

def get_k8s_client(cluster_name: str, region: str) -> CustomObjectsApi:
    """Get configured Kubernetes client for EKS cluster.

    Args:
        cluster_name: EKS cluster name.
        region: AWS region of the cluster.

    Returns:
        A ``CustomObjectsApi`` client authenticated with a fresh EKS token.
    """
    from kubernetes import client

    eks = boto3.client("eks", region_name=region)
    cluster_info = eks.describe_cluster(name=cluster_name)
    cluster = cluster_info["cluster"]

    configuration = client.Configuration()
    configuration.host = cluster["endpoint"]
    configuration.verify_ssl = True

    # Decode CA certificate and write it to a secure temp file; the
    # kubernetes client needs a file path for the CA bundle.
    ca_cert = base64.b64decode(cluster["certificateAuthority"]["data"])
    import os
    import tempfile

    fd, ca_cert_path = tempfile.mkstemp(suffix=".crt")
    try:
        # os.fdopen takes ownership of fd; the context manager closes it.
        # (Previously the except branch called os.close(fd) again, which
        # double-closed the descriptor and masked the original error.)
        with os.fdopen(fd, "wb") as ca_file:
            ca_file.write(ca_cert)
    except Exception:
        # Remove the partially written cert file; fd is already closed.
        os.unlink(ca_cert_path)
        raise
    configuration.ssl_ca_cert = ca_cert_path
    # NOTE: the temp cert file intentionally outlives this function — the
    # client reads it from disk when establishing TLS connections.

    # Generate EKS token
    eks_token = get_eks_token(cluster_name, region)
    configuration.api_key = {"authorization": f"Bearer {eks_token}"}

    # Create API client with the configuration explicitly
    api_client = client.ApiClient(configuration)
    return client.CustomObjectsApi(api_client)

321 

322 

def list_cluster_nodepools(cluster_name: str, region: str) -> list[dict[str, Any]]:
    """
    List NodePools in an EKS cluster.

    Args:
        cluster_name: EKS cluster name
        region: AWS region

    Returns:
        List of NodePool information dictionaries

    Raises:
        RuntimeError: If the NodePools cannot be retrieved or parsed.
    """
    try:
        api = get_k8s_client(cluster_name, region)

        response = api.list_cluster_custom_object(
            group="karpenter.sh",
            version="v1",
            plural="nodepools",
        )

        summaries: list[dict[str, Any]] = []
        for item in response.get("items", []):
            spec = item.get("spec", {})
            node_spec = spec.get("template", {}).get("spec", {})

            # Pull capacity-type and instance-type requirements, if present.
            capacity_values: list[str] = []
            instance_values: list[str] = []
            for requirement in node_spec.get("requirements", []):
                key = requirement.get("key")
                if key == "karpenter.sh/capacity-type":
                    capacity_values = requirement.get("values", [])
                elif key == "node.kubernetes.io/instance-type":
                    instance_values = requirement.get("values", [])

            # Readiness comes from the "Ready" status condition.
            conditions = item.get("status", {}).get("conditions", [])
            ready_condition: dict[str, Any] = next(
                (c for c in conditions if c.get("type") == "Ready"), {}
            )

            # Show at most three instance types, with an ellipsis for more.
            shown_types = ", ".join(instance_values[:3])
            if len(instance_values) > 3:
                shown_types += "..."

            summaries.append(
                {
                    "name": item["metadata"]["name"],
                    "capacity_types": ", ".join(capacity_values) or "on-demand",
                    "instance_types": shown_types or "any",
                    "status": "Ready" if ready_condition.get("status") == "True" else "NotReady",
                    "limits": spec.get("limits", {}),
                }
            )

        return summaries

    except Exception as e:
        raise RuntimeError(f"Failed to list NodePools: {e}") from e

381 

382 

def describe_cluster_nodepool(
    cluster_name: str, region: str, nodepool_name: str
) -> dict[str, Any] | None:
    """
    Describe a specific NodePool in an EKS cluster.

    Args:
        cluster_name: EKS cluster name
        region: AWS region
        nodepool_name: Name of the NodePool

    Returns:
        NodePool details or None if not found

    Raises:
        RuntimeError: On any API failure other than "not found".
    """
    try:
        custom_api = get_k8s_client(cluster_name, region)

        nodepool = custom_api.get_cluster_custom_object(
            group="karpenter.sh",
            version="v1",
            plural="nodepools",
            name=nodepool_name,
        )

        # Custom-object calls return a dict; guard against anything else.
        if isinstance(nodepool, dict):
            return nodepool
        return None

    except Exception as e:
        # kubernetes.client.ApiException carries the HTTP status in its
        # `status` attribute; prefer that over the substring check, which
        # can false-positive on any message merely containing "404".
        if getattr(e, "status", None) == 404 or "404" in str(e):
            return None
        raise RuntimeError(f"Failed to describe NodePool: {e}") from e