Coverage for cli / capacity / checker.py: 86%
501 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Single-region EC2 capacity checker using real AWS signals.
4This is the core capacity intelligence module (~1265 lines). It queries multiple
5AWS APIs to build a comprehensive picture of GPU/accelerator availability in a
6single region. The MultiRegionCapacityChecker in multi_region.py calls this for
7each region in parallel.
9Data Sources:
10 - EC2 GetSpotPlacementScores: likelihood of getting spot capacity (1-10 score)
11 - EC2 DescribeSpotPriceHistory: current and historical spot prices (7-day window)
12 - EC2 DescribeInstanceTypes: vCPU, memory, GPU count/type/memory, EFA support
13 - EC2 DescribeInstanceTypeOfferings: which instance types are available in the region
14 - EC2 DescribeCapacityBlockOfferings: purchasable Capacity Blocks for ML workloads
15 - EC2 PurchaseCapacityBlock: (optional) purchase a Capacity Block by offering ID
17Key Classes:
18 CapacityChecker: Main class. Instantiated with an optional GCOConfig; the region is passed to each method.
19 - check_capacity(instance_type) → CapacityEstimate
20 - get_spot_prices(instance_type) → list[SpotPriceInfo]
21 - get_instance_info(instance_type) → InstanceTypeInfo
22 - check_capacity_blocks(instance_type, count, duration) → list[dict]
24Output Models (defined in models.py):
25 - CapacityEstimate: spot score, price, trend, on-demand price, instance specs
26 - SpotPriceInfo: AZ, price, timestamp
27 - InstanceTypeInfo: vCPU, memory, GPU count/type/memory, EFA, architecture
29The GPU_INSTANCE_SPECS lookup table in models.py provides offline specs for common
30GPU instances so the checker can return useful information even when the EC2 API
31is unavailable or the instance type isn't offered in the region.
32"""
34from __future__ import annotations
36import json
37import logging
38import statistics
39from datetime import UTC, datetime, timedelta
40from typing import Any
42import boto3
43from botocore.exceptions import ClientError
45from cli.config import GCOConfig, get_config
47from .models import (
48 GPU_INSTANCE_SPECS,
49 CapacityEstimate,
50 InstanceTypeInfo,
51 SpotPriceInfo,
52)
54logger = logging.getLogger(__name__)
57def _instance_desc(instance_type: str, gpu_count: int, gpu_type: str, total_gpu_mem: float) -> str:
58 """Build a human-readable instance description."""
59 if gpu_count > 0 and gpu_type:
60 mem_str = f", {total_gpu_mem:.0f}GB" if total_gpu_mem else ""
61 return f"{instance_type} ({gpu_count}x {gpu_type}{mem_str})"
62 return instance_type
65class CapacityChecker:
66 """
67 Checks EC2 capacity availability using real AWS capacity signals.
69 Uses:
70 - Spot Placement Score API for spot capacity estimates
71 - EC2 describe-instance-type-offerings for regional availability
72 - Spot price history for pricing trends
73 - On-demand pricing API
74 """
def __init__(self, config: GCOConfig | None = None):
    """Set up the checker with its configuration, AWS session and empty caches.

    Args:
        config: Configuration to use. When omitted (or falsy), the value
            returned by ``get_config()`` is used instead.
    """
    if not config:
        config = get_config()
    self.config = config
    # One boto3 session is shared by every client this instance creates.
    self._session = boto3.Session()
    # On-demand prices keyed by "<instance_type>:<region>".
    self._pricing_cache: dict[str, Any] = {}
    # Set of offered instance types per region, keyed by region name.
    self._offerings_cache: dict[str, set[str]] = {}
def get_instance_info(
    self, instance_type: str, region: str = "us-east-1"
) -> InstanceTypeInfo | None:
    """Look up hardware specs for an instance type.

    The offline ``GPU_INSTANCE_SPECS`` table is consulted first; only types
    missing from it trigger an EC2 ``DescribeInstanceTypes`` call.

    Args:
        instance_type: EC2 instance type name.
        region: Region whose EC2 endpoint is queried. Defaults to
            ``us-east-1``, the value that was previously hard-coded, so
            existing callers behave as before. Pass the region of interest
            for types that may not be describable through ``us-east-1``.

    Returns:
        An ``InstanceTypeInfo``, or ``None`` when EC2 returns no match or the
        lookup fails (failures are logged, never raised).
    """
    if instance_type in GPU_INSTANCE_SPECS:
        return GPU_INSTANCE_SPECS[instance_type]

    # Not in the offline table: ask EC2.
    try:
        ec2 = self._session.client("ec2", region_name=region)
        response = ec2.describe_instance_types(InstanceTypes=[instance_type])

        if response["InstanceTypes"]:
            info = response["InstanceTypes"][0]

            gpu_count = 0
            gpu_type = None
            gpu_memory = 0
            gpus = info.get("GpuInfo", {}).get("Gpus", [])
            if gpus:
                # Only the first GPU entry is reported.
                first_gpu = gpus[0]
                gpu_count = first_gpu.get("Count", 0)
                gpu_type = first_gpu.get("Name")
                gpu_memory = first_gpu.get("MemoryInfo", {}).get("SizeInMiB", 0) / 1024

            return InstanceTypeInfo(
                instance_type=instance_type,
                vcpus=info["VCpuInfo"]["DefaultVCpus"],
                memory_gib=info["MemoryInfo"]["SizeInMiB"] / 1024,
                gpu_count=gpu_count,
                gpu_type=gpu_type,
                gpu_memory_gib=gpu_memory,
                # First architecture listed by EC2 for this type.
                architecture=info["ProcessorInfo"]["SupportedArchitectures"][0],
            )
    except ClientError as e:
        logger.debug("Failed to describe instance type %s: %s", instance_type, e)
    except Exception as e:
        logger.warning("Unexpected error getting instance info for %s: %s", instance_type, e)

    return None
def check_instance_available_in_region(self, instance_type: str, region: str) -> bool:
    """Report whether ``instance_type`` is offered in ``region``.

    The full set of offerings for a region is fetched once and kept in
    ``self._offerings_cache``; later calls for that region are answered from
    the cache. A failed fetch is logged and reported as "not offered"
    without caching anything, so the next call retries.
    """
    key = f"{region}"
    if key in self._offerings_cache:
        return instance_type in self._offerings_cache[key]

    try:
        client = self._session.client("ec2", region_name=region)
        pages = client.get_paginator("describe_instance_type_offerings").paginate(
            LocationType="region"
        )
        offered = {
            entry["InstanceType"] for page in pages for entry in page["InstanceTypeOfferings"]
        }
        self._offerings_cache[key] = offered
    except ClientError as e:
        logger.debug("Failed to check instance offerings in %s: %s", region, e)
        return False  # Assume unavailable if we can't check
    except Exception as e:
        logger.warning("Unexpected error checking offerings in %s: %s", region, e)
        return False

    return instance_type in offered
def get_availability_zones(self, region: str) -> list[str]:
    """Return the names of the zones in ``region`` whose state is "available".

    Any failure is logged and results in an empty list rather than an
    exception.
    """
    zone_names: list[str] = []
    only_available = {"Name": "state", "Values": ["available"]}
    try:
        client = self._session.client("ec2", region_name=region)
        described = client.describe_availability_zones(Filters=[only_available])
        zone_names = [zone["ZoneName"] for zone in described["AvailabilityZones"]]
    except ClientError as e:
        logger.debug("Failed to get availability zones for %s: %s", region, e)
    except Exception as e:
        logger.warning("Unexpected error getting AZs for %s: %s", region, e)
    return zone_names
def get_az_coverage(self, instance_type: str, region: str) -> float | None:
    """Return the fraction of a region's available AZs that offer an instance type.

    Only offering locations that also appear in ``get_availability_zones``
    are counted, so the result always lies between 0.0 and 1.0. Constrained
    instances are often available in fewer AZs.

    Args:
        instance_type: EC2 instance type name.
        region: AWS region to inspect.

    Returns:
        Coverage in the range 0.0-1.0, or ``None`` when the AZ list is empty
        or any lookup fails (failures are logged at debug level).
    """
    try:
        known_azs = set(self.get_availability_zones(region))
        if not known_azs:
            return None

        ec2 = self._session.client("ec2", region_name=region)
        paginator = ec2.get_paginator("describe_instance_type_offerings")
        pages = paginator.paginate(
            LocationType="availability-zone",
            Filters=[{"Name": "instance-type", "Values": [instance_type]}],
        )
        offering_azs = {
            offering["Location"] for page in pages for offering in page["InstanceTypeOfferings"]
        }

        # Ignore locations outside the available-AZ list; counting them could
        # push the ratio above 1.0.
        return len(offering_azs & known_azs) / len(known_azs)
    except Exception as e:
        logger.debug("Failed to get AZ coverage for %s in %s: %s", instance_type, region, e)
        return None
def get_spot_placement_score(
    self, instance_type: str, region: str, target_capacity: int = 1
) -> dict[str, int]:
    """Fetch Spot Placement Scores for an instance type in one region.

    The Spot Placement Score (1-10) indicates the likelihood of getting
    spot capacity; higher is better.

    Returns:
        Scores keyed by ``AvailabilityZoneId`` for entries that carry one and
        by ``"regional"`` for entries that do not. Empty when the API rejects
        the request with ``InvalidParameterValue`` / ``UnsupportedOperation``
        or when a non-``ClientError`` failure occurs.

    Raises:
        ClientError: For any other AWS error code.
    """
    request = {
        "InstanceTypes": [instance_type],
        "TargetCapacity": target_capacity,
        "TargetCapacityUnitType": "units",
        "RegionNames": [region],
        "SingleAvailabilityZone": False,
    }
    try:
        client = self._session.client("ec2", region_name=region)
        reply = client.get_spot_placement_scores(**request)
        # Entries without an AZ id are region-wide scores.
        return {
            entry.get("AvailabilityZoneId", "regional"): entry.get("Score", 0)
            for entry in reply.get("SpotPlacementScores", [])
        }
    except ClientError as e:
        code = e.response.get("Error", {}).get("Code", "")
        if code not in ("InvalidParameterValue", "UnsupportedOperation"):
            raise
        return {}
    except Exception as e:
        logger.debug(
            "Failed to get spot placement scores for %s in %s: %s", instance_type, region, e
        )
        return {}
233 def get_spot_price_history(
234 self, instance_type: str, region: str, days: int = 7
235 ) -> list[SpotPriceInfo]:
236 """Get spot price history for an instance type."""
237 ec2 = self._session.client("ec2", region_name=region)
239 end_time = datetime.now(UTC)
240 start_time = end_time - timedelta(days=days)
242 try:
243 response = ec2.describe_spot_price_history(
244 InstanceTypes=[instance_type],
245 ProductDescriptions=["Linux/UNIX"],
246 StartTime=start_time,
247 EndTime=end_time,
248 )
250 # Group by availability zone
251 az_prices: dict[str, list[float]] = {}
252 for item in response["SpotPriceHistory"]:
253 az = item["AvailabilityZone"]
254 price = float(item["SpotPrice"])
255 if az not in az_prices:
256 az_prices[az] = []
257 az_prices[az].append(price)
259 results = []
260 for az, prices in az_prices.items():
261 if not prices: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true
262 continue
264 current = prices[0]
265 avg = statistics.mean(prices)
266 min_price = min(prices)
267 max_price = max(prices)
269 if avg > 0: 269 ↛ 274line 269 didn't jump to line 274 because the condition on line 269 was always true
270 std_dev = statistics.stdev(prices) if len(prices) > 1 else 0
271 cv = std_dev / avg
272 stability = max(0, 1 - cv)
273 else:
274 stability = 0
276 results.append(
277 SpotPriceInfo(
278 instance_type=instance_type,
279 availability_zone=az,
280 current_price=current,
281 avg_price_7d=avg,
282 min_price_7d=min_price,
283 max_price_7d=max_price,
284 price_stability=stability,
285 )
286 )
288 return results
290 except ClientError as e:
291 if "InvalidParameterValue" in str(e): 291 ↛ 293line 291 didn't jump to line 293 because the condition on line 291 was always true
292 return []
293 raise
def get_on_demand_price(self, instance_type: str, region: str) -> float | None:
    """Return the hourly on-demand Linux price for an instance type, if known.

    Successful lookups are cached in ``self._pricing_cache`` under
    ``"<instance_type>:<region>"``. The Pricing client is always created
    against ``us-east-1``.

    Regions present in the built-in name table are matched on the
    ``location`` attribute, as before. Any other region is matched on
    ``regionCode`` instead; previously the raw region code was sent as a
    ``location`` value, which cannot match a display name.

    Args:
        instance_type: EC2 instance type name.
        region: AWS region code, e.g. ``"us-west-2"``.

    Returns:
        Price in USD per hour, or ``None`` when no price was found or the
        lookup failed (failures are logged, never raised).
    """
    cache_key = f"{instance_type}:{region}"
    if cache_key in self._pricing_cache:
        cached_value = self._pricing_cache[cache_key]
        return float(cached_value) if cached_value is not None else None

    # Display names used by the price list's "location" attribute.
    region_names = {
        "us-east-1": "US East (N. Virginia)",
        "us-east-2": "US East (Ohio)",
        "us-west-1": "US West (N. California)",
        "us-west-2": "US West (Oregon)",
        "eu-west-1": "EU (Ireland)",
        "eu-west-2": "EU (London)",
        "eu-central-1": "EU (Frankfurt)",
        "ap-northeast-1": "Asia Pacific (Tokyo)",
        "ap-southeast-1": "Asia Pacific (Singapore)",
        "ap-southeast-2": "Asia Pacific (Sydney)",
    }
    location = region_names.get(region)
    if location is not None:
        region_filter = {"Type": "TERM_MATCH", "Field": "location", "Value": location}
    else:
        # No display name known: filter on the region code itself.
        region_filter = {"Type": "TERM_MATCH", "Field": "regionCode", "Value": region}

    try:
        pricing = self._session.client("pricing", region_name="us-east-1")

        response = pricing.get_products(
            ServiceCode="AmazonEC2",
            Filters=[
                {"Type": "TERM_MATCH", "Field": "instanceType", "Value": instance_type},
                region_filter,
                {"Type": "TERM_MATCH", "Field": "operatingSystem", "Value": "Linux"},
                {"Type": "TERM_MATCH", "Field": "tenancy", "Value": "Shared"},
                {"Type": "TERM_MATCH", "Field": "preInstalledSw", "Value": "NA"},
                {"Type": "TERM_MATCH", "Field": "capacitystatus", "Value": "Used"},
            ],
            MaxResults=1,
        )

        if response["PriceList"]:
            price_data = json.loads(response["PriceList"][0])
            terms = price_data.get("terms", {}).get("OnDemand", {})
            # The first price dimension of the first term is taken as the price.
            for term in terms.values():
                for price_dim in term.get("priceDimensions", {}).values():
                    price = float(price_dim["pricePerUnit"]["USD"])
                    self._pricing_cache[cache_key] = price
                    return price

    except ClientError as e:
        logger.debug("Failed to get on-demand price for %s in %s: %s", instance_type, region, e)
    except Exception as e:
        logger.warning(
            "Unexpected error getting pricing for %s in %s: %s", instance_type, region, e
        )

    return None
def estimate_capacity(
    self, instance_type: str, region: str, capacity_type: str = "both"
) -> list[CapacityEstimate]:
    """Build capacity estimates for an instance type in one region.

    Args:
        instance_type: EC2 instance type
        region: AWS region
        capacity_type: "spot", "on-demand", or "both"

    Returns:
        Spot estimates first (when requested), followed by the on-demand
        estimate (when requested). A single "unavailable" estimate is
        returned when the type is not offered in the region. Any other
        ``capacity_type`` value yields an empty list.
    """
    if not self.check_instance_available_in_region(instance_type, region):
        not_offered = CapacityEstimate(
            instance_type=instance_type,
            region=region,
            availability_zone=None,
            capacity_type="both",
            availability="unavailable",
            confidence=1.0,
            recommendation=f"{instance_type} is not available in {region}",
            details={"reason": "Instance type not offered in region"},
        )
        return [not_offered]

    wants_spot = capacity_type in ("spot", "both")
    wants_on_demand = capacity_type in ("on-demand", "both")

    info = self.get_instance_info(instance_type)
    results = []

    if wants_spot:
        results += self._estimate_spot_capacity(instance_type, region, info)

    if not wants_on_demand:
        return results

    # Spot placement scores double as a scarcity signal for on-demand.
    if capacity_type == "on-demand":
        scores = self.get_spot_placement_score(instance_type, region)
    else:
        scores = {}

    # When spot estimates were just produced, reuse the scores they carry.
    from_spot = [est for est in results if est.capacity_type == "spot"]
    if from_spot:
        scores = {}
        for est in from_spot:
            if est.details.get("spot_placement_score") is not None:
                zone = est.availability_zone or "unknown"
                scores[zone] = est.details.get("spot_placement_score", 0)

    # Spot prices feed the price-ratio signal.
    prices = self.get_spot_price_history(instance_type, region)
    on_demand = self._estimate_on_demand_capacity(instance_type, region, info, scores, prices)
    if on_demand:
        results.append(on_demand)

    return results
def _estimate_spot_capacity(
    self, instance_type: str, region: str, instance_info: InstanceTypeInfo | None
) -> list[CapacityEstimate]:
    """Produce spot estimates from placement scores, falling back to price history.

    With placement scores available, one estimate (confidence 0.85) is made
    per availability zone returned by ``get_availability_zones``. Without
    them, one estimate (confidence 0.5) is made per zone that has price
    history, using price stability as a proxy. If neither path yields
    anything, a single "unknown" estimate is returned.

    ``instance_info`` is accepted for signature parity but not read here.
    """
    # Score is the primary signal; prices add pricing details.
    placement_scores = self.get_spot_placement_score(instance_type, region)
    spot_prices = self.get_spot_price_history(instance_type, region)
    price_by_az = {entry.availability_zone: entry for entry in spot_prices}
    on_demand_price = self.get_on_demand_price(instance_type, region)
    zones = self.get_availability_zones(region)

    # (minimum score, availability, recommendation), best band first.
    score_bands = (
        (8, "high", "Excellent spot availability"),
        (5, "medium", "Good spot availability, some interruption risk"),
        (3, "low", "Limited spot capacity, consider alternatives"),
    )
    # (stability must exceed, availability, recommendation).
    stability_bands = (
        (0.8, "medium", "Spot prices stable, likely available"),
        (0.5, "low", "Spot prices volatile, capacity uncertain"),
    )

    estimates = []

    if placement_scores:
        regional_score = placement_scores.get("regional", 0)

        for zone in zones:
            # Scores are keyed by zone ID while `zone` is a zone name, so this
            # lookup may need a name-to-ID mapping; the regional score is the
            # fallback.
            score = placement_scores.get(zone, regional_score)

            availability, recommendation = "low", "Very limited spot capacity"
            for floor, level, advice in score_bands:
                if score >= floor:
                    availability, recommendation = level, advice
                    break

            zone_prices = price_by_az.get(zone)
            price = zone_prices.current_price if zone_prices else None

            details: dict[str, Any] = {
                "spot_placement_score": score,
                "score_interpretation": f"{score}/10",
            }
            if zone_prices:
                details["current_price"] = zone_prices.current_price
                details["avg_price_7d"] = zone_prices.avg_price_7d
                details["price_stability"] = f"{zone_prices.price_stability:.2f}"
            if on_demand_price and price:
                savings = (1 - price / on_demand_price) * 100
                details["savings_vs_on_demand"] = f"{savings:.1f}%"
                details["on_demand_price"] = on_demand_price

            estimates.append(
                CapacityEstimate(
                    instance_type=instance_type,
                    region=region,
                    availability_zone=zone,
                    capacity_type="spot",
                    availability=availability,
                    confidence=0.85,  # Spot Placement Score is reliable
                    price_per_hour=price,
                    recommendation=recommendation,
                    details=details,
                )
            )

    elif spot_prices:
        # No placement score: price stability is a weaker proxy.
        for zone_prices in spot_prices:
            availability = "low"
            recommendation = "High price volatility, limited capacity likely"
            for threshold, level, advice in stability_bands:
                if zone_prices.price_stability > threshold:
                    availability, recommendation = level, advice
                    break

            details = {
                "current_price": zone_prices.current_price,
                "avg_price_7d": zone_prices.avg_price_7d,
                "price_stability": f"{zone_prices.price_stability:.2f}",
                "note": "Estimate based on price history (Spot Placement Score unavailable)",
            }
            if on_demand_price:
                savings = (1 - zone_prices.current_price / on_demand_price) * 100
                details["savings_vs_on_demand"] = f"{savings:.1f}%"

            estimates.append(
                CapacityEstimate(
                    instance_type=instance_type,
                    region=region,
                    availability_zone=zone_prices.availability_zone,
                    capacity_type="spot",
                    availability=availability,
                    confidence=0.5,  # Lower confidence without placement score
                    price_per_hour=zone_prices.current_price,
                    recommendation=recommendation,
                    details=details,
                )
            )

    if estimates:
        return estimates

    return [
        CapacityEstimate(
            instance_type=instance_type,
            region=region,
            availability_zone=None,
            capacity_type="spot",
            availability="unknown",
            confidence=0.1,
            recommendation=f"No spot data available for {instance_type} in {region}",
            details={"reason": "No spot price history or placement score available"},
        )
    ]
def _estimate_on_demand_capacity(
    self,
    instance_type: str,
    region: str,
    instance_info: InstanceTypeInfo | None,
    spot_placement_scores: dict[str, int] | None = None,
    spot_prices: list[SpotPriceInfo] | None = None,
) -> CapacityEstimate | None:
    """Build the region-level on-demand estimate for an instance type.

    Gathers the on-demand price, spot placement scores, spot price history
    and AZ coverage, then delegates the judgement to
    ``_assess_on_demand_availability``. Scores and prices are fetched here
    only when the caller passes ``None``; an empty container is used as given.

    Returns:
        An "unavailable" estimate when the type is not offered in the region,
        otherwise an estimate carrying the assessed availability. Confidence
        is reduced by 0.1 when no on-demand price could be found.
    """
    hourly_price = self.get_on_demand_price(instance_type, region)

    if not self.check_instance_available_in_region(instance_type, region):
        return CapacityEstimate(
            instance_type=instance_type,
            region=region,
            availability_zone=None,
            capacity_type="on-demand",
            availability="unavailable",
            confidence=1.0,
            recommendation=f"{instance_type} is not offered in {region}",
            details={"reason": "Instance type not offered in region"},
        )

    # Fill in whichever signals the caller did not supply.
    scores = spot_placement_scores
    if scores is None:
        scores = self.get_spot_placement_score(instance_type, region)
    prices = spot_prices
    if prices is None:
        prices = self.get_spot_price_history(instance_type, region)
    coverage = self.get_az_coverage(instance_type, region)

    availability, confidence, recommendation = self._assess_on_demand_availability(
        instance_type,
        instance_info,
        hourly_price,
        scores,
        prices,
        coverage,
    )

    if hourly_price:
        recommendation = f"{recommendation} Price: ${hourly_price:.4f}/hr."
    else:
        confidence -= 0.1
        recommendation = f"{recommendation} Pricing data unavailable."

    details: dict[str, Any] = {
        "price_per_hour": hourly_price,
        "is_gpu": instance_info.is_gpu if instance_info else False,
    }
    # Only positive scores contribute to the reported average.
    positive_scores = [s for s in scores.values() if s > 0] if scores else []
    if positive_scores:
        details["avg_spot_placement_score"] = round(
            sum(positive_scores) / len(positive_scores), 1
        )

    return CapacityEstimate(
        instance_type=instance_type,
        region=region,
        availability_zone=None,
        capacity_type="on-demand",
        availability=availability,
        confidence=confidence,
        price_per_hour=hourly_price,
        recommendation=recommendation,
        details=details,
    )
615 @staticmethod
616 def _assess_on_demand_availability(
617 instance_type: str,
618 instance_info: InstanceTypeInfo | None,
619 on_demand_price: float | None,
620 spot_placement_scores: dict[str, int] | None = None,
621 spot_prices: list[SpotPriceInfo] | None = None,
622 az_coverage: float | None = None,
623 ) -> tuple[str, float, str]:
624 """Assess on-demand availability using only live market signals.
626 Five live signals, zero hardcoded instance families:
627 1. Spot placement score — AWS's own capacity assessment (1-10)
628 2. Spot-to-on-demand price ratio — when spot approaches on-demand price,
629 the spot market has very little excess capacity
630 3. Spot price volatility — unstable prices reflect capacity fluctuations
631 4. AZ coverage — fraction of AZs that offer this instance type;
632 constrained instances are often available in fewer AZs
633 5. Spot price availability — how many AZs have spot price data;
634 missing price data in some AZs suggests limited capacity there
636 Confidence scales with the number of live signals available.
637 When no signals exist, returns "unknown" rather than guessing.
639 Returns:
640 Tuple of (availability, confidence, recommendation)
641 """
642 price = on_demand_price or 0
643 gpu_count = instance_info.gpu_count if instance_info else 0
644 gpu_type = (instance_info.gpu_type or "") if instance_info else ""
645 total_gpu_mem = instance_info.gpu_memory_gib if instance_info else 0
647 # --- Signal 1: Spot placement score ---
648 avg_spot_score = 0.0
649 has_spot_score = False
650 if spot_placement_scores:
651 scores = [s for s in spot_placement_scores.values() if s > 0]
652 if scores: 652 ↛ 657line 652 didn't jump to line 657 because the condition on line 652 was always true
653 avg_spot_score = sum(scores) / len(scores)
654 has_spot_score = True
656 # --- Signal 2 & 3: Spot price ratio and volatility ---
657 avg_spot_ratio = 0.0
658 avg_stability = 1.0
659 has_price_signal = False
660 if spot_prices and price > 0:
661 ratios = [sp.current_price / price for sp in spot_prices if sp.current_price > 0]
662 if ratios: 662 ↛ 665line 662 didn't jump to line 665 because the condition on line 662 was always true
663 avg_spot_ratio = sum(ratios) / len(ratios)
664 has_price_signal = True
665 stabilities = [sp.price_stability for sp in spot_prices]
666 if stabilities: 666 ↛ 670line 666 didn't jump to line 670 because the condition on line 666 was always true
667 avg_stability = sum(stabilities) / len(stabilities)
669 # --- Signal 4: AZ coverage (passed in from caller) ---
670 has_az_signal = az_coverage is not None
672 # --- Combine live signals into scarcity (0.0 - 1.0) ---
673 scarcity = 0.0
674 signal_count = 0
676 if has_spot_score:
677 signal_count += 1
678 if avg_spot_score <= 2: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true
679 scarcity += 0.5
680 elif avg_spot_score <= 4: 680 ↛ 681line 680 didn't jump to line 681 because the condition on line 680 was never true
681 scarcity += 0.3
682 elif avg_spot_score <= 6: 682 ↛ 683line 682 didn't jump to line 683 because the condition on line 682 was never true
683 scarcity += 0.15
685 if has_price_signal:
686 signal_count += 1
687 # Spot price near on-demand = spot market has minimal excess capacity
688 if avg_spot_ratio >= 0.9: 688 ↛ 689line 688 didn't jump to line 689 because the condition on line 688 was never true
689 scarcity += 0.3
690 elif avg_spot_ratio >= 0.7: 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true
691 scarcity += 0.15
692 elif avg_spot_ratio >= 0.5: 692 ↛ 696line 692 didn't jump to line 696 because the condition on line 692 was always true
693 scarcity += 0.05
695 # Price instability = capacity fluctuations
696 if avg_stability < 0.6: 696 ↛ 697line 696 didn't jump to line 697 because the condition on line 696 was never true
697 scarcity += 0.1
698 elif avg_stability < 0.8: 698 ↛ 699line 698 didn't jump to line 699 because the condition on line 698 was never true
699 scarcity += 0.05
701 if has_az_signal and az_coverage is not None: 701 ↛ 702line 701 didn't jump to line 702 because the condition on line 701 was never true
702 signal_count += 1
703 # Available in fewer than half the AZs = constrained
704 if az_coverage <= 0.3:
705 scarcity += 0.2
706 elif az_coverage <= 0.5:
707 scarcity += 0.1
709 # --- Confidence scales with signal count ---
710 confidence = min(0.5 + (signal_count * 0.12), 0.9)
712 # --- Map scarcity to availability ---
713 desc = _instance_desc(instance_type, gpu_count, gpu_type, total_gpu_mem)
715 if signal_count == 0:
716 # No live data — be honest about it
717 return (
718 "unknown",
719 0.3,
720 f"No live capacity signals available for {instance_type}."
721 " Unable to assess on-demand availability.",
722 )
724 if scarcity >= 0.6: 724 ↛ 725line 724 didn't jump to line 725 because the condition on line 724 was never true
725 return (
726 "low",
727 confidence,
728 f"On-demand {desc} is extremely scarce based on live capacity signals."
729 " Capacity reservations or Capacity Blocks are strongly recommended.",
730 )
732 if scarcity >= 0.35: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true
733 return (
734 "low",
735 confidence,
736 f"On-demand {desc} has limited availability based on live capacity signals."
737 " Consider capacity reservations.",
738 )
740 if scarcity >= 0.15: 740 ↛ 741line 740 didn't jump to line 741 because the condition on line 740 was never true
741 return (
742 "medium",
743 confidence,
744 f"On-demand {instance_type} may have constrained availability"
745 " based on current market conditions.",
746 )
748 return (
749 "high",
750 confidence,
751 f"On-demand capacity likely available for {instance_type}"
752 " based on live capacity signals.",
753 )
def recommend_capacity_type(
    self, instance_type: str, region: str, fault_tolerance: str = "medium"
) -> tuple[str, str]:
    """Choose between spot and on-demand from current estimates.

    Args:
        instance_type: EC2 instance type
        region: AWS region
        fault_tolerance: "high" (can handle interruptions),
            "medium" (some tolerance),
            "low" (needs stability)

    Returns:
        Tuple of (recommended_capacity_type, explanation). The first element
        is "spot", "on-demand", or "unavailable" when any estimate reports
        the type as unavailable.
    """
    estimates = self.estimate_capacity(instance_type, region, "both")

    if any(est.availability == "unavailable" for est in estimates):
        return "unavailable", f"{instance_type} is not available in {region}"

    # Best spot option: highest availability first, then lowest price.
    rank = {"high": 0, "medium": 1, "low": 2}
    spot_candidates = [
        est
        for est in estimates
        if est.capacity_type == "spot" and est.availability != "unknown"
    ]
    best_spot = min(
        spot_candidates,
        key=lambda est: (rank.get(est.availability, 3), est.price_per_hour or 999),
        default=None,
    )

    od_estimate = next((est for est in estimates if est.capacity_type == "on-demand"), None)

    # Low tolerance always lands on on-demand; only the explanation varies.
    if fault_tolerance == "low":
        if od_estimate and od_estimate.availability in ("high", "medium"):
            return "on-demand", "Low fault tolerance requires stable on-demand capacity"
        return (
            "on-demand",
            "On-demand recommended but capacity may be limited; consider capacity reservation",
        )

    fallback = ("on-demand", "On-demand recommended (spot availability unknown or limited)")
    if not best_spot:
        return fallback

    tolerant = fault_tolerance == "high"
    level = best_spot.availability

    if level == "high":
        savings = ""
        if best_spot.price_per_hour and od_estimate and od_estimate.price_per_hour:
            pct = (1 - best_spot.price_per_hour / od_estimate.price_per_hour) * 100
            savings = f" (save ~{pct:.0f}%)"
        return "spot", f"High spot availability (score-based){savings}"

    if level == "medium":
        if tolerant:
            return "spot", "Medium spot availability acceptable with high fault tolerance"
        return (
            "on-demand",
            "Spot availability is medium; on-demand recommended for reliability",
        )

    if level == "low":
        if tolerant:
            return (
                "spot",
                "Low spot availability but acceptable with high fault tolerance",
            )
        return "on-demand", "Spot capacity is limited; on-demand recommended"

    return fallback
830 # -------------------------------------------------------------------------
831 # Capacity Reservations (ODCRs) and Capacity Blocks for ML
832 # -------------------------------------------------------------------------
def list_capacity_reservations(
    self,
    region: str,
    instance_type: str | None = None,
    state: str = "active",
) -> list[dict[str, Any]]:
    """List EC2 On-Demand Capacity Reservations (ODCRs) in a region.

    Args:
        region: AWS region to query
        instance_type: Restrict results to this instance type (optional)
        state: Reservation state to filter on; a falsy value (e.g. ``None``)
            disables the state filter

    Returns:
        One dictionary per reservation with counts, utilization and tags.
        If the API call fails with a ``ClientError`` the error is logged and
        whatever was collected so far is returned.
    """
    client = self._session.client("ec2", region_name=region)

    # Each criterion becomes a filter only when it has a value.
    criteria = (("state", state), ("instance-type", instance_type))
    filters: list[dict[str, Any]] = [
        {"Name": name, "Values": [value]} for name, value in criteria if value
    ]
    paginate_args: dict[str, Any] = {"Filters": filters} if filters else {}

    found: list[dict[str, Any]] = []
    try:
        pages = client.get_paginator("describe_capacity_reservations").paginate(**paginate_args)
        for page in pages:
            for cr in page.get("CapacityReservations", []):
                total = cr.get("TotalInstanceCount", 0)
                free = cr.get("AvailableInstanceCount", 0)
                in_use = total - free
                end_date = cr.get("EndDate")

                found.append(
                    {
                        "type": "odcr",
                        "reservation_id": cr.get("CapacityReservationId"),
                        "instance_type": cr.get("InstanceType"),
                        "availability_zone": cr.get("AvailabilityZone"),
                        "region": region,
                        "state": cr.get("State"),
                        "total_instances": total,
                        "available_instances": free,
                        "used_instances": in_use,
                        # Guard against division by zero for empty reservations.
                        "utilization_pct": round(in_use / total * 100, 1) if total else 0,
                        "instance_platform": cr.get("InstancePlatform"),
                        "tenancy": cr.get("Tenancy"),
                        "instance_match_criteria": cr.get("InstanceMatchCriteria"),
                        "end_date": end_date.isoformat() if end_date else None,
                        "end_date_type": cr.get("EndDateType"),
                        "tags": {tag["Key"]: tag["Value"] for tag in cr.get("Tags", [])},
                    }
                )
    except ClientError as e:
        logger.debug("Failed to list capacity reservations in %s: %s", region, e)

    return found
def list_capacity_block_offerings(
    self,
    region: str,
    instance_type: str,
    instance_count: int = 1,
    duration_hours: int = 24,
) -> list[dict[str, Any]]:
    """
    List available Capacity Block offerings for ML workloads.

    Capacity Blocks provide guaranteed GPU capacity for a fixed duration
    at a known price — ideal for training jobs with predictable runtimes.

    Every result page is read: when a response carries a ``NextToken`` the
    request is repeated with that token until no token is returned.

    Args:
        region: AWS region to query
        instance_type: GPU instance type (e.g. p5.48xlarge, p4d.24xlarge)
        instance_count: Number of instances needed
        duration_hours: Desired block duration in hours (must be a supported value)

    Returns:
        List of available capacity block offerings. On a ClientError the
        offerings gathered so far (possibly none) are returned; errors other
        than "Unsupported"/"InvalidParameterValue" are logged as warnings.
    """
    ec2 = self._session.client("ec2", region_name=region)
    offerings: list[dict[str, Any]] = []

    request: dict[str, Any] = {
        "InstanceType": instance_type,
        "InstanceCount": instance_count,
        "CapacityDurationHours": duration_hours,
    }

    try:
        while True:
            response = ec2.describe_capacity_block_offerings(**request)

            for offering in response.get("CapacityBlockOfferings", []):
                start_date = offering.get("StartDate")
                end_date = offering.get("EndDate")

                offerings.append(
                    {
                        "type": "capacity_block",
                        "offering_id": offering.get("CapacityBlockOfferingId"),
                        "instance_type": offering.get("InstanceType"),
                        "availability_zone": offering.get("AvailabilityZone"),
                        "region": region,
                        "instance_count": offering.get("InstanceCount"),
                        "duration_hours": offering.get("CapacityBlockDurationHours"),
                        "start_date": start_date.isoformat() if start_date else None,
                        "end_date": end_date.isoformat() if end_date else None,
                        "upfront_fee": offering.get("UpfrontFee"),
                        "currency": offering.get("CurrencyCode", "USD"),
                        "tenancy": offering.get("Tenancy"),
                    }
                )

            # Follow pagination so later pages are not silently dropped
            next_token = response.get("NextToken")
            if not next_token:
                break
            request["NextToken"] = next_token
    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code", "")
        # "Unsupported"/"InvalidParameterValue": instance type doesn't support
        # Capacity Blocks — expected, so stay quiet
        if error_code not in ("Unsupported", "InvalidParameterValue"):
            logger.warning("Failed to list capacity block offerings in %s: %s", region, e)

    return offerings
959 def get_capacity_block_trend(
960 self,
961 instance_type: str,
962 region: str,
963 ) -> float:
964 """
965 Estimate capacity block availability trend via time-series regression.
967 Queries offerings across the maximum 182-day (26-week) window, buckets
968 them into weekly bins by start date, and fits a linear regression to
969 the offering counts per week. The normalized slope indicates whether
970 capacity is growing or shrinking over time.
972 Returns:
973 Trend score from -1.0 to 1.0:
974 > 0 = capacity growing (offerings increasing week-over-week)
975 = 0 = stable or no data
976 < 0 = capacity shrinking (offerings decreasing week-over-week)
977 """
978 ec2 = self._session.client("ec2", region_name=region)
980 now = datetime.now(UTC)
981 far_end = now + timedelta(days=182)
983 try:
984 response = ec2.describe_capacity_block_offerings(
985 InstanceType=instance_type,
986 InstanceCount=1,
987 CapacityDurationHours=24, # Minimum duration for broadest results
988 StartDateRange=now,
989 EndDateRange=far_end,
990 )
991 except ClientError, Exception:
992 return 0.0
994 offerings = response.get("CapacityBlockOfferings", [])
995 if not offerings:
996 return 0.0
998 # Bucket offerings into weekly bins (week 0 = this week, week 25 = ~6 months out)
999 num_weeks = 26
1000 bins = [0] * num_weeks
1001 for o in offerings:
1002 start = o.get("StartDate")
1003 if start is None: 1003 ↛ 1004line 1003 didn't jump to line 1004 because the condition on line 1003 was never true
1004 continue
1005 delta_days = (start - now).total_seconds() / 86400.0
1006 week_idx = int(delta_days / 7)
1007 if 0 <= week_idx < num_weeks: 1007 ↛ 1001line 1007 didn't jump to line 1001 because the condition on line 1007 was always true
1008 bins[week_idx] += 1
1010 # Need at least 2 non-zero bins to detect a meaningful trend
1011 non_zero = sum(1 for b in bins if b > 0)
1012 if non_zero < 2:
1013 return 0.0
1015 # Linear regression: slope of offerings-per-week over time
1016 # Using least-squares: slope = Σ((x-x̄)(y-ȳ)) / Σ((x-x̄)²)
1017 n = len(bins)
1018 x_mean = (n - 1) / 2.0
1019 y_mean = statistics.mean(bins)
1021 numerator = sum((i - x_mean) * (bins[i] - y_mean) for i in range(n))
1022 denominator = sum((i - x_mean) ** 2 for i in range(n))
1024 if denominator == 0: 1024 ↛ 1025line 1024 didn't jump to line 1025 because the condition on line 1024 was never true
1025 return 0.0
1027 slope = numerator / denominator
1029 # Normalize slope to -1..1 range relative to the mean offering count.
1030 # A slope of +y_mean per 26 weeks would be a doubling → maps to ~1.0.
1031 normalized = slope * num_weeks / (y_mean * 2) if y_mean > 0 else 0.0
1033 return round(max(-1.0, min(1.0, normalized)), 4)
def list_all_reservations(
    self,
    instance_type: str | None = None,
    regions: list[str] | None = None,
) -> dict[str, Any]:
    """
    Collect ODCRs from several regions and summarize them.

    Args:
        instance_type: Restrict the listing to this instance type (optional)
        regions: Regions to query; when empty or None, the regions returned by
            discover_regional_stacks() are used, falling back to the
            configured default region if none are discovered

    Returns:
        Dict holding the regions checked, the instance type filter, the
        reservation count, summed total/available instances, and the
        flat list of reservation dicts
    """
    if not regions:
        from cli.aws_client import get_aws_client

        discovered = get_aws_client(self.config).discover_regional_stacks()
        regions = list(discovered.keys()) if discovered else [self.config.default_region]

    # Flatten the per-region listings, keeping region order
    collected: list[dict[str, Any]] = [
        entry
        for name in regions
        for entry in self.list_capacity_reservations(name, instance_type=instance_type)
    ]

    reserved_sum = 0
    available_sum = 0
    for entry in collected:
        reserved_sum += entry["total_instances"]
        available_sum += entry["available_instances"]

    summary: dict[str, Any] = {
        "regions_checked": regions,
        "instance_type_filter": instance_type,
        "total_reservations": len(collected),
        "total_reserved_instances": reserved_sum,
        "total_available_instances": available_sum,
        "reservations": collected,
    }
    return summary
def check_reservation_availability(
    self,
    instance_type: str,
    region: str | None = None,
    min_count: int = 1,
    include_capacity_blocks: bool = True,
    block_duration_hours: int = 24,
) -> dict[str, Any]:
    """
    Check if capacity reservations or blocks have available instances.

    Checks both ODCRs (existing reservations) and Capacity Block offerings
    (purchasable guaranteed capacity) for a given instance type.

    Args:
        instance_type: EC2 instance type to check
        region: Specific region (or None to check all deployed regions)
        min_count: Minimum number of available instances needed
        include_capacity_blocks: Also check Capacity Block offerings
        block_duration_hours: Duration for capacity block search

    Returns:
        Dictionary with ODCR availability and capacity block offerings,
        plus a human-readable "recommendation" string
    """
    if region:
        regions = [region]
    else:
        from cli.aws_client import get_aws_client

        aws_client = get_aws_client(self.config)
        stacks = aws_client.discover_regional_stacks()
        regions = list(stacks.keys()) if stacks else [self.config.default_region]

    # Check ODCRs
    odcr_results: list[dict[str, Any]] = []
    total_available = 0
    total_reserved = 0

    for r in regions:
        for res in self.list_capacity_reservations(r, instance_type=instance_type):
            avail = res["available_instances"]
            total_available += avail
            total_reserved += res["total_instances"]
            # Only reservations with free instances are reported back
            if avail > 0:
                odcr_results.append(res)

    # Check Capacity Block offerings
    block_offerings: list[dict[str, Any]] = []
    if include_capacity_blocks:
        for r in regions:
            block_offerings.extend(
                self.list_capacity_block_offerings(
                    r,
                    instance_type=instance_type,
                    instance_count=min_count,
                    duration_hours=block_duration_hours,
                )
            )

    has_odcr = total_available >= min_count
    has_blocks = len(block_offerings) > 0

    def _fee_as_number(offering: dict[str, Any]) -> float:
        # Fees may arrive as strings (e.g. "200.00"); compare them numerically,
        # and rank missing/unparseable fees last instead of raising.
        try:
            return float(offering.get("upfront_fee"))
        except (TypeError, ValueError):
            return float("inf")

    # Build recommendation
    if has_odcr:
        recommendation = (
            f"ODCR capacity available: {total_available} instances "
            f"across {len(odcr_results)} reservation(s)"
        )
    elif has_blocks:
        cheapest = min(block_offerings, key=_fee_as_number)
        fee_label = cheapest.get("upfront_fee")
        if fee_label is None:
            fee_label = "?"
        recommendation = (
            f"No ODCR capacity, but {len(block_offerings)} Capacity Block offering(s) "
            f"available (from ${fee_label} "
            f"for {block_duration_hours}h)"
        )
    else:
        recommendation = (
            "No reserved capacity or block offerings found. "
            "Consider on-demand or spot, or request a Capacity Block "
            "for a different duration/region."
        )

    return {
        "instance_type": instance_type,
        "min_count_requested": min_count,
        "regions_checked": regions,
        "odcr": {
            "total_reserved_instances": total_reserved,
            "total_available_instances": total_available,
            "has_availability": has_odcr,
            "reservations": odcr_results,
        },
        "capacity_blocks": {
            "offerings_found": len(block_offerings),
            "has_offerings": has_blocks,
            "duration_hours": block_duration_hours,
            "offerings": block_offerings,
        },
        "recommendation": recommendation,
    }
def purchase_capacity_block(
    self,
    offering_id: str,
    region: str,
    dry_run: bool = False,
) -> dict[str, Any]:
    """
    Purchase a Capacity Block offering by its ID.

    The request is always made with InstancePlatform "Linux/UNIX".

    Args:
        offering_id: Capacity Block offering ID (cb-xxx) from list_capacity_block_offerings
        region: AWS region where the offering exists
        dry_run: If True, issue the request with EC2 DryRun and never purchase

    Returns:
        Dictionary describing the outcome. "success" and "dry_run" are always
        present; a real purchase adds the created reservation's details, and
        a failure adds "error_code" and "error". ClientError is reported in
        the returned dict rather than raised.
    """
    ec2 = self._session.client("ec2", region_name=region)

    if dry_run:
        try:
            # EC2 DryRun checks the request without purchasing; the outcome
            # is reported through a ClientError.
            ec2.purchase_capacity_block(
                CapacityBlockOfferingId=offering_id,
                InstancePlatform="Linux/UNIX",
                DryRun=True,
            )
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code == "DryRunOperation":
                # DryRunOperation means the request would have succeeded
                return {
                    "success": True,
                    "dry_run": True,
                    "offering_id": offering_id,
                    "region": region,
                    "message": "Dry run succeeded — offering is valid and purchasable",
                }
            error_msg = e.response.get("Error", {}).get("Message", str(e))
            return {
                "success": False,
                "dry_run": True,
                "offering_id": offering_id,
                "region": region,
                "error_code": error_code,
                "error": error_msg,
            }

        # The DryRun call returned without raising. Stop here: a dry run must
        # never fall through to the real purchase below.
        return {
            "success": False,
            "dry_run": True,
            "offering_id": offering_id,
            "region": region,
            "error_code": "UnexpectedDryRunResponse",
            "error": (
                "Dry run returned without the expected DryRunOperation response; "
                "no purchase was made"
            ),
        }

    try:
        response = ec2.purchase_capacity_block(
            CapacityBlockOfferingId=offering_id,
            InstancePlatform="Linux/UNIX",
        )
    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code", "")
        error_msg = e.response.get("Error", {}).get("Message", str(e))
        return {
            "success": False,
            "dry_run": False,
            "offering_id": offering_id,
            "region": region,
            "error_code": error_code,
            "error": error_msg,
        }

    reservation = response.get("CapacityReservation", {})
    start = reservation.get("StartDate")
    end = reservation.get("EndDate")

    return {
        "success": True,
        "dry_run": False,
        "reservation_id": reservation.get("CapacityReservationId", ""),
        "offering_id": offering_id,
        "instance_type": reservation.get("InstanceType", ""),
        "availability_zone": reservation.get("AvailabilityZone", ""),
        "region": region,
        "total_instances": reservation.get("TotalInstanceCount", 0),
        "start_date": start.isoformat() if start else None,
        "end_date": end.isoformat() if end else None,
        "state": reservation.get("State", ""),
    }
def recommend_region_for_job(
    self,
    gpu_required: bool = False,
    min_gpus: int = 0,
    instance_type: str | None = None,
    gpu_count: int = 0,
) -> dict[str, Any]:
    """
    Pick the best region to place a job in.

    This is a thin pass-through: a MultiRegionCapacityChecker is built from
    this checker's config and its recommend_region_for_job() result is
    returned unchanged.

    Args:
        gpu_required: Whether the job requires GPUs
        min_gpus: Minimum number of GPUs required
        instance_type: Specific instance type for workload-aware scoring
        gpu_count: Number of GPUs required

    Returns:
        Dictionary with recommended region and justification
    """
    # Imported lazily inside the method, as in the original code.
    from .multi_region import MultiRegionCapacityChecker

    delegate = MultiRegionCapacityChecker(self.config)
    recommendation = delegate.recommend_region_for_job(
        gpu_required,
        min_gpus,
        instance_type=instance_type,
        gpu_count=gpu_count,
    )
    return recommendation
def get_capacity_checker(config: GCOConfig | None = None) -> CapacityChecker:
    """
    Build a CapacityChecker.

    Args:
        config: Optional GCOConfig, passed to the CapacityChecker constructor
            as-is (None included)

    Returns:
        The newly constructed CapacityChecker
    """
    checker = CapacityChecker(config)
    return checker