Coverage for cli / capacity / multi_region.py: 89%
185 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""Multi-region capacity checking and weighted scoring."""
3from __future__ import annotations
5import contextlib
6import logging
7import statistics
8from dataclasses import dataclass
9from datetime import UTC, datetime, timedelta
10from typing import Any
12import boto3
13from botocore.exceptions import ClientError
15from cli.config import GCOConfig, get_config
17from .checker import CapacityChecker
19logger = logging.getLogger(__name__)
@dataclass
class RegionCapacity:
    """Capacity information for a region."""

    region: str  # AWS region name this record describes
    queue_depth: int = 0  # visible SQS messages on the job queue (queued, not yet picked up)
    pending_jobs: int = 0  # NOTE(review): not populated by the checker in this module
    running_jobs: int = 0  # in-flight SQS messages (ApproximateNumberOfMessagesNotVisible)
    gpu_utilization: float = 0.0  # average cluster GPU utilization from CloudWatch, percent
    cpu_utilization: float = 0.0  # average cluster CPU utilization from CloudWatch, percent
    available_gpus: int = 0  # NOTE(review): not populated by the checker in this module
    total_gpus: int = 0  # NOTE(review): not populated by the checker in this module
    avg_wait_time_seconds: int = 0  # NOTE(review): not populated by the checker in this module
    recommendation_score: float = 0.0  # composite placement score; lower is better
class MultiRegionCapacityChecker:
    """
    Checks capacity across multiple GCO regions.

    Provides:
    - Multi-region capacity overview
    - Intelligent region recommendation
    - Queue depth analysis
    - Resource utilization metrics

    All AWS lookups are best-effort: failures are logged and the affected
    fields keep their zero defaults rather than aborting the overall check.
    """

    def __init__(self, config: GCOConfig | None = None) -> None:
        # Fall back to the process-wide configuration when none is supplied.
        self.config = config or get_config()
        # One shared boto3 session; per-region service clients are created on demand.
        self._session = boto3.Session()

    def get_region_capacity(self, region: str) -> RegionCapacity:
        """Get capacity information for a single region.

        Reads the regional job-queue depth from SQS (queue URL taken from the
        CloudFormation stack outputs) and GPU/CPU utilization from CloudWatch
        Container Insights, then computes a composite recommendation score
        (lower is better). Each data source is best-effort; a failure leaves
        the corresponding fields at their zero defaults.

        Args:
            region: AWS region name to inspect.

        Returns:
            RegionCapacity for the region; all-zero when no regional stack
            is deployed there.
        """
        # Imported lazily — presumably to avoid a circular import; TODO confirm.
        from cli.aws_client import get_aws_client

        aws_client = get_aws_client(self.config)
        stack = aws_client.get_regional_stack(region)

        # No deployed stack in this region: return an empty (all-zero) record.
        if not stack:
            return RegionCapacity(region=region)

        capacity = RegionCapacity(region=region)

        # Get queue depth from SQS. The queue URL is discovered through the
        # stack's "JobQueueUrl" CloudFormation output.
        try:
            cfn = self._session.client("cloudformation", region_name=region)
            response = cfn.describe_stacks(StackName=stack.stack_name)
            outputs = {
                o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
            }

            queue_url = outputs.get("JobQueueUrl")
            if queue_url:
                sqs = self._session.client("sqs", region_name=region)
                attrs = sqs.get_queue_attributes(
                    QueueUrl=queue_url,
                    AttributeNames=[
                        "ApproximateNumberOfMessages",
                        "ApproximateNumberOfMessagesNotVisible",
                    ],
                )["Attributes"]
                # Visible messages = queued work; not-visible = currently being processed.
                capacity.queue_depth = int(attrs.get("ApproximateNumberOfMessages", 0))
                capacity.running_jobs = int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0))
        except ClientError as e:
            # AWS-side failures (missing stack/queue, access denied) are routine — debug only.
            logger.debug("Failed to get queue metrics for %s: %s", region, e)
        except Exception as e:
            # Anything else is unexpected and worth surfacing, but still non-fatal.
            logger.warning("Unexpected error getting queue metrics for %s: %s", region, e)

        # Get cluster metrics from CloudWatch (if available)
        try:
            cloudwatch = self._session.client("cloudwatch", region_name=region)

            # Get GPU utilization from Container Insights — 5-minute window with a
            # 300s period, so at most one datapoint is expected.
            response = cloudwatch.get_metric_statistics(
                Namespace="ContainerInsights",
                MetricName="node_gpu_utilization",
                Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}],
                StartTime=datetime.now(UTC) - timedelta(minutes=5),
                EndTime=datetime.now(UTC),
                Period=300,
                Statistics=["Average"],
            )
            if response["Datapoints"]:
                capacity.gpu_utilization = response["Datapoints"][0]["Average"]

            # Get CPU utilization over the same window.
            response = cloudwatch.get_metric_statistics(
                Namespace="ContainerInsights",
                MetricName="node_cpu_utilization",
                Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}],
                StartTime=datetime.now(UTC) - timedelta(minutes=5),
                EndTime=datetime.now(UTC),
                Period=300,
                Statistics=["Average"],
            )
            if response["Datapoints"]:
                capacity.cpu_utilization = response["Datapoints"][0]["Average"]

        except ClientError as e:
            logger.debug("Failed to get CloudWatch metrics for %s: %s", region, e)
        except Exception as e:
            logger.warning("Unexpected error getting CloudWatch metrics for %s: %s", region, e)

        # Calculate recommendation score (lower is better)
        # Factors: queue depth, GPU utilization, running jobs
        capacity.recommendation_score = (
            capacity.queue_depth * 10 + capacity.gpu_utilization + capacity.running_jobs * 5
        )

        return capacity

    def get_all_regions_capacity(self) -> list[RegionCapacity]:
        """Get capacity information for all deployed regions.

        Regions whose lookup raises are logged and skipped, so the result may
        contain fewer entries than there are discovered regional stacks.
        """
        # Lazy import — presumably to avoid a circular import; TODO confirm.
        from cli.aws_client import get_aws_client

        aws_client = get_aws_client(self.config)
        stacks = aws_client.discover_regional_stacks()

        capacities = []
        for region in stacks:
            try:
                capacity = self.get_region_capacity(region)
                capacities.append(capacity)
            except Exception as e:
                # One bad region must not sink the whole overview.
                logger.warning("Failed to get capacity for region %s: %s", region, e)
                continue

        return capacities

    def recommend_region_for_job(
        self,
        gpu_required: bool = False,
        min_gpus: int = 0,
        instance_type: str | None = None,
        gpu_count: int = 0,
    ) -> dict[str, Any]:
        """
        Recommend the optimal region for job placement.

        When instance_type is provided, uses weighted multi-signal scoring that
        combines spot placement scores, pricing, queue depth, GPU utilization,
        and running job counts. Falls back to simple scoring when instance_type
        is not specified.

        Args:
            gpu_required: Whether the job requires GPUs.
                NOTE(review): not referenced in this method's body — kept for
                interface compatibility; verify against callers.
            min_gpus: Minimum number of GPUs required (used as a fallback for
                gpu_count in weighted scoring)
            instance_type: Specific instance type for workload-aware scoring
            gpu_count: Number of GPUs required

        Returns:
            Dictionary with recommended region and justification
        """
        capacities = self.get_all_regions_capacity()

        # No capacity data at all (e.g. nothing deployed): punt to the default region.
        if not capacities:
            return {
                "region": self.config.default_region,
                "reason": "No capacity data available, using default region",
                "score": 0,
            }

        # When instance_type is provided, use weighted scoring with capacity data
        if instance_type:
            return self._weighted_recommend(capacities, instance_type, gpu_count or min_gpus)

        # Fallback: simple scoring (existing behavior)
        return self._simple_recommend(capacities)

    def _simple_recommend(self, capacities: list[RegionCapacity]) -> dict[str, Any]:
        """Simple recommendation using the existing composite score.

        Args:
            capacities: Per-region records; must be non-empty (caller guards).

        Returns:
            Dict with the best region, a human-readable reason, its metrics,
            and all regions sorted best-first by recommendation_score.
        """
        # Lower composite score = better, so the first element is the winner.
        sorted_capacities = sorted(capacities, key=lambda x: x.recommendation_score)
        best = sorted_capacities[0]

        # Build a human-readable justification from the individual signals.
        reasons = []
        if best.queue_depth == 0:
            reasons.append("empty queue")
        elif best.queue_depth < 5:
            reasons.append(f"low queue depth ({best.queue_depth})")

        if best.gpu_utilization < 50:
            reasons.append(f"{100 - best.gpu_utilization:.0f}% GPU available")
        elif best.gpu_utilization < 80:
            reasons.append(f"moderate GPU utilization ({best.gpu_utilization:.0f}%)")

        if best.running_jobs == 0:
            reasons.append("no running jobs")
        elif best.running_jobs < 5:
            reasons.append(f"few running jobs ({best.running_jobs})")

        reason = ", ".join(reasons) if reasons else "best overall capacity"

        return {
            "region": best.region,
            "reason": reason,
            "score": best.recommendation_score,
            "queue_depth": best.queue_depth,
            "gpu_utilization": best.gpu_utilization,
            "running_jobs": best.running_jobs,
            "all_regions": [
                {
                    "region": c.region,
                    "score": c.recommendation_score,
                    "queue_depth": c.queue_depth,
                    "gpu_utilization": c.gpu_utilization,
                }
                for c in sorted_capacities
            ],
        }

    def _weighted_recommend(
        self,
        capacities: list[RegionCapacity],
        instance_type: str,
        gpu_count: int = 0,
    ) -> dict[str, Any]:
        """
        Workload-aware recommendation using weighted multi-signal scoring.

        Gathers per-region capacity data for the specific instance type and
        combines it with cluster metrics using weighted scoring. Every signal
        lookup is best-effort: on failure the signal keeps its neutral default
        instead of failing the recommendation.

        Args:
            capacities: Per-region records; must be non-empty (caller guards).
            instance_type: EC2 instance type to score regions for.
            gpu_count: Requested GPU count; used as the spot placement
                target capacity (minimum 1).

        Returns:
            Dict with the best region, a reason string, the weighted score,
            per-signal values, and all scored regions sorted best-first.
        """
        capacity_checker = CapacityChecker(self.config)

        scored_regions: list[dict[str, Any]] = []

        for cap in capacities:
            region = cap.region

            # Gather instance-specific signals for this region.
            # Neutral defaults: no spot availability, no spot savings.
            spot_score = 0.0
            spot_price_ratio = 1.0  # spot/on-demand ratio (lower = better savings)

            try:
                placement_scores = capacity_checker.get_spot_placement_score(
                    instance_type, region, target_capacity=max(1, gpu_count)
                )
                if placement_scores:
                    spot_score = placement_scores.get("regional", 0) / 10.0  # Normalize to 0-1
            except Exception as e:
                logger.debug(
                    "Failed to get spot placement score for %s in %s: %s", instance_type, region, e
                )

            try:
                spot_prices = capacity_checker.get_spot_price_history(instance_type, region)
                on_demand_price = capacity_checker.get_on_demand_price(instance_type, region)

                # Guard against missing data and division by zero.
                if spot_prices and on_demand_price and on_demand_price > 0:
                    avg_spot = statistics.mean(sp.current_price for sp in spot_prices)
                    spot_price_ratio = avg_spot / on_demand_price
            except Exception as e:
                logger.debug(
                    "Failed to get spot pricing for %s in %s: %s", instance_type, region, e
                )

            # Capacity Block trend — compares near-term vs far-term offering
            # density to detect whether AWS is adding or consuming capacity
            # in this region for the requested instance type.
            cb_trend = 0.0
            with contextlib.suppress(Exception):
                cb_trend = capacity_checker.get_capacity_block_trend(instance_type, region)

            # Fold all signals into one score (lower = better).
            weighted_score = compute_weighted_score(
                spot_placement_score=spot_score,
                spot_price_ratio=spot_price_ratio,
                queue_depth=cap.queue_depth,
                gpu_utilization=cap.gpu_utilization,
                running_jobs=cap.running_jobs,
                capacity_block_trend=cb_trend,
            )

            scored_regions.append(
                {
                    "region": region,
                    "score": weighted_score,
                    "queue_depth": cap.queue_depth,
                    "gpu_utilization": cap.gpu_utilization,
                    "running_jobs": cap.running_jobs,
                    "spot_placement_score": spot_score,
                    "spot_price_ratio": spot_price_ratio,
                    "capacity_block_trend": cb_trend,
                }
            )

        # Lower score is better — best region first.
        scored_regions.sort(key=lambda x: x["score"])
        best = scored_regions[0]

        # Build justification from the signals
        reasons = []
        if best["spot_placement_score"] >= 0.7:
            reasons.append(f"high spot availability ({best['spot_placement_score']:.0%})")
        elif best["spot_placement_score"] >= 0.4:
            reasons.append(f"moderate spot availability ({best['spot_placement_score']:.0%})")

        if best["spot_price_ratio"] < 0.5:
            reasons.append(f"good spot savings ({1 - best['spot_price_ratio']:.0%} off on-demand)")

        if best["queue_depth"] == 0:
            reasons.append("empty queue")
        elif best["queue_depth"] < 5:
            reasons.append(f"low queue depth ({best['queue_depth']})")

        if best["gpu_utilization"] < 50:
            reasons.append(f"{100 - best['gpu_utilization']:.0f}% GPU available")

        if best["running_jobs"] == 0:
            reasons.append("no running jobs")
        elif best["running_jobs"] < 5:
            reasons.append(f"few running jobs ({best['running_jobs']})")

        if best.get("capacity_block_trend", 0) > 0.2:
            reasons.append("capacity block availability trending up")
        elif best.get("capacity_block_trend", 0) < -0.2:
            reasons.append("capacity block availability trending down")

        reason = ", ".join(reasons) if reasons else f"best weighted score for {instance_type}"

        return {
            "region": best["region"],
            "reason": reason,
            "score": best["score"],
            "queue_depth": best["queue_depth"],
            "gpu_utilization": best["gpu_utilization"],
            "running_jobs": best["running_jobs"],
            "instance_type": instance_type,
            "scoring_method": "weighted",
            "all_regions": scored_regions,
        }
def compute_price_trend(prices: list[float]) -> dict[str, Any]:
    """
    Compute a linear regression trend over a price time series.

    Prices are assumed to be ordered most-recent-first (as returned by
    the EC2 spot price history API). The series is reversed internally
    so the slope represents change over time (positive = prices rising).

    Args:
        prices: List of price points, most recent first.

    Returns:
        Dict with:
            slope: price change per sample period (positive = rising)
            normalized_slope: slope / mean_price (scale-independent, -1 to 1 clamped)
            price_changes: number of distinct price transitions (proxy for volatility)
            direction: "rising", "falling", or "stable"
    """
    # Fewer than two samples: no trend can be estimated.
    if len(prices) < 2:
        return {
            "slope": 0.0,
            "normalized_slope": 0.0,
            "price_changes": 0,
            "direction": "stable",
        }

    # Oldest-first ordering so a positive slope means prices are rising.
    series = prices[::-1]
    n = len(series)

    # Each transition between distinct consecutive prices is a volatility proxy.
    price_changes = sum(a != b for a, b in zip(series, series[1:]))

    # Ordinary least squares over (index, price): slope = Sxy / Sxx.
    x_mean = (n - 1) / 2.0
    y_mean = statistics.mean(series)

    sxy = 0.0
    sxx = 0.0
    for idx, price in enumerate(series):
        dx = idx - x_mean
        sxy += dx * (price - y_mean)
        sxx += dx * dx

    # Degenerate series (zero variance in x, or zero mean price): report stable.
    if sxx == 0 or y_mean == 0:
        return {
            "slope": 0.0,
            "normalized_slope": 0.0,
            "price_changes": price_changes,
            "direction": "stable",
        }

    slope = sxy / sxx
    # Scale-independent slope, clamped to [-1, 1].
    normalized = max(-1.0, min(1.0, slope / y_mean))

    # A 5% relative slope is the threshold between "stable" and a real trend.
    if normalized > 0.05:
        direction = "rising"
    elif normalized < -0.05:
        direction = "falling"
    else:
        direction = "stable"

    return {
        "slope": round(slope, 6),
        "normalized_slope": round(normalized, 4),
        "price_changes": price_changes,
        "direction": direction,
    }
419def compute_weighted_score(
420 spot_placement_score: float = 0.0,
421 spot_price_ratio: float = 1.0,
422 queue_depth: int = 0,
423 gpu_utilization: float = 0.0,
424 running_jobs: int = 0,
425 capacity_block_trend: float = 0.0,
426 *,
427 weights: dict[str, float] | None = None,
428) -> float:
429 """
430 Compute a weighted recommendation score for a region. Lower is better.
432 Combines multiple capacity signals into a single score using configurable
433 weights. Each signal is normalized to 0-1 where 0 is best, then weighted.
435 Args:
436 spot_placement_score: Normalized spot placement score (0-1, higher = better availability)
437 spot_price_ratio: Spot price / on-demand price (0-1, lower = better savings)
438 queue_depth: Number of pending jobs in the region's queue
439 gpu_utilization: GPU utilization percentage (0-100)
440 running_jobs: Number of currently running jobs
441 capacity_block_trend: Trend of capacity block offerings over a 26-week window
442 (-1 to 1). Positive means capacity is growing (regression slope positive),
443 negative means shrinking. Derived from linear regression over weekly
444 offering counts from the describe-capacity-block-offerings API.
445 weights: Optional custom weights dict. Keys: spot_placement, spot_price,
446 queue_depth, gpu_utilization, running_jobs, capacity_blocks.
447 Values should sum to 1.0.
449 Returns:
450 Weighted score (lower is better, range roughly 0-1)
451 """
452 w = weights or {
453 "spot_placement": 0.25,
454 "spot_price": 0.20,
455 "queue_depth": 0.20,
456 "gpu_utilization": 0.15,
457 "running_jobs": 0.10,
458 "capacity_blocks": 0.10,
459 }
461 # Normalize each signal to 0-1 where 0 is best
462 # Spot placement: invert (high score = good, so 1 - score = low = good)
463 norm_spot = 1.0 - min(max(spot_placement_score, 0.0), 1.0)
465 # Spot price ratio: already 0-1 where lower is better
466 norm_price = min(max(spot_price_ratio, 0.0), 1.0)
468 # Queue depth: normalize with diminishing returns (0 = best)
469 # Use tanh-like curve: depth / (depth + k) where k controls sensitivity
470 norm_queue = queue_depth / (queue_depth + 10.0) if queue_depth >= 0 else 0.0
472 # GPU utilization: normalize 0-100 to 0-1
473 norm_gpu = min(max(gpu_utilization, 0.0), 100.0) / 100.0
475 # Running jobs: normalize with diminishing returns
476 norm_jobs = running_jobs / (running_jobs + 20.0) if running_jobs >= 0 else 0.0
478 # Capacity block trend: invert and shift from [-1,1] to [0,1]
479 # trend +1 (growing) → 0.0 (best), trend -1 (shrinking) → 1.0 (worst)
480 clamped_trend = min(max(capacity_block_trend, -1.0), 1.0)
481 norm_blocks = (1.0 - clamped_trend) / 2.0
483 score = (
484 w["spot_placement"] * norm_spot
485 + w["spot_price"] * norm_price
486 + w["queue_depth"] * norm_queue
487 + w["gpu_utilization"] * norm_gpu
488 + w["running_jobs"] * norm_jobs
489 + w.get("capacity_blocks", 0) * norm_blocks
490 )
492 return round(score, 4)
def get_multi_region_capacity_checker(
    config: GCOConfig | None = None,
) -> MultiRegionCapacityChecker:
    """Build a multi-region capacity checker for the given (or default) config.

    Args:
        config: Optional GCO configuration; the checker falls back to the
            global configuration when omitted.

    Returns:
        A ready-to-use MultiRegionCapacityChecker instance.
    """
    checker = MultiRegionCapacityChecker(config)
    return checker