Coverage for cli/capacity/multi_region.py: 89%
185 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Multi-region capacity checking and weighted scoring."""
3from __future__ import annotations
5import logging
6import statistics
7from dataclasses import dataclass
8from datetime import UTC, datetime, timedelta
9from typing import Any
11import boto3
12from botocore.exceptions import ClientError
14from cli.config import GCOConfig, get_config
16from .checker import CapacityChecker
# Module-level logger. The capacity checks below report metric-collection
# failures through it: debug level for AWS ClientErrors, warning level for
# unexpected errors.
logger = logging.getLogger(__name__)
@dataclass()
class RegionCapacity:
    """
    Point-in-time capacity snapshot for a single GCO region.

    Every metric defaults to zero, so ``RegionCapacity(region=...)`` on its own
    is a valid "no data collected" placeholder.
    """

    region: str  # region name the snapshot describes
    queue_depth: int = 0  # visible messages waiting in the region's SQS job queue
    pending_jobs: int = 0  # NOTE(review): not set anywhere in this module
    running_jobs: int = 0  # in-flight (not-visible) SQS messages, used as a running-job proxy
    gpu_utilization: float = 0.0  # percent, from Container Insights node_gpu_utilization
    cpu_utilization: float = 0.0  # percent, from Container Insights node_cpu_utilization
    available_gpus: int = 0  # NOTE(review): not set anywhere in this module
    total_gpus: int = 0  # NOTE(review): not set anywhere in this module
    avg_wait_time_seconds: int = 0  # NOTE(review): not set anywhere in this module
    recommendation_score: float = 0.0  # composite placement score; lower is better
37class MultiRegionCapacityChecker:
38 """
39 Checks capacity across multiple GCO regions.
41 Provides:
42 - Multi-region capacity overview
43 - Intelligent region recommendation
44 - Queue depth analysis
45 - Resource utilization metrics
46 """
48 def __init__(self, config: GCOConfig | None = None):
49 self.config = config or get_config()
50 self._session = boto3.Session()
52 def get_region_capacity(self, region: str) -> RegionCapacity:
53 """Get capacity information for a single region."""
54 from cli.aws_client import get_aws_client
56 aws_client = get_aws_client(self.config)
57 stack = aws_client.get_regional_stack(region)
59 if not stack: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true
60 return RegionCapacity(region=region)
62 capacity = RegionCapacity(region=region)
64 # Get queue depth from SQS
65 try:
66 cfn = self._session.client("cloudformation", region_name=region)
67 response = cfn.describe_stacks(StackName=stack.stack_name)
68 outputs = {
69 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
70 }
72 queue_url = outputs.get("JobQueueUrl")
73 if queue_url: 73 ↛ 90line 73 didn't jump to line 90 because the condition on line 73 was always true
74 sqs = self._session.client("sqs", region_name=region)
75 attrs = sqs.get_queue_attributes(
76 QueueUrl=queue_url,
77 AttributeNames=[
78 "ApproximateNumberOfMessages",
79 "ApproximateNumberOfMessagesNotVisible",
80 ],
81 )["Attributes"]
82 capacity.queue_depth = int(attrs.get("ApproximateNumberOfMessages", 0))
83 capacity.running_jobs = int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0))
84 except ClientError as e:
85 logger.debug("Failed to get queue metrics for %s: %s", region, e)
86 except Exception as e:
87 logger.warning("Unexpected error getting queue metrics for %s: %s", region, e)
89 # Get cluster metrics from CloudWatch (if available)
90 try:
91 cloudwatch = self._session.client("cloudwatch", region_name=region)
93 # Get GPU utilization from Container Insights
94 response = cloudwatch.get_metric_statistics(
95 Namespace="ContainerInsights",
96 MetricName="node_gpu_utilization",
97 Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}],
98 StartTime=datetime.now(UTC) - timedelta(minutes=5),
99 EndTime=datetime.now(UTC),
100 Period=300,
101 Statistics=["Average"],
102 )
103 if response["Datapoints"]:
104 capacity.gpu_utilization = response["Datapoints"][0]["Average"]
106 # Get CPU utilization
107 response = cloudwatch.get_metric_statistics(
108 Namespace="ContainerInsights",
109 MetricName="node_cpu_utilization",
110 Dimensions=[{"Name": "ClusterName", "Value": stack.cluster_name}],
111 StartTime=datetime.now(UTC) - timedelta(minutes=5),
112 EndTime=datetime.now(UTC),
113 Period=300,
114 Statistics=["Average"],
115 )
116 if response["Datapoints"]:
117 capacity.cpu_utilization = response["Datapoints"][0]["Average"]
119 except ClientError as e:
120 logger.debug("Failed to get CloudWatch metrics for %s: %s", region, e)
121 except Exception as e:
122 logger.warning("Unexpected error getting CloudWatch metrics for %s: %s", region, e)
124 # Calculate recommendation score (lower is better)
125 # Factors: queue depth, GPU utilization, running jobs
126 capacity.recommendation_score = (
127 capacity.queue_depth * 10 + capacity.gpu_utilization + capacity.running_jobs * 5
128 )
130 return capacity
132 def get_all_regions_capacity(self) -> list[RegionCapacity]:
133 """Get capacity information for all deployed regions."""
134 from cli.aws_client import get_aws_client
136 aws_client = get_aws_client(self.config)
137 stacks = aws_client.discover_regional_stacks()
139 capacities = []
140 for region in stacks:
141 try:
142 capacity = self.get_region_capacity(region)
143 capacities.append(capacity)
144 except Exception as e:
145 logger.warning("Failed to get capacity for region %s: %s", region, e)
146 continue
148 return capacities
150 def recommend_region_for_job(
151 self,
152 gpu_required: bool = False,
153 min_gpus: int = 0,
154 instance_type: str | None = None,
155 gpu_count: int = 0,
156 ) -> dict[str, Any]:
157 """
158 Recommend the optimal region for job placement.
160 When instance_type is provided, uses weighted multi-signal scoring that
161 combines spot placement scores, pricing, queue depth, GPU utilization,
162 and running job counts. Falls back to simple scoring when instance_type
163 is not specified.
165 Args:
166 gpu_required: Whether the job requires GPUs
167 min_gpus: Minimum number of GPUs required
168 instance_type: Specific instance type for workload-aware scoring
169 gpu_count: Number of GPUs required
171 Returns:
172 Dictionary with recommended region and justification
173 """
174 capacities = self.get_all_regions_capacity()
176 if not capacities:
177 return {
178 "region": self.config.default_region,
179 "reason": "No capacity data available, using default region",
180 "score": 0,
181 }
183 # When instance_type is provided, use weighted scoring with capacity data
184 if instance_type:
185 return self._weighted_recommend(capacities, instance_type, gpu_count or min_gpus)
187 # Fallback: simple scoring (existing behavior)
188 return self._simple_recommend(capacities)
190 def _simple_recommend(self, capacities: list[RegionCapacity]) -> dict[str, Any]:
191 """Simple recommendation using the existing composite score."""
192 sorted_capacities = sorted(capacities, key=lambda x: x.recommendation_score)
193 best = sorted_capacities[0]
195 reasons = []
196 if best.queue_depth == 0:
197 reasons.append("empty queue")
198 elif best.queue_depth < 5: 198 ↛ 201line 198 didn't jump to line 201 because the condition on line 198 was always true
199 reasons.append(f"low queue depth ({best.queue_depth})")
201 if best.gpu_utilization < 50:
202 reasons.append(f"{100 - best.gpu_utilization:.0f}% GPU available")
203 elif best.gpu_utilization < 80: 203 ↛ 206line 203 didn't jump to line 206 because the condition on line 203 was always true
204 reasons.append(f"moderate GPU utilization ({best.gpu_utilization:.0f}%)")
206 if best.running_jobs == 0:
207 reasons.append("no running jobs")
208 elif best.running_jobs < 5: 208 ↛ 211line 208 didn't jump to line 211 because the condition on line 208 was always true
209 reasons.append(f"few running jobs ({best.running_jobs})")
211 reason = ", ".join(reasons) if reasons else "best overall capacity"
213 return {
214 "region": best.region,
215 "reason": reason,
216 "score": best.recommendation_score,
217 "queue_depth": best.queue_depth,
218 "gpu_utilization": best.gpu_utilization,
219 "running_jobs": best.running_jobs,
220 "all_regions": [
221 {
222 "region": c.region,
223 "score": c.recommendation_score,
224 "queue_depth": c.queue_depth,
225 "gpu_utilization": c.gpu_utilization,
226 }
227 for c in sorted_capacities
228 ],
229 }
231 def _weighted_recommend(
232 self,
233 capacities: list[RegionCapacity],
234 instance_type: str,
235 gpu_count: int = 0,
236 ) -> dict[str, Any]:
237 """
238 Workload-aware recommendation using weighted multi-signal scoring.
240 Gathers per-region capacity data for the specific instance type and
241 combines it with cluster metrics using weighted scoring.
242 """
243 capacity_checker = CapacityChecker(self.config)
245 scored_regions: list[dict[str, Any]] = []
247 for cap in capacities:
248 region = cap.region
250 # Gather instance-specific signals for this region
251 spot_score = 0.0
252 spot_price_ratio = 1.0 # spot/on-demand ratio (lower = better savings)
254 try:
255 placement_scores = capacity_checker.get_spot_placement_score(
256 instance_type, region, target_capacity=max(1, gpu_count)
257 )
258 if placement_scores: 258 ↛ 265line 258 didn't jump to line 265 because the condition on line 258 was always true
259 spot_score = placement_scores.get("regional", 0) / 10.0 # Normalize to 0-1
260 except Exception as e:
261 logger.debug(
262 "Failed to get spot placement score for %s in %s: %s", instance_type, region, e
263 )
265 try:
266 spot_prices = capacity_checker.get_spot_price_history(instance_type, region)
267 on_demand_price = capacity_checker.get_on_demand_price(instance_type, region)
269 if spot_prices and on_demand_price and on_demand_price > 0:
270 avg_spot = statistics.mean(sp.current_price for sp in spot_prices)
271 spot_price_ratio = avg_spot / on_demand_price
272 except Exception as e:
273 logger.debug(
274 "Failed to get spot pricing for %s in %s: %s", instance_type, region, e
275 )
277 # Capacity Block trend — compares near-term vs far-term offering
278 # density to detect whether AWS is adding or consuming capacity
279 # in this region for the requested instance type.
280 try:
281 cb_trend = capacity_checker.get_capacity_block_trend(instance_type, region)
282 except Exception:
283 cb_trend = 0.0
285 weighted_score = compute_weighted_score(
286 spot_placement_score=spot_score,
287 spot_price_ratio=spot_price_ratio,
288 queue_depth=cap.queue_depth,
289 gpu_utilization=cap.gpu_utilization,
290 running_jobs=cap.running_jobs,
291 capacity_block_trend=cb_trend,
292 )
294 scored_regions.append(
295 {
296 "region": region,
297 "score": weighted_score,
298 "queue_depth": cap.queue_depth,
299 "gpu_utilization": cap.gpu_utilization,
300 "running_jobs": cap.running_jobs,
301 "spot_placement_score": spot_score,
302 "spot_price_ratio": spot_price_ratio,
303 "capacity_block_trend": cb_trend,
304 }
305 )
307 scored_regions.sort(key=lambda x: x["score"])
308 best = scored_regions[0]
310 # Build justification from the signals
311 reasons = []
312 if best["spot_placement_score"] >= 0.7:
313 reasons.append(f"high spot availability ({best['spot_placement_score']:.0%})")
314 elif best["spot_placement_score"] >= 0.4:
315 reasons.append(f"moderate spot availability ({best['spot_placement_score']:.0%})")
317 if best["spot_price_ratio"] < 0.5:
318 reasons.append(f"good spot savings ({1 - best['spot_price_ratio']:.0%} off on-demand)")
320 if best["queue_depth"] == 0:
321 reasons.append("empty queue")
322 elif best["queue_depth"] < 5: 322 ↛ 325line 322 didn't jump to line 325 because the condition on line 322 was always true
323 reasons.append(f"low queue depth ({best['queue_depth']})")
325 if best["gpu_utilization"] < 50: 325 ↛ 328line 325 didn't jump to line 328 because the condition on line 325 was always true
326 reasons.append(f"{100 - best['gpu_utilization']:.0f}% GPU available")
328 if best["running_jobs"] == 0:
329 reasons.append("no running jobs")
330 elif best["running_jobs"] < 5: 330 ↛ 333line 330 didn't jump to line 333 because the condition on line 330 was always true
331 reasons.append(f"few running jobs ({best['running_jobs']})")
333 if best.get("capacity_block_trend", 0) > 0.2: 333 ↛ 334line 333 didn't jump to line 334 because the condition on line 333 was never true
334 reasons.append("capacity block availability trending up")
335 elif best.get("capacity_block_trend", 0) < -0.2: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 reasons.append("capacity block availability trending down")
338 reason = ", ".join(reasons) if reasons else f"best weighted score for {instance_type}"
340 return {
341 "region": best["region"],
342 "reason": reason,
343 "score": best["score"],
344 "queue_depth": best["queue_depth"],
345 "gpu_utilization": best["gpu_utilization"],
346 "running_jobs": best["running_jobs"],
347 "instance_type": instance_type,
348 "scoring_method": "weighted",
349 "all_regions": scored_regions,
350 }
def compute_price_trend(prices: list[float]) -> dict[str, Any]:
    """
    Fit a least-squares line to a price series and summarise its direction.

    The input is expected newest-first, which is the order the EC2 spot price
    history API returns. It is flipped to oldest-first before fitting, so a
    positive slope means prices are going up over time.

    Args:
        prices: Price samples, most recent first.

    Returns:
        Dict with:
            slope: fitted price change per sample step, rounded to 6 places
                (positive = rising)
            normalized_slope: slope divided by the mean price, clamped to
                [-1, 1] and rounded to 4 places
            price_changes: how many adjacent samples differ (a volatility proxy)
            direction: "rising" above +0.05 normalized slope, "falling" below
                -0.05, otherwise "stable"
    """
    if len(prices) < 2:
        # Not enough points to fit a line.
        return {"slope": 0.0, "normalized_slope": 0.0, "price_changes": 0, "direction": "stable"}

    chronological = [*reversed(prices)]  # oldest sample first
    count = len(chronological)

    # Adjacent pairs whose price moved.
    transitions = sum(1 for prev, cur in zip(chronological, chronological[1:]) if cur != prev)

    # Ordinary least squares with x = sample index.
    centre_x = (count - 1) / 2.0
    mean_price = statistics.mean(chronological)
    covariance = sum(
        (idx - centre_x) * (price - mean_price) for idx, price in enumerate(chronological)
    )
    spread = sum((idx - centre_x) ** 2 for idx in range(count))

    if spread == 0 or mean_price == 0:
        # Degenerate fit, or a zero mean that cannot be used as a divisor.
        return {
            "slope": 0.0,
            "normalized_slope": 0.0,
            "price_changes": transitions,
            "direction": "stable",
        }

    slope = covariance / spread
    normalized = max(-1.0, min(1.0, slope / mean_price))

    direction = "stable"
    if normalized > 0.05:
        direction = "rising"
    elif normalized < -0.05:
        direction = "falling"

    return {
        "slope": round(slope, 6),
        "normalized_slope": round(normalized, 4),
        "price_changes": transitions,
        "direction": direction,
    }
419def compute_weighted_score(
420 spot_placement_score: float = 0.0,
421 spot_price_ratio: float = 1.0,
422 queue_depth: int = 0,
423 gpu_utilization: float = 0.0,
424 running_jobs: int = 0,
425 capacity_block_trend: float = 0.0,
426 *,
427 weights: dict[str, float] | None = None,
428) -> float:
429 """
430 Compute a weighted recommendation score for a region. Lower is better.
432 Combines multiple capacity signals into a single score using configurable
433 weights. Each signal is normalized to 0-1 where 0 is best, then weighted.
435 Args:
436 spot_placement_score: Normalized spot placement score (0-1, higher = better availability)
437 spot_price_ratio: Spot price / on-demand price (0-1, lower = better savings)
438 queue_depth: Number of pending jobs in the region's queue
439 gpu_utilization: GPU utilization percentage (0-100)
440 running_jobs: Number of currently running jobs
441 capacity_block_trend: Trend of capacity block offerings over a 26-week window
442 (-1 to 1). Positive means capacity is growing (regression slope positive),
443 negative means shrinking. Derived from linear regression over weekly
444 offering counts from the describe-capacity-block-offerings API.
445 weights: Optional custom weights dict. Keys: spot_placement, spot_price,
446 queue_depth, gpu_utilization, running_jobs, capacity_blocks.
447 Values should sum to 1.0.
449 Returns:
450 Weighted score (lower is better, range roughly 0-1)
451 """
452 w = weights or {
453 "spot_placement": 0.25,
454 "spot_price": 0.20,
455 "queue_depth": 0.20,
456 "gpu_utilization": 0.15,
457 "running_jobs": 0.10,
458 "capacity_blocks": 0.10,
459 }
461 # Normalize each signal to 0-1 where 0 is best
462 # Spot placement: invert (high score = good, so 1 - score = low = good)
463 norm_spot = 1.0 - min(max(spot_placement_score, 0.0), 1.0)
465 # Spot price ratio: already 0-1 where lower is better
466 norm_price = min(max(spot_price_ratio, 0.0), 1.0)
468 # Queue depth: normalize with diminishing returns (0 = best)
469 # Use tanh-like curve: depth / (depth + k) where k controls sensitivity
470 norm_queue = queue_depth / (queue_depth + 10.0) if queue_depth >= 0 else 0.0
472 # GPU utilization: normalize 0-100 to 0-1
473 norm_gpu = min(max(gpu_utilization, 0.0), 100.0) / 100.0
475 # Running jobs: normalize with diminishing returns
476 norm_jobs = running_jobs / (running_jobs + 20.0) if running_jobs >= 0 else 0.0
478 # Capacity block trend: invert and shift from [-1,1] to [0,1]
479 # trend +1 (growing) → 0.0 (best), trend -1 (shrinking) → 1.0 (worst)
480 clamped_trend = min(max(capacity_block_trend, -1.0), 1.0)
481 norm_blocks = (1.0 - clamped_trend) / 2.0
483 score = (
484 w["spot_placement"] * norm_spot
485 + w["spot_price"] * norm_price
486 + w["queue_depth"] * norm_queue
487 + w["gpu_utilization"] * norm_gpu
488 + w["running_jobs"] * norm_jobs
489 + w.get("capacity_blocks", 0) * norm_blocks
490 )
492 return round(score, 4)
def get_multi_region_capacity_checker(
    config: GCOConfig | None = None,
) -> MultiRegionCapacityChecker:
    """
    Build a :class:`MultiRegionCapacityChecker`.

    Args:
        config: CLI configuration to use. When None, the checker falls back
            to ``get_config()``.

    Returns:
        A newly constructed checker. Each call creates its own boto3 session.
    """
    checker = MultiRegionCapacityChecker(config=config)
    return checker