Coverage for cli/capacity/multi_region.py: 89%

185 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Multi-region capacity checking and weighted scoring.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import statistics 

7from dataclasses import dataclass 

8from datetime import UTC, datetime, timedelta 

9from typing import Any 

10 

11import boto3 

12from botocore.exceptions import ClientError 

13 

14from cli.config import GCOConfig, get_config 

15 

16from .checker import CapacityChecker 

17 

# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)


@dataclass
class RegionCapacity:
    """
    Snapshot of capacity signals collected for one region.

    Only ``region`` is required; every metric defaults to zero, so
    ``RegionCapacity(region=...)`` doubles as an empty "no data" record.
    """

    # Region name this snapshot describes.
    region: str
    # Visible messages in the region's job queue (SQS ApproximateNumberOfMessages).
    queue_depth: int = 0
    # Not populated by get_region_capacity in this module.
    pending_jobs: int = 0
    # In-flight queue messages (SQS ApproximateNumberOfMessagesNotVisible).
    running_jobs: int = 0
    # Container Insights node_gpu_utilization average (percentage, 0-100).
    gpu_utilization: float = 0.0
    # Container Insights node_cpu_utilization average (percentage).
    cpu_utilization: float = 0.0
    # Not populated by get_region_capacity in this module.
    available_gpus: int = 0
    # Not populated by get_region_capacity in this module.
    total_gpus: int = 0
    # Not populated by get_region_capacity in this module.
    avg_wait_time_seconds: int = 0
    # Composite ranking score; lower means a better placement candidate.
    recommendation_score: float = 0.0

35 

36 

37class MultiRegionCapacityChecker: 

38 """ 

39 Checks capacity across multiple GCO regions. 

40 

41 Provides: 

42 - Multi-region capacity overview 

43 - Intelligent region recommendation 

44 - Queue depth analysis 

45 - Resource utilization metrics 

46 """ 

47 

48 def __init__(self, config: GCOConfig | None = None): 

49 self.config = config or get_config() 

50 self._session = boto3.Session() 

51 

52 def get_region_capacity(self, region: str) -> RegionCapacity: 

53 """Get capacity information for a single region.""" 

54 from cli.aws_client import get_aws_client 

55 

56 aws_client = get_aws_client(self.config) 

57 stack = aws_client.get_regional_stack(region) 

58 

59 if not stack: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 return RegionCapacity(region=region) 

61 

62 capacity = RegionCapacity(region=region) 

63 

64 # Get queue depth from SQS 

65 try: 

66 cfn = self._session.client("cloudformation", region_name=region) 

67 response = cfn.describe_stacks(StackName=stack.stack_name) 

68 outputs = { 

69 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", []) 

70 } 

71 

72 queue_url = outputs.get("JobQueueUrl") 

73 if queue_url: 73 ↛ 90line 73 didn't jump to line 90 because the condition on line 73 was always true

74 sqs = self._session.client("sqs", region_name=region) 

75 attrs = sqs.get_queue_attributes( 

76 QueueUrl=queue_url, 

77 AttributeNames=[ 

78 "ApproximateNumberOfMessages", 

79 "ApproximateNumberOfMessagesNotVisible", 

80 ], 

81 )["Attributes"] 

82 capacity.queue_depth = int(attrs.get("ApproximateNumberOfMessages", 0)) 

83 capacity.running_jobs = int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0)) 

84 except ClientError as e: 

85 logger.debug("Failed to get queue metrics for %s: %s", region, e) 

86 except Exception as e: 

87 logger.warning("Unexpected error getting queue metrics for %s: %s", region, e) 

88 

89 # Get cluster metrics from CloudWatch (if available) 

90 try: 

91 cloudwatch = self._session.client("cloudwatch", region_name=region) 

92 

93 # Get GPU utilization from Container Insights 

94 response = cloudwatch.get_metric_statistics( 

95 Namespace="ContainerInsights", 

96 MetricName="node_gpu_utilization", 

97 Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}], 

98 StartTime=datetime.now(UTC) - timedelta(minutes=5), 

99 EndTime=datetime.now(UTC), 

100 Period=300, 

101 Statistics=["Average"], 

102 ) 

103 if response["Datapoints"]: 

104 capacity.gpu_utilization = response["Datapoints"][0]["Average"] 

105 

106 # Get CPU utilization 

107 response = cloudwatch.get_metric_statistics( 

108 Namespace="ContainerInsights", 

109 MetricName="node_cpu_utilization", 

110 Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}], 

111 StartTime=datetime.now(UTC) - timedelta(minutes=5), 

112 EndTime=datetime.now(UTC), 

113 Period=300, 

114 Statistics=["Average"], 

115 ) 

116 if response["Datapoints"]: 

117 capacity.cpu_utilization = response["Datapoints"][0]["Average"] 

118 

119 except ClientError as e: 

120 logger.debug("Failed to get CloudWatch metrics for %s: %s", region, e) 

121 except Exception as e: 

122 logger.warning("Unexpected error getting CloudWatch metrics for %s: %s", region, e) 

123 

124 # Calculate recommendation score (lower is better) 

125 # Factors: queue depth, GPU utilization, running jobs 

126 capacity.recommendation_score = ( 

127 capacity.queue_depth * 10 + capacity.gpu_utilization + capacity.running_jobs * 5 

128 ) 

129 

130 return capacity 

131 

132 def get_all_regions_capacity(self) -> list[RegionCapacity]: 

133 """Get capacity information for all deployed regions.""" 

134 from cli.aws_client import get_aws_client 

135 

136 aws_client = get_aws_client(self.config) 

137 stacks = aws_client.discover_regional_stacks() 

138 

139 capacities = [] 

140 for region in stacks: 

141 try: 

142 capacity = self.get_region_capacity(region) 

143 capacities.append(capacity) 

144 except Exception as e: 

145 logger.warning("Failed to get capacity for region %s: %s", region, e) 

146 continue 

147 

148 return capacities 

149 

150 def recommend_region_for_job( 

151 self, 

152 gpu_required: bool = False, 

153 min_gpus: int = 0, 

154 instance_type: str | None = None, 

155 gpu_count: int = 0, 

156 ) -> dict[str, Any]: 

157 """ 

158 Recommend the optimal region for job placement. 

159 

160 When instance_type is provided, uses weighted multi-signal scoring that 

161 combines spot placement scores, pricing, queue depth, GPU utilization, 

162 and running job counts. Falls back to simple scoring when instance_type 

163 is not specified. 

164 

165 Args: 

166 gpu_required: Whether the job requires GPUs 

167 min_gpus: Minimum number of GPUs required 

168 instance_type: Specific instance type for workload-aware scoring 

169 gpu_count: Number of GPUs required 

170 

171 Returns: 

172 Dictionary with recommended region and justification 

173 """ 

174 capacities = self.get_all_regions_capacity() 

175 

176 if not capacities: 

177 return { 

178 "region": self.config.default_region, 

179 "reason": "No capacity data available, using default region", 

180 "score": 0, 

181 } 

182 

183 # When instance_type is provided, use weighted scoring with capacity data 

184 if instance_type: 

185 return self._weighted_recommend(capacities, instance_type, gpu_count or min_gpus) 

186 

187 # Fallback: simple scoring (existing behavior) 

188 return self._simple_recommend(capacities) 

189 

190 def _simple_recommend(self, capacities: list[RegionCapacity]) -> dict[str, Any]: 

191 """Simple recommendation using the existing composite score.""" 

192 sorted_capacities = sorted(capacities, key=lambda x: x.recommendation_score) 

193 best = sorted_capacities[0] 

194 

195 reasons = [] 

196 if best.queue_depth == 0: 

197 reasons.append("empty queue") 

198 elif best.queue_depth < 5: 198 ↛ 201line 198 didn't jump to line 201 because the condition on line 198 was always true

199 reasons.append(f"low queue depth ({best.queue_depth})") 

200 

201 if best.gpu_utilization < 50: 

202 reasons.append(f"{100 - best.gpu_utilization:.0f}% GPU available") 

203 elif best.gpu_utilization < 80: 203 ↛ 206line 203 didn't jump to line 206 because the condition on line 203 was always true

204 reasons.append(f"moderate GPU utilization ({best.gpu_utilization:.0f}%)") 

205 

206 if best.running_jobs == 0: 

207 reasons.append("no running jobs") 

208 elif best.running_jobs < 5: 208 ↛ 211line 208 didn't jump to line 211 because the condition on line 208 was always true

209 reasons.append(f"few running jobs ({best.running_jobs})") 

210 

211 reason = ", ".join(reasons) if reasons else "best overall capacity" 

212 

213 return { 

214 "region": best.region, 

215 "reason": reason, 

216 "score": best.recommendation_score, 

217 "queue_depth": best.queue_depth, 

218 "gpu_utilization": best.gpu_utilization, 

219 "running_jobs": best.running_jobs, 

220 "all_regions": [ 

221 { 

222 "region": c.region, 

223 "score": c.recommendation_score, 

224 "queue_depth": c.queue_depth, 

225 "gpu_utilization": c.gpu_utilization, 

226 } 

227 for c in sorted_capacities 

228 ], 

229 } 

230 

231 def _weighted_recommend( 

232 self, 

233 capacities: list[RegionCapacity], 

234 instance_type: str, 

235 gpu_count: int = 0, 

236 ) -> dict[str, Any]: 

237 """ 

238 Workload-aware recommendation using weighted multi-signal scoring. 

239 

240 Gathers per-region capacity data for the specific instance type and 

241 combines it with cluster metrics using weighted scoring. 

242 """ 

243 capacity_checker = CapacityChecker(self.config) 

244 

245 scored_regions: list[dict[str, Any]] = [] 

246 

247 for cap in capacities: 

248 region = cap.region 

249 

250 # Gather instance-specific signals for this region 

251 spot_score = 0.0 

252 spot_price_ratio = 1.0 # spot/on-demand ratio (lower = better savings) 

253 

254 try: 

255 placement_scores = capacity_checker.get_spot_placement_score( 

256 instance_type, region, target_capacity=max(1, gpu_count) 

257 ) 

258 if placement_scores: 258 ↛ 265line 258 didn't jump to line 265 because the condition on line 258 was always true

259 spot_score = placement_scores.get("regional", 0) / 10.0 # Normalize to 0-1 

260 except Exception as e: 

261 logger.debug( 

262 "Failed to get spot placement score for %s in %s: %s", instance_type, region, e 

263 ) 

264 

265 try: 

266 spot_prices = capacity_checker.get_spot_price_history(instance_type, region) 

267 on_demand_price = capacity_checker.get_on_demand_price(instance_type, region) 

268 

269 if spot_prices and on_demand_price and on_demand_price > 0: 

270 avg_spot = statistics.mean(sp.current_price for sp in spot_prices) 

271 spot_price_ratio = avg_spot / on_demand_price 

272 except Exception as e: 

273 logger.debug( 

274 "Failed to get spot pricing for %s in %s: %s", instance_type, region, e 

275 ) 

276 

277 # Capacity Block trend — compares near-term vs far-term offering 

278 # density to detect whether AWS is adding or consuming capacity 

279 # in this region for the requested instance type. 

280 try: 

281 cb_trend = capacity_checker.get_capacity_block_trend(instance_type, region) 

282 except Exception: 

283 cb_trend = 0.0 

284 

285 weighted_score = compute_weighted_score( 

286 spot_placement_score=spot_score, 

287 spot_price_ratio=spot_price_ratio, 

288 queue_depth=cap.queue_depth, 

289 gpu_utilization=cap.gpu_utilization, 

290 running_jobs=cap.running_jobs, 

291 capacity_block_trend=cb_trend, 

292 ) 

293 

294 scored_regions.append( 

295 { 

296 "region": region, 

297 "score": weighted_score, 

298 "queue_depth": cap.queue_depth, 

299 "gpu_utilization": cap.gpu_utilization, 

300 "running_jobs": cap.running_jobs, 

301 "spot_placement_score": spot_score, 

302 "spot_price_ratio": spot_price_ratio, 

303 "capacity_block_trend": cb_trend, 

304 } 

305 ) 

306 

307 scored_regions.sort(key=lambda x: x["score"]) 

308 best = scored_regions[0] 

309 

310 # Build justification from the signals 

311 reasons = [] 

312 if best["spot_placement_score"] >= 0.7: 

313 reasons.append(f"high spot availability ({best['spot_placement_score']:.0%})") 

314 elif best["spot_placement_score"] >= 0.4: 

315 reasons.append(f"moderate spot availability ({best['spot_placement_score']:.0%})") 

316 

317 if best["spot_price_ratio"] < 0.5: 

318 reasons.append(f"good spot savings ({1 - best['spot_price_ratio']:.0%} off on-demand)") 

319 

320 if best["queue_depth"] == 0: 

321 reasons.append("empty queue") 

322 elif best["queue_depth"] < 5: 322 ↛ 325line 322 didn't jump to line 325 because the condition on line 322 was always true

323 reasons.append(f"low queue depth ({best['queue_depth']})") 

324 

325 if best["gpu_utilization"] < 50: 325 ↛ 328line 325 didn't jump to line 328 because the condition on line 325 was always true

326 reasons.append(f"{100 - best['gpu_utilization']:.0f}% GPU available") 

327 

328 if best["running_jobs"] == 0: 

329 reasons.append("no running jobs") 

330 elif best["running_jobs"] < 5: 330 ↛ 333line 330 didn't jump to line 333 because the condition on line 330 was always true

331 reasons.append(f"few running jobs ({best['running_jobs']})") 

332 

333 if best.get("capacity_block_trend", 0) > 0.2: 333 ↛ 334line 333 didn't jump to line 334 because the condition on line 333 was never true

334 reasons.append("capacity block availability trending up") 

335 elif best.get("capacity_block_trend", 0) < -0.2: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true

336 reasons.append("capacity block availability trending down") 

337 

338 reason = ", ".join(reasons) if reasons else f"best weighted score for {instance_type}" 

339 

340 return { 

341 "region": best["region"], 

342 "reason": reason, 

343 "score": best["score"], 

344 "queue_depth": best["queue_depth"], 

345 "gpu_utilization": best["gpu_utilization"], 

346 "running_jobs": best["running_jobs"], 

347 "instance_type": instance_type, 

348 "scoring_method": "weighted", 

349 "all_regions": scored_regions, 

350 } 

351 

352 

def compute_price_trend(prices: list[float]) -> dict[str, Any]:
    """
    Fit a least-squares line through a price series and summarise its trend.

    The input is ordered newest-first, matching the EC2 spot price history
    API. It is flipped to oldest-first before fitting, so a positive slope
    means prices have been going up.

    Args:
        prices: Price samples, newest first.

    Returns:
        Dict with:
            slope: fitted change in price per sample step (positive = rising)
            normalized_slope: slope divided by the mean price, clamped to [-1, 1]
            price_changes: how many adjacent samples differ (volatility proxy)
            direction: "rising", "falling", or "stable"
        Fewer than two samples, or a zero mean price, yields a flat result.
    """
    flat_result = {
        "slope": 0.0,
        "normalized_slope": 0.0,
        "price_changes": 0,
        "direction": "stable",
    }
    if len(prices) < 2:
        return flat_result

    # Oldest sample first, so x increases with time.
    chronological = list(reversed(prices))
    sample_count = len(chronological)

    # Each adjacent pair with differing prices counts as one transition.
    transitions = sum(
        1 for older, newer in zip(chronological, chronological[1:]) if newer != older
    )

    # Ordinary least squares with x = sample index.
    x_centre = (sample_count - 1) / 2.0
    mean_price = statistics.mean(chronological)
    covariance = sum(
        (idx - x_centre) * (price - mean_price) for idx, price in enumerate(chronological)
    )
    x_spread = sum((idx - x_centre) ** 2 for idx in range(sample_count))

    if x_spread == 0 or mean_price == 0:
        return {**flat_result, "price_changes": transitions}

    fitted_slope = covariance / x_spread
    relative_slope = max(-1.0, min(1.0, fitted_slope / mean_price))

    trend = (
        "rising"
        if relative_slope > 0.05
        else ("falling" if relative_slope < -0.05 else "stable")
    )

    return {
        "slope": round(fitted_slope, 6),
        "normalized_slope": round(relative_slope, 4),
        "price_changes": transitions,
        "direction": trend,
    }

417 

418 

419def compute_weighted_score( 

420 spot_placement_score: float = 0.0, 

421 spot_price_ratio: float = 1.0, 

422 queue_depth: int = 0, 

423 gpu_utilization: float = 0.0, 

424 running_jobs: int = 0, 

425 capacity_block_trend: float = 0.0, 

426 *, 

427 weights: dict[str, float] | None = None, 

428) -> float: 

429 """ 

430 Compute a weighted recommendation score for a region. Lower is better. 

431 

432 Combines multiple capacity signals into a single score using configurable 

433 weights. Each signal is normalized to 0-1 where 0 is best, then weighted. 

434 

435 Args: 

436 spot_placement_score: Normalized spot placement score (0-1, higher = better availability) 

437 spot_price_ratio: Spot price / on-demand price (0-1, lower = better savings) 

438 queue_depth: Number of pending jobs in the region's queue 

439 gpu_utilization: GPU utilization percentage (0-100) 

440 running_jobs: Number of currently running jobs 

441 capacity_block_trend: Trend of capacity block offerings over a 26-week window 

442 (-1 to 1). Positive means capacity is growing (regression slope positive), 

443 negative means shrinking. Derived from linear regression over weekly 

444 offering counts from the describe-capacity-block-offerings API. 

445 weights: Optional custom weights dict. Keys: spot_placement, spot_price, 

446 queue_depth, gpu_utilization, running_jobs, capacity_blocks. 

447 Values should sum to 1.0. 

448 

449 Returns: 

450 Weighted score (lower is better, range roughly 0-1) 

451 """ 

452 w = weights or { 

453 "spot_placement": 0.25, 

454 "spot_price": 0.20, 

455 "queue_depth": 0.20, 

456 "gpu_utilization": 0.15, 

457 "running_jobs": 0.10, 

458 "capacity_blocks": 0.10, 

459 } 

460 

461 # Normalize each signal to 0-1 where 0 is best 

462 # Spot placement: invert (high score = good, so 1 - score = low = good) 

463 norm_spot = 1.0 - min(max(spot_placement_score, 0.0), 1.0) 

464 

465 # Spot price ratio: already 0-1 where lower is better 

466 norm_price = min(max(spot_price_ratio, 0.0), 1.0) 

467 

468 # Queue depth: normalize with diminishing returns (0 = best) 

469 # Use tanh-like curve: depth / (depth + k) where k controls sensitivity 

470 norm_queue = queue_depth / (queue_depth + 10.0) if queue_depth >= 0 else 0.0 

471 

472 # GPU utilization: normalize 0-100 to 0-1 

473 norm_gpu = min(max(gpu_utilization, 0.0), 100.0) / 100.0 

474 

475 # Running jobs: normalize with diminishing returns 

476 norm_jobs = running_jobs / (running_jobs + 20.0) if running_jobs >= 0 else 0.0 

477 

478 # Capacity block trend: invert and shift from [-1,1] to [0,1] 

479 # trend +1 (growing) → 0.0 (best), trend -1 (shrinking) → 1.0 (worst) 

480 clamped_trend = min(max(capacity_block_trend, -1.0), 1.0) 

481 norm_blocks = (1.0 - clamped_trend) / 2.0 

482 

483 score = ( 

484 w["spot_placement"] * norm_spot 

485 + w["spot_price"] * norm_price 

486 + w["queue_depth"] * norm_queue 

487 + w["gpu_utilization"] * norm_gpu 

488 + w["running_jobs"] * norm_jobs 

489 + w.get("capacity_blocks", 0) * norm_blocks 

490 ) 

491 

492 return round(score, 4) 

493 

494 

def get_multi_region_capacity_checker(
    config: GCOConfig | None = None,
) -> MultiRegionCapacityChecker:
    """
    Build a MultiRegionCapacityChecker.

    Args:
        config: Optional GCO configuration, passed straight to the checker
            (which falls back to get_config() when it is None).

    Returns:
        A new MultiRegionCapacityChecker instance.
    """
    checker = MultiRegionCapacityChecker(config=config)
    return checker