Coverage for cli/capacity/multi_region.py: 89% (185 statements)

coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1"""Multi-region capacity checking and weighted scoring.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6import logging 

7import statistics 

8from dataclasses import dataclass 

9from datetime import UTC, datetime, timedelta 

10from typing import Any 

11 

12import boto3 

13from botocore.exceptions import ClientError 

14 

15from cli.config import GCOConfig, get_config 

16 

17from .checker import CapacityChecker 

18 

19logger = logging.getLogger(__name__) 

20 

21 

22@dataclass 

23class RegionCapacity: 

24 """Capacity information for a region.""" 

25 

26 region: str 

27 queue_depth: int = 0 

28 pending_jobs: int = 0 

29 running_jobs: int = 0 

30 gpu_utilization: float = 0.0 

31 cpu_utilization: float = 0.0 

32 available_gpus: int = 0 

33 total_gpus: int = 0 

34 avg_wait_time_seconds: int = 0 

35 recommendation_score: float = 0.0 
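
# Illustrative sketch (hypothetical values): a RegionCapacity record is typically
# built field-by-field as metrics are collected for a region, e.g.
#     cap = RegionCapacity(region="us-east-1", queue_depth=3, gpu_utilization=62.5)
#     cap.running_jobs = 2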


class MultiRegionCapacityChecker:
    """
    Checks capacity across multiple GCO regions.

    Provides:
    - Multi-region capacity overview
    - Intelligent region recommendation
    - Queue depth analysis
    - Resource utilization metrics
    """

    def __init__(self, config: GCOConfig | None = None):
        self.config = config or get_config()
        self._session = boto3.Session()

    def get_region_capacity(self, region: str) -> RegionCapacity:
        """Get capacity information for a single region."""
        from cli.aws_client import get_aws_client

        aws_client = get_aws_client(self.config)
        stack = aws_client.get_regional_stack(region)

        if not stack:
            return RegionCapacity(region=region)

        capacity = RegionCapacity(region=region)

        # Get queue depth from SQS
        try:
            cfn = self._session.client("cloudformation", region_name=region)
            response = cfn.describe_stacks(StackName=stack.stack_name)
            outputs = {
                o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
            }

            queue_url = outputs.get("JobQueueUrl")
            if queue_url:
                sqs = self._session.client("sqs", region_name=region)
                attrs = sqs.get_queue_attributes(
                    QueueUrl=queue_url,
                    AttributeNames=[
                        "ApproximateNumberOfMessages",
                        "ApproximateNumberOfMessagesNotVisible",
                    ],
                )["Attributes"]
                capacity.queue_depth = int(attrs.get("ApproximateNumberOfMessages", 0))
                capacity.running_jobs = int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0))
        except ClientError as e:
            logger.debug("Failed to get queue metrics for %s: %s", region, e)
        except Exception as e:
            logger.warning("Unexpected error getting queue metrics for %s: %s", region, e)

        # Get cluster metrics from CloudWatch (if available)
        try:
            cloudwatch = self._session.client("cloudwatch", region_name=region)

            # Get GPU utilization from Container Insights
            response = cloudwatch.get_metric_statistics(
                Namespace="ContainerInsights",
                MetricName="node_gpu_utilization",
                Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}],
                StartTime=datetime.now(UTC) - timedelta(minutes=5),
                EndTime=datetime.now(UTC),
                Period=300,
                Statistics=["Average"],
            )
            if response["Datapoints"]:
                capacity.gpu_utilization = response["Datapoints"][0]["Average"]

            # Get CPU utilization
            response = cloudwatch.get_metric_statistics(
                Namespace="ContainerInsights",
                MetricName="node_cpu_utilization",
                Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}],
                StartTime=datetime.now(UTC) - timedelta(minutes=5),
                EndTime=datetime.now(UTC),
                Period=300,
                Statistics=["Average"],
            )
            if response["Datapoints"]:
                capacity.cpu_utilization = response["Datapoints"][0]["Average"]

        except ClientError as e:
            logger.debug("Failed to get CloudWatch metrics for %s: %s", region, e)
        except Exception as e:
            logger.warning("Unexpected error getting CloudWatch metrics for %s: %s", region, e)

        # Calculate recommendation score (lower is better)
        # Factors: queue depth, GPU utilization, running jobs
        capacity.recommendation_score = (
            capacity.queue_depth * 10 + capacity.gpu_utilization + capacity.running_jobs * 5
        )
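        # Worked example (hypothetical numbers): queue_depth=3, gpu_utilization=60.0,
        # running_jobs=2 gives 3 * 10 + 60.0 + 2 * 5 = 100.0, so a busier region
        # scores higher and sorts later when a recommendation is made (lower is better).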

        return capacity

    def get_all_regions_capacity(self) -> list[RegionCapacity]:
        """Get capacity information for all deployed regions."""
        from cli.aws_client import get_aws_client

        aws_client = get_aws_client(self.config)
        stacks = aws_client.discover_regional_stacks()

        capacities = []
        for region in stacks:
            try:
                capacity = self.get_region_capacity(region)
                capacities.append(capacity)
            except Exception as e:
                logger.warning("Failed to get capacity for region %s: %s", region, e)
                continue

        return capacities

    def recommend_region_for_job(
        self,
        gpu_required: bool = False,
        min_gpus: int = 0,
        instance_type: str | None = None,
        gpu_count: int = 0,
    ) -> dict[str, Any]:
        """
        Recommend the optimal region for job placement.

        When instance_type is provided, uses weighted multi-signal scoring that
        combines spot placement scores, pricing, queue depth, GPU utilization,
        and running job counts. Falls back to simple scoring when instance_type
        is not specified.

        Args:
            gpu_required: Whether the job requires GPUs
            min_gpus: Minimum number of GPUs required
            instance_type: Specific instance type for workload-aware scoring
            gpu_count: Number of GPUs required

        Returns:
            Dictionary with recommended region and justification
        """
        capacities = self.get_all_regions_capacity()

        if not capacities:
            return {
                "region": self.config.default_region,
                "reason": "No capacity data available, using default region",
                "score": 0,
            }

        # When instance_type is provided, use weighted scoring with capacity data
        if instance_type:
            return self._weighted_recommend(capacities, instance_type, gpu_count or min_gpus)

        # Fallback: simple scoring (existing behavior)
        return self._simple_recommend(capacities)

    def _simple_recommend(self, capacities: list[RegionCapacity]) -> dict[str, Any]:
        """Simple recommendation using the existing composite score."""
        sorted_capacities = sorted(capacities, key=lambda x: x.recommendation_score)
        best = sorted_capacities[0]

        reasons = []
        if best.queue_depth == 0:
            reasons.append("empty queue")
        elif best.queue_depth < 5:
            reasons.append(f"low queue depth ({best.queue_depth})")

        if best.gpu_utilization < 50:
            reasons.append(f"{100 - best.gpu_utilization:.0f}% GPU available")
        elif best.gpu_utilization < 80:
            reasons.append(f"moderate GPU utilization ({best.gpu_utilization:.0f}%)")

        if best.running_jobs == 0:
            reasons.append("no running jobs")
        elif best.running_jobs < 5:
            reasons.append(f"few running jobs ({best.running_jobs})")

        reason = ", ".join(reasons) if reasons else "best overall capacity"

        return {
            "region": best.region,
            "reason": reason,
            "score": best.recommendation_score,
            "queue_depth": best.queue_depth,
            "gpu_utilization": best.gpu_utilization,
            "running_jobs": best.running_jobs,
            "all_regions": [
                {
                    "region": c.region,
                    "score": c.recommendation_score,
                    "queue_depth": c.queue_depth,
                    "gpu_utilization": c.gpu_utilization,
                }
                for c in sorted_capacities
            ],
        }

    def _weighted_recommend(
        self,
        capacities: list[RegionCapacity],
        instance_type: str,
        gpu_count: int = 0,
    ) -> dict[str, Any]:
        """
        Workload-aware recommendation using weighted multi-signal scoring.

        Gathers per-region capacity data for the specific instance type and
        combines it with cluster metrics using weighted scoring.
        """
        capacity_checker = CapacityChecker(self.config)

        scored_regions: list[dict[str, Any]] = []

        for cap in capacities:
            region = cap.region

            # Gather instance-specific signals for this region
            spot_score = 0.0
            spot_price_ratio = 1.0  # spot/on-demand ratio (lower = better savings)

            try:
                placement_scores = capacity_checker.get_spot_placement_score(
                    instance_type, region, target_capacity=max(1, gpu_count)
                )
                if placement_scores:
                    spot_score = placement_scores.get("regional", 0) / 10.0  # Normalize to 0-1
            except Exception as e:
                logger.debug(
                    "Failed to get spot placement score for %s in %s: %s", instance_type, region, e
                )

            try:
                spot_prices = capacity_checker.get_spot_price_history(instance_type, region)
                on_demand_price = capacity_checker.get_on_demand_price(instance_type, region)

                if spot_prices and on_demand_price and on_demand_price > 0:
                    avg_spot = statistics.mean(sp.current_price for sp in spot_prices)
                    spot_price_ratio = avg_spot / on_demand_price
            except Exception as e:
                logger.debug(
                    "Failed to get spot pricing for %s in %s: %s", instance_type, region, e
                )

            # Capacity Block trend: compares near-term vs far-term offering
            # density to detect whether AWS is adding or consuming capacity
            # in this region for the requested instance type.
            cb_trend = 0.0
            with contextlib.suppress(Exception):
                cb_trend = capacity_checker.get_capacity_block_trend(instance_type, region)

            weighted_score = compute_weighted_score(
                spot_placement_score=spot_score,
                spot_price_ratio=spot_price_ratio,
                queue_depth=cap.queue_depth,
                gpu_utilization=cap.gpu_utilization,
                running_jobs=cap.running_jobs,
                capacity_block_trend=cb_trend,
            )

            scored_regions.append(
                {
                    "region": region,
                    "score": weighted_score,
                    "queue_depth": cap.queue_depth,
                    "gpu_utilization": cap.gpu_utilization,
                    "running_jobs": cap.running_jobs,
                    "spot_placement_score": spot_score,
                    "spot_price_ratio": spot_price_ratio,
                    "capacity_block_trend": cb_trend,
                }
            )

        scored_regions.sort(key=lambda x: x["score"])
        best = scored_regions[0]

        # Build justification from the signals
        reasons = []
        if best["spot_placement_score"] >= 0.7:
            reasons.append(f"high spot availability ({best['spot_placement_score']:.0%})")
        elif best["spot_placement_score"] >= 0.4:
            reasons.append(f"moderate spot availability ({best['spot_placement_score']:.0%})")

        if best["spot_price_ratio"] < 0.5:
            reasons.append(f"good spot savings ({1 - best['spot_price_ratio']:.0%} off on-demand)")

        if best["queue_depth"] == 0:
            reasons.append("empty queue")
        elif best["queue_depth"] < 5:
            reasons.append(f"low queue depth ({best['queue_depth']})")

        if best["gpu_utilization"] < 50:
            reasons.append(f"{100 - best['gpu_utilization']:.0f}% GPU available")

        if best["running_jobs"] == 0:
            reasons.append("no running jobs")
        elif best["running_jobs"] < 5:
            reasons.append(f"few running jobs ({best['running_jobs']})")

        if best.get("capacity_block_trend", 0) > 0.2:
            reasons.append("capacity block availability trending up")
        elif best.get("capacity_block_trend", 0) < -0.2:
            reasons.append("capacity block availability trending down")

        reason = ", ".join(reasons) if reasons else f"best weighted score for {instance_type}"

        return {
            "region": best["region"],
            "reason": reason,
            "score": best["score"],
            "queue_depth": best["queue_depth"],
            "gpu_utilization": best["gpu_utilization"],
            "running_jobs": best["running_jobs"],
            "instance_type": instance_type,
            "scoring_method": "weighted",
            "all_regions": scored_regions,
        }


def compute_price_trend(prices: list[float]) -> dict[str, Any]:
    """
    Compute a linear regression trend over a price time series.

    Prices are assumed to be ordered most-recent-first (as returned by
    the EC2 spot price history API). The series is reversed internally
    so the slope represents change over time (positive = prices rising).

    Args:
        prices: List of price points, most recent first.

    Returns:
        Dict with:
            slope: price change per sample period (positive = rising)
            normalized_slope: slope / mean_price (scale-independent, -1 to 1 clamped)
            price_changes: number of distinct price transitions (proxy for volatility)
            direction: "rising", "falling", or "stable"
    """
    if len(prices) < 2:
        return {
            "slope": 0.0,
            "normalized_slope": 0.0,
            "price_changes": 0,
            "direction": "stable",
        }

    # Reverse so index 0 = oldest, last index = newest
    series = list(reversed(prices))
    n = len(series)

    # Count distinct price transitions (proxy for interruption frequency)
    price_changes = sum(1 for i in range(1, n) if series[i] != series[i - 1])

    # Linear regression: slope of price over time
    x_mean = (n - 1) / 2.0
    y_mean = statistics.mean(series)

    numerator = sum((i - x_mean) * (series[i] - y_mean) for i in range(n))
    denominator = sum((i - x_mean) ** 2 for i in range(n))

    if denominator == 0 or y_mean == 0:
        return {
            "slope": 0.0,
            "normalized_slope": 0.0,
            "price_changes": price_changes,
            "direction": "stable",
        }

    slope = numerator / denominator
    normalized = max(-1.0, min(1.0, slope / y_mean))

    if normalized > 0.05:
        direction = "rising"
    elif normalized < -0.05:
        direction = "falling"
    else:
        direction = "stable"

    return {
        "slope": round(slope, 6),
        "normalized_slope": round(normalized, 4),
        "price_changes": price_changes,
        "direction": direction,
    }
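
# Worked example (hypothetical prices, most recent first): the series
# [2.0, 2.5, 3.0, 3.5] falls toward the present, so the fitted slope over the
# chronological series is negative; with these numbers the result is
#     {"slope": -0.5, "normalized_slope": -0.1818, "price_changes": 3, "direction": "falling"}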


def compute_weighted_score(
    spot_placement_score: float = 0.0,
    spot_price_ratio: float = 1.0,
    queue_depth: int = 0,
    gpu_utilization: float = 0.0,
    running_jobs: int = 0,
    capacity_block_trend: float = 0.0,
    *,
    weights: dict[str, float] | None = None,
) -> float:
    """
    Compute a weighted recommendation score for a region. Lower is better.

    Combines multiple capacity signals into a single score using configurable
    weights. Each signal is normalized to 0-1 where 0 is best, then weighted.

    Args:
        spot_placement_score: Normalized spot placement score (0-1, higher = better availability)
        spot_price_ratio: Spot price / on-demand price (0-1, lower = better savings)
        queue_depth: Number of pending jobs in the region's queue
        gpu_utilization: GPU utilization percentage (0-100)
        running_jobs: Number of currently running jobs
        capacity_block_trend: Trend of capacity block offerings over a 26-week window
            (-1 to 1). Positive means capacity is growing (regression slope positive),
            negative means shrinking. Derived from linear regression over weekly
            offering counts from the describe-capacity-block-offerings API.
        weights: Optional custom weights dict. Keys: spot_placement, spot_price,
            queue_depth, gpu_utilization, running_jobs, capacity_blocks.
            Values should sum to 1.0.

    Returns:
        Weighted score (lower is better, range roughly 0-1)
    """
    w = weights or {
        "spot_placement": 0.25,
        "spot_price": 0.20,
        "queue_depth": 0.20,
        "gpu_utilization": 0.15,
        "running_jobs": 0.10,
        "capacity_blocks": 0.10,
    }

    # Normalize each signal to 0-1 where 0 is best
    # Spot placement: invert (high score = good, so 1 - score = low = good)
    norm_spot = 1.0 - min(max(spot_placement_score, 0.0), 1.0)

    # Spot price ratio: already 0-1 where lower is better
    norm_price = min(max(spot_price_ratio, 0.0), 1.0)

    # Queue depth: normalize with diminishing returns (0 = best)
    # Use a saturating curve: depth / (depth + k) where k controls sensitivity
    norm_queue = queue_depth / (queue_depth + 10.0) if queue_depth >= 0 else 0.0

    # GPU utilization: normalize 0-100 to 0-1
    norm_gpu = min(max(gpu_utilization, 0.0), 100.0) / 100.0

    # Running jobs: normalize with diminishing returns
    norm_jobs = running_jobs / (running_jobs + 20.0) if running_jobs >= 0 else 0.0

    # Capacity block trend: invert and shift from [-1,1] to [0,1]
    # trend +1 (growing) → 0.0 (best), trend -1 (shrinking) → 1.0 (worst)
    clamped_trend = min(max(capacity_block_trend, -1.0), 1.0)
    norm_blocks = (1.0 - clamped_trend) / 2.0

    score = (
        w["spot_placement"] * norm_spot
        + w["spot_price"] * norm_price
        + w["queue_depth"] * norm_queue
        + w["gpu_utilization"] * norm_gpu
        + w["running_jobs"] * norm_jobs
        + w.get("capacity_blocks", 0) * norm_blocks
    )

    return round(score, 4)
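
# Worked example (hypothetical signals) with the default weights: a region with
# spot_placement_score=0.8, spot_price_ratio=0.3, queue_depth=2,
# gpu_utilization=40.0, running_jobs=1, capacity_block_trend=0.0 normalizes to
# 0.2, 0.3, 2/12, 0.4, 1/21 and 0.5, giving
#     0.25*0.2 + 0.20*0.3 + 0.20*(2/12) + 0.15*0.4 + 0.10*(1/21) + 0.10*0.5 ≈ 0.2581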


def get_multi_region_capacity_checker(
    config: GCOConfig | None = None,
) -> MultiRegionCapacityChecker:
    """Get a configured multi-region capacity checker instance."""
    return MultiRegionCapacityChecker(config)
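
# Usage sketch (illustrative only; the instance type and GPU count are example
# values, and deployed regional stacks are assumed to be reachable):
#     checker = get_multi_region_capacity_checker()
#     rec = checker.recommend_region_for_job(instance_type="p5.48xlarge", gpu_count=8)
#     print(rec["region"], "-", rec["reason"])
# Passing an instance type takes the weighted multi-signal path; calling
# recommend_region_for_job() with no instance type falls back to the simple
# composite score from _simple_recommend().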