Coverage for cli / capacity / checker.py: 86%

501 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Single-region EC2 capacity checker using real AWS signals. 

3 

4This is the core capacity intelligence module (~1265 lines). It queries multiple 

5AWS APIs to build a comprehensive picture of GPU/accelerator availability in a 

6single region. The MultiRegionCapacityChecker in multi_region.py calls this for 

7each region in parallel. 

8 

9Data Sources: 

10 - EC2 GetSpotPlacementScores: likelihood of getting spot capacity (1-10 score) 

11 - EC2 DescribeSpotPriceHistory: current and historical spot prices (7-day window) 

12 - EC2 DescribeInstanceTypes: vCPU, memory, GPU count/type/memory, EFA support 

13 - EC2 DescribeInstanceTypeOfferings: which instance types are available in the region 

14 - EC2 DescribeCapacityBlockOfferings: purchasable Capacity Blocks for ML workloads 

15 - EC2 PurchaseCapacityBlock: (optional) purchase a Capacity Block by offering ID 

16 

17Key Classes: 

18 CapacityChecker: Main class. Instantiated with a region and optional GCOConfig. 

19 - check_capacity(instance_type) → CapacityEstimate 

20 - get_spot_prices(instance_type) → list[SpotPriceInfo] 

21 - get_instance_info(instance_type) → InstanceTypeInfo 

22 - check_capacity_blocks(instance_type, count, duration) → list[dict] 

23 

24Output Models (defined in models.py): 

25 - CapacityEstimate: spot score, price, trend, on-demand price, instance specs 

26 - SpotPriceInfo: AZ, price, timestamp 

27 - InstanceTypeInfo: vCPU, memory, GPU count/type/memory, EFA, architecture 

28 

29The GPU_INSTANCE_SPECS lookup table in models.py provides offline specs for common 

30GPU instances so the checker can return useful information even when the EC2 API 

31is unavailable or the instance type isn't offered in the region. 

32""" 

33 

34from __future__ import annotations 

35 

36import json 

37import logging 

38import statistics 

39from datetime import UTC, datetime, timedelta 

40from typing import Any 

41 

42import boto3 

43from botocore.exceptions import ClientError 

44 

45from cli.config import GCOConfig, get_config 

46 

47from .models import ( 

48 GPU_INSTANCE_SPECS, 

49 CapacityEstimate, 

50 InstanceTypeInfo, 

51 SpotPriceInfo, 

52) 

53 

54logger = logging.getLogger(__name__) 

55 

56 

57def _instance_desc(instance_type: str, gpu_count: int, gpu_type: str, total_gpu_mem: float) -> str: 

58 """Build a human-readable instance description.""" 

59 if gpu_count > 0 and gpu_type: 

60 mem_str = f", {total_gpu_mem:.0f}GB" if total_gpu_mem else "" 

61 return f"{instance_type} ({gpu_count}x {gpu_type}{mem_str})" 

62 return instance_type 

63 

64 

65class CapacityChecker: 

66 """ 

67 Checks EC2 capacity availability using real AWS capacity signals. 

68 

69 Uses: 

70 - Spot Placement Score API for spot capacity estimates 

71 - EC2 describe-instance-type-offerings for regional availability 

72 - Spot price history for pricing trends 

73 - On-demand pricing API 

74 """ 

75 

76 def __init__(self, config: GCOConfig | None = None): 

77 self.config = config or get_config() 

78 self._session = boto3.Session() 

79 self._pricing_cache: dict[str, Any] = {} 

80 self._offerings_cache: dict[str, set[str]] = {} 

81 

82 def get_instance_info(self, instance_type: str) -> InstanceTypeInfo | None: 

83 """Get information about an instance type.""" 

84 if instance_type in GPU_INSTANCE_SPECS: 

85 return GPU_INSTANCE_SPECS[instance_type] 

86 

87 # Try to get from EC2 API 

88 try: 

89 ec2 = self._session.client("ec2", region_name="us-east-1") 

90 response = ec2.describe_instance_types(InstanceTypes=[instance_type]) 

91 

92 if response["InstanceTypes"]: 

93 info = response["InstanceTypes"][0] 

94 vcpus = info["VCpuInfo"]["DefaultVCpus"] 

95 memory = info["MemoryInfo"]["SizeInMiB"] / 1024 

96 

97 gpu_count = 0 

98 gpu_type = None 

99 gpu_memory = 0 

100 

101 if "GpuInfo" in info: 

102 gpus = info["GpuInfo"].get("Gpus", []) 

103 if gpus: 103 ↛ 108line 103 didn't jump to line 108 because the condition on line 103 was always true

104 gpu_count = gpus[0].get("Count", 0) 

105 gpu_type = gpus[0].get("Name") 

106 gpu_memory = gpus[0].get("MemoryInfo", {}).get("SizeInMiB", 0) / 1024 

107 

108 arch = info["ProcessorInfo"]["SupportedArchitectures"][0] 

109 

110 return InstanceTypeInfo( 

111 instance_type=instance_type, 

112 vcpus=vcpus, 

113 memory_gib=memory, 

114 gpu_count=gpu_count, 

115 gpu_type=gpu_type, 

116 gpu_memory_gib=gpu_memory, 

117 architecture=arch, 

118 ) 

119 except ClientError as e: 

120 logger.debug("Failed to describe instance type %s: %s", instance_type, e) 

121 except Exception as e: 

122 logger.warning("Unexpected error getting instance info for %s: %s", instance_type, e) 

123 

124 return None 

125 

126 def check_instance_available_in_region(self, instance_type: str, region: str) -> bool: 

127 """Check if an instance type is offered in a region.""" 

128 cache_key = f"{region}" 

129 if cache_key not in self._offerings_cache: 

130 try: 

131 ec2 = self._session.client("ec2", region_name=region) 

132 paginator = ec2.get_paginator("describe_instance_type_offerings") 

133 offerings = set() 

134 for page in paginator.paginate(LocationType="region"): 

135 for offering in page["InstanceTypeOfferings"]: 

136 offerings.add(offering["InstanceType"]) 

137 self._offerings_cache[cache_key] = offerings 

138 except ClientError as e: 

139 logger.debug("Failed to check instance offerings in %s: %s", region, e) 

140 return False # Assume unavailable if we can't check 

141 except Exception as e: 

142 logger.warning("Unexpected error checking offerings in %s: %s", region, e) 

143 return False 

144 

145 return instance_type in self._offerings_cache.get(cache_key, set()) 

146 

147 def get_availability_zones(self, region: str) -> list[str]: 

148 """Get availability zones for a region.""" 

149 try: 

150 ec2 = self._session.client("ec2", region_name=region) 

151 response = ec2.describe_availability_zones( 

152 Filters=[{"Name": "state", "Values": ["available"]}] 

153 ) 

154 return [az["ZoneName"] for az in response["AvailabilityZones"]] 

155 except ClientError as e: 

156 logger.debug("Failed to get availability zones for %s: %s", region, e) 

157 return [] 

158 except Exception as e: 

159 logger.warning("Unexpected error getting AZs for %s: %s", region, e) 

160 return [] 

161 

162 def get_az_coverage(self, instance_type: str, region: str) -> float | None: 

163 """Get the fraction of AZs in a region that offer this instance type. 

164 

165 Returns a value between 0.0 and 1.0, or None if we can't determine it. 

166 Constrained instances are often available in fewer AZs. 

167 """ 

168 try: 

169 ec2 = self._session.client("ec2", region_name=region) 

170 total_azs = self.get_availability_zones(region) 

171 if not total_azs: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true

172 return None 

173 

174 paginator = ec2.get_paginator("describe_instance_type_offerings") 

175 offering_azs = set() 

176 for page in paginator.paginate( 176 ↛ 183line 176 didn't jump to line 183 because the loop on line 176 didn't complete

177 LocationType="availability-zone", 

178 Filters=[{"Name": "instance-type", "Values": [instance_type]}], 

179 ): 

180 for offering in page["InstanceTypeOfferings"]: 180 ↛ 176line 180 didn't jump to line 176 because the loop on line 180 didn't complete

181 offering_azs.add(offering["Location"]) 

182 

183 return len(offering_azs) / len(total_azs) if total_azs else None 

184 except Exception as e: 

185 logger.debug("Failed to get AZ coverage for %s in %s: %s", instance_type, region, e) 

186 return None 

187 

188 def get_spot_placement_score( 

189 self, instance_type: str, region: str, target_capacity: int = 1 

190 ) -> dict[str, int]: 

191 """ 

192 Get Spot Placement Score for an instance type. 

193 

194 The Spot Placement Score (1-10) indicates the likelihood of getting 

195 spot capacity. Higher scores mean better availability. 

196 

197 Returns: 

198 Dict mapping AZ to score (1-10), or empty if not available 

199 """ 

200 try: 

201 ec2 = self._session.client("ec2", region_name=region) 

202 

203 response = ec2.get_spot_placement_scores( 

204 InstanceTypes=[instance_type], 

205 TargetCapacity=target_capacity, 

206 TargetCapacityUnitType="units", 

207 RegionNames=[region], 

208 SingleAvailabilityZone=False, 

209 ) 

210 

211 scores = {} 

212 for recommendation in response.get("SpotPlacementScores", []): 

213 # Regional score 

214 if "AvailabilityZoneId" not in recommendation: 

215 scores["regional"] = recommendation.get("Score", 0) 

216 else: 

217 az_id = recommendation["AvailabilityZoneId"] 

218 scores[az_id] = recommendation.get("Score", 0) 

219 

220 return scores 

221 

222 except ClientError as e: 

223 error_code = e.response.get("Error", {}).get("Code", "") 

224 if error_code in ("InvalidParameterValue", "UnsupportedOperation"): 224 ↛ 226line 224 didn't jump to line 226 because the condition on line 224 was always true

225 return {} 

226 raise 

227 except Exception as e: 

228 logger.debug( 

229 "Failed to get spot placement scores for %s in %s: %s", instance_type, region, e 

230 ) 

231 return {} 

232 

233 def get_spot_price_history( 

234 self, instance_type: str, region: str, days: int = 7 

235 ) -> list[SpotPriceInfo]: 

236 """Get spot price history for an instance type.""" 

237 ec2 = self._session.client("ec2", region_name=region) 

238 

239 end_time = datetime.now(UTC) 

240 start_time = end_time - timedelta(days=days) 

241 

242 try: 

243 response = ec2.describe_spot_price_history( 

244 InstanceTypes=[instance_type], 

245 ProductDescriptions=["Linux/UNIX"], 

246 StartTime=start_time, 

247 EndTime=end_time, 

248 ) 

249 

250 # Group by availability zone 

251 az_prices: dict[str, list[float]] = {} 

252 for item in response["SpotPriceHistory"]: 

253 az = item["AvailabilityZone"] 

254 price = float(item["SpotPrice"]) 

255 if az not in az_prices: 

256 az_prices[az] = [] 

257 az_prices[az].append(price) 

258 

259 results = [] 

260 for az, prices in az_prices.items(): 

261 if not prices: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true

262 continue 

263 

264 current = prices[0] 

265 avg = statistics.mean(prices) 

266 min_price = min(prices) 

267 max_price = max(prices) 

268 

269 if avg > 0: 269 ↛ 274line 269 didn't jump to line 274 because the condition on line 269 was always true

270 std_dev = statistics.stdev(prices) if len(prices) > 1 else 0 

271 cv = std_dev / avg 

272 stability = max(0, 1 - cv) 

273 else: 

274 stability = 0 

275 

276 results.append( 

277 SpotPriceInfo( 

278 instance_type=instance_type, 

279 availability_zone=az, 

280 current_price=current, 

281 avg_price_7d=avg, 

282 min_price_7d=min_price, 

283 max_price_7d=max_price, 

284 price_stability=stability, 

285 ) 

286 ) 

287 

288 return results 

289 

290 except ClientError as e: 

291 if "InvalidParameterValue" in str(e): 291 ↛ 293line 291 didn't jump to line 293 because the condition on line 291 was always true

292 return [] 

293 raise 

294 

295 def get_on_demand_price(self, instance_type: str, region: str) -> float | None: 

296 """Get on-demand price for an instance type.""" 

297 cache_key = f"{instance_type}:{region}" 

298 if cache_key in self._pricing_cache: 

299 cached_value = self._pricing_cache[cache_key] 

300 return float(cached_value) if cached_value is not None else None 

301 

302 try: 

303 pricing = self._session.client("pricing", region_name="us-east-1") 

304 

305 region_names = { 

306 "us-east-1": "US East (N. Virginia)", 

307 "us-east-2": "US East (Ohio)", 

308 "us-west-1": "US West (N. California)", 

309 "us-west-2": "US West (Oregon)", 

310 "eu-west-1": "EU (Ireland)", 

311 "eu-west-2": "EU (London)", 

312 "eu-central-1": "EU (Frankfurt)", 

313 "ap-northeast-1": "Asia Pacific (Tokyo)", 

314 "ap-southeast-1": "Asia Pacific (Singapore)", 

315 "ap-southeast-2": "Asia Pacific (Sydney)", 

316 } 

317 

318 location = region_names.get(region, region) 

319 

320 response = pricing.get_products( 

321 ServiceCode="AmazonEC2", 

322 Filters=[ 

323 {"Type": "TERM_MATCH", "Field": "instanceType", "Value": instance_type}, 

324 {"Type": "TERM_MATCH", "Field": "location", "Value": location}, 

325 {"Type": "TERM_MATCH", "Field": "operatingSystem", "Value": "Linux"}, 

326 {"Type": "TERM_MATCH", "Field": "tenancy", "Value": "Shared"}, 

327 {"Type": "TERM_MATCH", "Field": "preInstalledSw", "Value": "NA"}, 

328 {"Type": "TERM_MATCH", "Field": "capacitystatus", "Value": "Used"}, 

329 ], 

330 MaxResults=1, 

331 ) 

332 

333 if response["PriceList"]: 

334 price_data = json.loads(response["PriceList"][0]) 

335 terms = price_data.get("terms", {}).get("OnDemand", {}) 

336 for term in terms.values(): 336 ↛ 349line 336 didn't jump to line 349 because the loop on line 336 didn't complete

337 for price_dim in term.get("priceDimensions", {}).values(): 337 ↛ 336line 337 didn't jump to line 336 because the loop on line 337 didn't complete

338 price = float(price_dim["pricePerUnit"]["USD"]) 

339 self._pricing_cache[cache_key] = price 

340 return price 

341 

342 except ClientError as e: 

343 logger.debug("Failed to get on-demand price for %s in %s: %s", instance_type, region, e) 

344 except Exception as e: 

345 logger.warning( 

346 "Unexpected error getting pricing for %s in %s: %s", instance_type, region, e 

347 ) 

348 

349 return None 

350 

351 def estimate_capacity( 

352 self, instance_type: str, region: str, capacity_type: str = "both" 

353 ) -> list[CapacityEstimate]: 

354 """ 

355 Estimate capacity availability using real AWS signals. 

356 

357 Args: 

358 instance_type: EC2 instance type 

359 region: AWS region 

360 capacity_type: "spot", "on-demand", or "both" 

361 

362 Returns: 

363 List of CapacityEstimate objects 

364 """ 

365 estimates = [] 

366 

367 # Check if instance type is available in region 

368 if not self.check_instance_available_in_region(instance_type, region): 

369 return [ 

370 CapacityEstimate( 

371 instance_type=instance_type, 

372 region=region, 

373 availability_zone=None, 

374 capacity_type="both", 

375 availability="unavailable", 

376 confidence=1.0, 

377 recommendation=f"{instance_type} is not available in {region}", 

378 details={"reason": "Instance type not offered in region"}, 

379 ) 

380 ] 

381 

382 instance_info = self.get_instance_info(instance_type) 

383 

384 if capacity_type in ("spot", "both"): 

385 spot_estimates = self._estimate_spot_capacity(instance_type, region, instance_info) 

386 estimates.extend(spot_estimates) 

387 

388 if capacity_type in ("on-demand", "both"): 

389 # Pass spot placement scores to on-demand estimator as a scarcity signal 

390 spot_scores = ( 

391 self.get_spot_placement_score(instance_type, region) 

392 if capacity_type == "on-demand" 

393 else {} 

394 ) 

395 # If we already fetched spot estimates, extract the scores from them 

396 if spot_estimates := [e for e in estimates if e.capacity_type == "spot"]: 

397 spot_scores = { 

398 e.availability_zone or "unknown": e.details.get("spot_placement_score", 0) 

399 for e in spot_estimates 

400 if e.details.get("spot_placement_score") is not None 

401 } 

402 # Also gather spot price data for price-ratio signal 

403 spot_prices = self.get_spot_price_history(instance_type, region) 

404 od_estimate = self._estimate_on_demand_capacity( 

405 instance_type, region, instance_info, spot_scores, spot_prices 

406 ) 

407 if od_estimate: 407 ↛ 410line 407 didn't jump to line 410 because the condition on line 407 was always true

408 estimates.append(od_estimate) 

409 

410 return estimates 

411 

412 def _estimate_spot_capacity( 

413 self, instance_type: str, region: str, instance_info: InstanceTypeInfo | None 

414 ) -> list[CapacityEstimate]: 

415 """Estimate spot capacity using Spot Placement Score and price history.""" 

416 estimates = [] 

417 

418 # Get Spot Placement Score (primary signal) 

419 placement_scores = self.get_spot_placement_score(instance_type, region) 

420 

421 # Get spot prices for pricing info 

422 spot_prices = self.get_spot_price_history(instance_type, region) 

423 price_by_az = {sp.availability_zone: sp for sp in spot_prices} 

424 

425 on_demand_price = self.get_on_demand_price(instance_type, region) 

426 

427 # Get AZs in the region 

428 azs = self.get_availability_zones(region) 

429 

430 if placement_scores: 

431 # Use Spot Placement Score as primary signal 

432 regional_score = placement_scores.get("regional", 0) 

433 

434 for az in azs: 

435 # Try to get AZ-specific score, fall back to regional 

436 az_id = az # Note: might need to map zone name to zone ID 

437 score = placement_scores.get(az_id, regional_score) 

438 

439 # Convert score (1-10) to availability 

440 if score >= 8: 

441 availability = "high" 

442 recommendation = "Excellent spot availability" 

443 elif score >= 5: 

444 availability = "medium" 

445 recommendation = "Good spot availability, some interruption risk" 

446 elif score >= 3: 446 ↛ 447line 446 didn't jump to line 447 because the condition on line 446 was never true

447 availability = "low" 

448 recommendation = "Limited spot capacity, consider alternatives" 

449 else: 

450 availability = "low" 

451 recommendation = "Very limited spot capacity" 

452 

453 spot_info = price_by_az.get(az) 

454 price = spot_info.current_price if spot_info else None 

455 

456 details: dict[str, Any] = { 

457 "spot_placement_score": score, 

458 "score_interpretation": f"{score}/10", 

459 } 

460 

461 if spot_info: 

462 details["current_price"] = spot_info.current_price 

463 details["avg_price_7d"] = spot_info.avg_price_7d 

464 details["price_stability"] = f"{spot_info.price_stability:.2f}" 

465 

466 if on_demand_price and price: 

467 savings = (1 - price / on_demand_price) * 100 

468 details["savings_vs_on_demand"] = f"{savings:.1f}%" 

469 details["on_demand_price"] = on_demand_price 

470 

471 estimates.append( 

472 CapacityEstimate( 

473 instance_type=instance_type, 

474 region=region, 

475 availability_zone=az, 

476 capacity_type="spot", 

477 availability=availability, 

478 confidence=0.85, # Spot Placement Score is reliable 

479 price_per_hour=price, 

480 recommendation=recommendation, 

481 details=details, 

482 ) 

483 ) 

484 

485 elif spot_prices: 

486 # Fall back to price-based estimation if no placement score 

487 for spot_info in spot_prices: 

488 # Use price stability as a proxy (less reliable) 

489 if spot_info.price_stability > 0.8: 489 ↛ 492line 489 didn't jump to line 492 because the condition on line 489 was always true

490 availability = "medium" 

491 recommendation = "Spot prices stable, likely available" 

492 elif spot_info.price_stability > 0.5: 

493 availability = "low" 

494 recommendation = "Spot prices volatile, capacity uncertain" 

495 else: 

496 availability = "low" 

497 recommendation = "High price volatility, limited capacity likely" 

498 

499 details = { 

500 "current_price": spot_info.current_price, 

501 "avg_price_7d": spot_info.avg_price_7d, 

502 "price_stability": f"{spot_info.price_stability:.2f}", 

503 "note": "Estimate based on price history (Spot Placement Score unavailable)", 

504 } 

505 

506 if on_demand_price: 506 ↛ 510line 506 didn't jump to line 510 because the condition on line 506 was always true

507 savings = (1 - spot_info.current_price / on_demand_price) * 100 

508 details["savings_vs_on_demand"] = f"{savings:.1f}%" 

509 

510 estimates.append( 

511 CapacityEstimate( 

512 instance_type=instance_type, 

513 region=region, 

514 availability_zone=spot_info.availability_zone, 

515 capacity_type="spot", 

516 availability=availability, 

517 confidence=0.5, # Lower confidence without placement score 

518 price_per_hour=spot_info.current_price, 

519 recommendation=recommendation, 

520 details=details, 

521 ) 

522 ) 

523 

524 if not estimates: 

525 estimates.append( 

526 CapacityEstimate( 

527 instance_type=instance_type, 

528 region=region, 

529 availability_zone=None, 

530 capacity_type="spot", 

531 availability="unknown", 

532 confidence=0.1, 

533 recommendation=f"No spot data available for {instance_type} in {region}", 

534 details={"reason": "No spot price history or placement score available"}, 

535 ) 

536 ) 

537 

538 return estimates 

539 

540 def _estimate_on_demand_capacity( 

541 self, 

542 instance_type: str, 

543 region: str, 

544 instance_info: InstanceTypeInfo | None, 

545 spot_placement_scores: dict[str, int] | None = None, 

546 spot_prices: list[SpotPriceInfo] | None = None, 

547 ) -> CapacityEstimate | None: 

548 """Estimate on-demand capacity using live signals for ALL instance types. 

549 

550 Uses spot placement scores, instance size (vCPUs, memory, GPUs), 

551 pricing, and spot-to-on-demand price ratios as universal scarcity 

552 signals — no hardcoded instance families or GPU type lists. 

553 """ 

554 on_demand_price = self.get_on_demand_price(instance_type, region) 

555 

556 is_offered = self.check_instance_available_in_region(instance_type, region) 

557 

558 if not is_offered: 

559 return CapacityEstimate( 

560 instance_type=instance_type, 

561 region=region, 

562 availability_zone=None, 

563 capacity_type="on-demand", 

564 availability="unavailable", 

565 confidence=1.0, 

566 recommendation=f"{instance_type} is not offered in {region}", 

567 details={"reason": "Instance type not offered in region"}, 

568 ) 

569 

570 # Fetch spot placement scores if not provided (on-demand only mode) 

571 if spot_placement_scores is None: 

572 spot_placement_scores = self.get_spot_placement_score(instance_type, region) 

573 

574 if spot_prices is None: 

575 spot_prices = self.get_spot_price_history(instance_type, region) 

576 

577 az_coverage = self.get_az_coverage(instance_type, region) 

578 

579 availability, confidence, recommendation = self._assess_on_demand_availability( 

580 instance_type, 

581 instance_info, 

582 on_demand_price, 

583 spot_placement_scores, 

584 spot_prices, 

585 az_coverage, 

586 ) 

587 

588 if on_demand_price: 

589 recommendation += f" Price: ${on_demand_price:.4f}/hr." 

590 else: 

591 confidence -= 0.1 

592 recommendation += " Pricing data unavailable." 

593 

594 details: dict[str, Any] = { 

595 "price_per_hour": on_demand_price, 

596 "is_gpu": instance_info.is_gpu if instance_info else False, 

597 } 

598 if spot_placement_scores: 

599 scores = [s for s in spot_placement_scores.values() if s > 0] 

600 if scores: 600 ↛ 603line 600 didn't jump to line 603 because the condition on line 600 was always true

601 details["avg_spot_placement_score"] = round(sum(scores) / len(scores), 1) 

602 

603 return CapacityEstimate( 

604 instance_type=instance_type, 

605 region=region, 

606 availability_zone=None, 

607 capacity_type="on-demand", 

608 availability=availability, 

609 confidence=confidence, 

610 price_per_hour=on_demand_price, 

611 recommendation=recommendation, 

612 details=details, 

613 ) 

614 

615 @staticmethod 

616 def _assess_on_demand_availability( 

617 instance_type: str, 

618 instance_info: InstanceTypeInfo | None, 

619 on_demand_price: float | None, 

620 spot_placement_scores: dict[str, int] | None = None, 

621 spot_prices: list[SpotPriceInfo] | None = None, 

622 az_coverage: float | None = None, 

623 ) -> tuple[str, float, str]: 

624 """Assess on-demand availability using only live market signals. 

625 

626 Five live signals, zero hardcoded instance families: 

627 1. Spot placement score — AWS's own capacity assessment (1-10) 

628 2. Spot-to-on-demand price ratio — when spot approaches on-demand price, 

629 the spot market has very little excess capacity 

630 3. Spot price volatility — unstable prices reflect capacity fluctuations 

631 4. AZ coverage — fraction of AZs that offer this instance type; 

632 constrained instances are often available in fewer AZs 

633 5. Spot price availability — how many AZs have spot price data; 

634 missing price data in some AZs suggests limited capacity there 

635 

636 Confidence scales with the number of live signals available. 

637 When no signals exist, returns "unknown" rather than guessing. 

638 

639 Returns: 

640 Tuple of (availability, confidence, recommendation) 

641 """ 

642 price = on_demand_price or 0 

643 gpu_count = instance_info.gpu_count if instance_info else 0 

644 gpu_type = (instance_info.gpu_type or "") if instance_info else "" 

645 total_gpu_mem = instance_info.gpu_memory_gib if instance_info else 0 

646 

647 # --- Signal 1: Spot placement score --- 

648 avg_spot_score = 0.0 

649 has_spot_score = False 

650 if spot_placement_scores: 

651 scores = [s for s in spot_placement_scores.values() if s > 0] 

652 if scores: 652 ↛ 657line 652 didn't jump to line 657 because the condition on line 652 was always true

653 avg_spot_score = sum(scores) / len(scores) 

654 has_spot_score = True 

655 

656 # --- Signal 2 & 3: Spot price ratio and volatility --- 

657 avg_spot_ratio = 0.0 

658 avg_stability = 1.0 

659 has_price_signal = False 

660 if spot_prices and price > 0: 

661 ratios = [sp.current_price / price for sp in spot_prices if sp.current_price > 0] 

662 if ratios: 662 ↛ 665line 662 didn't jump to line 665 because the condition on line 662 was always true

663 avg_spot_ratio = sum(ratios) / len(ratios) 

664 has_price_signal = True 

665 stabilities = [sp.price_stability for sp in spot_prices] 

666 if stabilities: 666 ↛ 670line 666 didn't jump to line 670 because the condition on line 666 was always true

667 avg_stability = sum(stabilities) / len(stabilities) 

668 

669 # --- Signal 4: AZ coverage (passed in from caller) --- 

670 has_az_signal = az_coverage is not None 

671 

672 # --- Combine live signals into scarcity (0.0 - 1.0) --- 

673 scarcity = 0.0 

674 signal_count = 0 

675 

676 if has_spot_score: 

677 signal_count += 1 

678 if avg_spot_score <= 2: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true

679 scarcity += 0.5 

680 elif avg_spot_score <= 4: 680 ↛ 681line 680 didn't jump to line 681 because the condition on line 680 was never true

681 scarcity += 0.3 

682 elif avg_spot_score <= 6: 682 ↛ 683line 682 didn't jump to line 683 because the condition on line 682 was never true

683 scarcity += 0.15 

684 

685 if has_price_signal: 

686 signal_count += 1 

687 # Spot price near on-demand = spot market has minimal excess capacity 

688 if avg_spot_ratio >= 0.9: 688 ↛ 689line 688 didn't jump to line 689 because the condition on line 688 was never true

689 scarcity += 0.3 

690 elif avg_spot_ratio >= 0.7: 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true

691 scarcity += 0.15 

692 elif avg_spot_ratio >= 0.5: 692 ↛ 696line 692 didn't jump to line 696 because the condition on line 692 was always true

693 scarcity += 0.05 

694 

695 # Price instability = capacity fluctuations 

696 if avg_stability < 0.6: 696 ↛ 697line 696 didn't jump to line 697 because the condition on line 696 was never true

697 scarcity += 0.1 

698 elif avg_stability < 0.8: 698 ↛ 699line 698 didn't jump to line 699 because the condition on line 698 was never true

699 scarcity += 0.05 

700 

701 if has_az_signal and az_coverage is not None: 701 ↛ 702line 701 didn't jump to line 702 because the condition on line 701 was never true

702 signal_count += 1 

703 # Available in fewer than half the AZs = constrained 

704 if az_coverage <= 0.3: 

705 scarcity += 0.2 

706 elif az_coverage <= 0.5: 

707 scarcity += 0.1 

708 

709 # --- Confidence scales with signal count --- 

710 confidence = min(0.5 + (signal_count * 0.12), 0.9) 

711 

712 # --- Map scarcity to availability --- 

713 desc = _instance_desc(instance_type, gpu_count, gpu_type, total_gpu_mem) 

714 

715 if signal_count == 0: 

716 # No live data — be honest about it 

717 return ( 

718 "unknown", 

719 0.3, 

720 f"No live capacity signals available for {instance_type}." 

721 " Unable to assess on-demand availability.", 

722 ) 

723 

724 if scarcity >= 0.6: 724 ↛ 725line 724 didn't jump to line 725 because the condition on line 724 was never true

725 return ( 

726 "low", 

727 confidence, 

728 f"On-demand {desc} is extremely scarce based on live capacity signals." 

729 " Capacity reservations or Capacity Blocks are strongly recommended.", 

730 ) 

731 

732 if scarcity >= 0.35: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true

733 return ( 

734 "low", 

735 confidence, 

736 f"On-demand {desc} has limited availability based on live capacity signals." 

737 " Consider capacity reservations.", 

738 ) 

739 

740 if scarcity >= 0.15: 740 ↛ 741line 740 didn't jump to line 741 because the condition on line 740 was never true

741 return ( 

742 "medium", 

743 confidence, 

744 f"On-demand {instance_type} may have constrained availability" 

745 " based on current market conditions.", 

746 ) 

747 

748 return ( 

749 "high", 

750 confidence, 

751 f"On-demand capacity likely available for {instance_type}" 

752 " based on live capacity signals.", 

753 ) 

754 

755 def recommend_capacity_type( 

756 self, instance_type: str, region: str, fault_tolerance: str = "medium" 

757 ) -> tuple[str, str]: 

758 """ 

759 Recommend spot vs on-demand based on actual capacity and requirements. 

760 

761 Args: 

762 instance_type: EC2 instance type 

763 region: AWS region 

764 fault_tolerance: "high" (can handle interruptions), 

765 "medium" (some tolerance), 

766 "low" (needs stability) 

767 

768 Returns: 

769 Tuple of (recommended_capacity_type, explanation) 

770 """ 

771 estimates = self.estimate_capacity(instance_type, region, "both") 

772 

773 spot_estimates = [e for e in estimates if e.capacity_type == "spot"] 

774 od_estimates = [e for e in estimates if e.capacity_type == "on-demand"] 

775 

776 # Check for unavailable 

777 if any(e.availability == "unavailable" for e in estimates): 

778 return "unavailable", f"{instance_type} is not available in {region}" 

779 

780 # Get best spot option (highest availability) 

781 best_spot = None 

782 if spot_estimates: 

783 available_spots = [e for e in spot_estimates if e.availability != "unknown"] 

784 if available_spots: 784 ↛ 792line 784 didn't jump to line 792 because the condition on line 784 was always true

785 # Sort by availability (high > medium > low) then by price 

786 avail_order = {"high": 0, "medium": 1, "low": 2} 

787 best_spot = min( 

788 available_spots, 

789 key=lambda x: (avail_order.get(x.availability, 3), x.price_per_hour or 999), 

790 ) 

791 

792 od_estimate = od_estimates[0] if od_estimates else None 

793 

794 # Decision logic based on fault tolerance and actual availability 

795 if fault_tolerance == "low": 

796 if od_estimate and od_estimate.availability in ("high", "medium"): 796 ↛ 798line 796 didn't jump to line 798 because the condition on line 796 was always true

797 return "on-demand", "Low fault tolerance requires stable on-demand capacity" 

798 return ( 

799 "on-demand", 

800 "On-demand recommended but capacity may be limited; consider capacity reservation", 

801 ) 

802 

803 if best_spot: 

804 if best_spot.availability == "high": 

805 savings = "" 

806 if best_spot.price_per_hour and od_estimate and od_estimate.price_per_hour: 806 ↛ 809line 806 didn't jump to line 809 because the condition on line 806 was always true

807 pct = (1 - best_spot.price_per_hour / od_estimate.price_per_hour) * 100 

808 savings = f" (save ~{pct:.0f}%)" 

809 return "spot", f"High spot availability (score-based){savings}" 

810 

811 if best_spot.availability == "medium": 

812 if fault_tolerance == "high": 

813 return "spot", "Medium spot availability acceptable with high fault tolerance" 

814 return ( 

815 "on-demand", 

816 "Spot availability is medium; on-demand recommended for reliability", 

817 ) 

818 

819 if best_spot.availability == "low": 819 ↛ 828line 819 didn't jump to line 828 because the condition on line 819 was always true

820 if fault_tolerance == "high": 820 ↛ 825line 820 didn't jump to line 825 because the condition on line 820 was always true

821 return ( 

822 "spot", 

823 "Low spot availability but acceptable with high fault tolerance", 

824 ) 

825 return "on-demand", "Spot capacity is limited; on-demand recommended" 

826 

827 # Default to on-demand 

828 return "on-demand", "On-demand recommended (spot availability unknown or limited)" 

829 

830 # ------------------------------------------------------------------------- 

831 # Capacity Reservations (ODCRs) and Capacity Blocks for ML 

832 # ------------------------------------------------------------------------- 

833 

834 def list_capacity_reservations( 

835 self, 

836 region: str, 

837 instance_type: str | None = None, 

838 state: str = "active", 

839 ) -> list[dict[str, Any]]: 

840 """ 

841 List EC2 On-Demand Capacity Reservations (ODCRs) in a region. 

842 

843 Args: 

844 region: AWS region to query 

845 instance_type: Filter by instance type (optional) 

846 state: Filter by state — "active" (default), or None for all 

847 

848 Returns: 

849 List of reservation dictionaries with availability details 

850 """ 

851 ec2 = self._session.client("ec2", region_name=region) 

852 

853 filters: list[dict[str, Any]] = [] 

854 if state: 854 ↛ 856line 854 didn't jump to line 856 because the condition on line 854 was always true

855 filters.append({"Name": "state", "Values": [state]}) 

856 if instance_type: 

857 filters.append({"Name": "instance-type", "Values": [instance_type]}) 

858 

859 reservations: list[dict[str, Any]] = [] 

860 try: 

861 paginator = ec2.get_paginator("describe_capacity_reservations") 

862 page_kwargs: dict[str, Any] = {} 

863 if filters: 863 ↛ 866line 863 didn't jump to line 866 because the condition on line 863 was always true

864 page_kwargs["Filters"] = filters 

865 

866 for page in paginator.paginate(**page_kwargs): 

867 for cr in page.get("CapacityReservations", []): 

868 total = cr.get("TotalInstanceCount", 0) 

869 available = cr.get("AvailableInstanceCount", 0) 

870 used = total - available 

871 

872 reservations.append( 

873 { 

874 "type": "odcr", 

875 "reservation_id": cr.get("CapacityReservationId"), 

876 "instance_type": cr.get("InstanceType"), 

877 "availability_zone": cr.get("AvailabilityZone"), 

878 "region": region, 

879 "state": cr.get("State"), 

880 "total_instances": total, 

881 "available_instances": available, 

882 "used_instances": used, 

883 "utilization_pct": round(used / total * 100, 1) if total else 0, 

884 "instance_platform": cr.get("InstancePlatform"), 

885 "tenancy": cr.get("Tenancy"), 

886 "instance_match_criteria": cr.get("InstanceMatchCriteria"), 

887 "end_date": (cr["EndDate"].isoformat() if cr.get("EndDate") else None), 

888 "end_date_type": cr.get("EndDateType"), 

889 "tags": {t["Key"]: t["Value"] for t in cr.get("Tags", [])}, 

890 } 

891 ) 

892 except ClientError as e: 

893 logger.debug("Failed to list capacity reservations in %s: %s", region, e) 

894 

895 return reservations 

896 

897 def list_capacity_block_offerings( 

898 self, 

899 region: str, 

900 instance_type: str, 

901 instance_count: int = 1, 

902 duration_hours: int = 24, 

903 ) -> list[dict[str, Any]]: 

904 """ 

905 List available Capacity Block offerings for ML workloads. 

906 

907 Capacity Blocks provide guaranteed GPU capacity for a fixed duration 

908 at a known price — ideal for training jobs with predictable runtimes. 

909 

910 Args: 

911 region: AWS region to query 

912 instance_type: GPU instance type (e.g. p5.48xlarge, p4d.24xlarge) 

913 instance_count: Number of instances needed 

914 duration_hours: Desired block duration in hours (must be a supported value) 

915 

916 Returns: 

917 List of available capacity block offerings 

918 """ 

919 ec2 = self._session.client("ec2", region_name=region) 

920 offerings: list[dict[str, Any]] = [] 

921 

922 try: 

923 response = ec2.describe_capacity_block_offerings( 

924 InstanceType=instance_type, 

925 InstanceCount=instance_count, 

926 CapacityDurationHours=duration_hours, 

927 ) 

928 

929 for offering in response.get("CapacityBlockOfferings", []): 

930 start_date = offering.get("StartDate") 

931 end_date = offering.get("EndDate") 

932 price = offering.get("UpfrontFee") 

933 

934 offerings.append( 

935 { 

936 "type": "capacity_block", 

937 "offering_id": offering.get("CapacityBlockOfferingId"), 

938 "instance_type": offering.get("InstanceType"), 

939 "availability_zone": offering.get("AvailabilityZone"), 

940 "region": region, 

941 "instance_count": offering.get("InstanceCount"), 

942 "duration_hours": offering.get("CapacityBlockDurationHours"), 

943 "start_date": start_date.isoformat() if start_date else None, 

944 "end_date": end_date.isoformat() if end_date else None, 

945 "upfront_fee": price, 

946 "currency": offering.get("CurrencyCode", "USD"), 

947 "tenancy": offering.get("Tenancy"), 

948 } 

949 ) 

950 except ClientError as e: 

951 error_code = e.response.get("Error", {}).get("Code", "") 

952 if error_code in ("Unsupported", "InvalidParameterValue"): 

953 pass # Instance type doesn't support Capacity Blocks — expected 

954 else: 

955 logger.warning("Failed to list capacity block offerings in %s: %s", region, e) 

956 

957 return offerings 

958 

959 def get_capacity_block_trend( 

960 self, 

961 instance_type: str, 

962 region: str, 

963 ) -> float: 

964 """ 

965 Estimate capacity block availability trend via time-series regression. 

966 

967 Queries offerings across the maximum 182-day (26-week) window, buckets 

968 them into weekly bins by start date, and fits a linear regression to 

969 the offering counts per week. The normalized slope indicates whether 

970 capacity is growing or shrinking over time. 

971 

972 Returns: 

973 Trend score from -1.0 to 1.0: 

974 > 0 = capacity growing (offerings increasing week-over-week) 

975 = 0 = stable or no data 

976 < 0 = capacity shrinking (offerings decreasing week-over-week) 

977 """ 

978 ec2 = self._session.client("ec2", region_name=region) 

979 

980 now = datetime.now(UTC) 

981 far_end = now + timedelta(days=182) 

982 

983 try: 

984 response = ec2.describe_capacity_block_offerings( 

985 InstanceType=instance_type, 

986 InstanceCount=1, 

987 CapacityDurationHours=24, # Minimum duration for broadest results 

988 StartDateRange=now, 

989 EndDateRange=far_end, 

990 ) 

991 except ClientError, Exception: 

992 return 0.0 

993 

994 offerings = response.get("CapacityBlockOfferings", []) 

995 if not offerings: 

996 return 0.0 

997 

998 # Bucket offerings into weekly bins (week 0 = this week, week 25 = ~6 months out) 

999 num_weeks = 26 

1000 bins = [0] * num_weeks 

1001 for o in offerings: 

1002 start = o.get("StartDate") 

1003 if start is None: 1003 ↛ 1004line 1003 didn't jump to line 1004 because the condition on line 1003 was never true

1004 continue 

1005 delta_days = (start - now).total_seconds() / 86400.0 

1006 week_idx = int(delta_days / 7) 

1007 if 0 <= week_idx < num_weeks: 1007 ↛ 1001line 1007 didn't jump to line 1001 because the condition on line 1007 was always true

1008 bins[week_idx] += 1 

1009 

1010 # Need at least 2 non-zero bins to detect a meaningful trend 

1011 non_zero = sum(1 for b in bins if b > 0) 

1012 if non_zero < 2: 

1013 return 0.0 

1014 

1015 # Linear regression: slope of offerings-per-week over time 

1016 # Using least-squares: slope = Σ((x-x̄)(y-ȳ)) / Σ((x-x̄)²) 

1017 n = len(bins) 

1018 x_mean = (n - 1) / 2.0 

1019 y_mean = statistics.mean(bins) 

1020 

1021 numerator = sum((i - x_mean) * (bins[i] - y_mean) for i in range(n)) 

1022 denominator = sum((i - x_mean) ** 2 for i in range(n)) 

1023 

1024 if denominator == 0: 1024 ↛ 1025line 1024 didn't jump to line 1025 because the condition on line 1024 was never true

1025 return 0.0 

1026 

1027 slope = numerator / denominator 

1028 

1029 # Normalize slope to -1..1 range relative to the mean offering count. 

1030 # A slope of +y_mean per 26 weeks would be a doubling → maps to ~1.0. 

1031 normalized = slope * num_weeks / (y_mean * 2) if y_mean > 0 else 0.0 

1032 

1033 return round(max(-1.0, min(1.0, normalized)), 4) 

1034 

1035 def list_all_reservations( 

1036 self, 

1037 instance_type: str | None = None, 

1038 regions: list[str] | None = None, 

1039 ) -> dict[str, Any]: 

1040 """ 

1041 List all capacity reservations (ODCRs) across deployed regions. 

1042 

1043 Args: 

1044 instance_type: Filter by instance type (optional) 

1045 regions: Regions to query (defaults to deployed GCO regions) 

1046 

1047 Returns: 

1048 Summary dict with reservations grouped by region 

1049 """ 

1050 if not regions: 1050 ↛ 1051line 1050 didn't jump to line 1051 because the condition on line 1050 was never true

1051 from cli.aws_client import get_aws_client 

1052 

1053 aws_client = get_aws_client(self.config) 

1054 stacks = aws_client.discover_regional_stacks() 

1055 regions = list(stacks.keys()) if stacks else [self.config.default_region] 

1056 

1057 all_reservations: list[dict[str, Any]] = [] 

1058 for region in regions: 

1059 all_reservations.extend( 

1060 self.list_capacity_reservations(region, instance_type=instance_type) 

1061 ) 

1062 

1063 total_reserved = sum(r["total_instances"] for r in all_reservations) 

1064 total_available = sum(r["available_instances"] for r in all_reservations) 

1065 

1066 return { 

1067 "regions_checked": regions, 

1068 "instance_type_filter": instance_type, 

1069 "total_reservations": len(all_reservations), 

1070 "total_reserved_instances": total_reserved, 

1071 "total_available_instances": total_available, 

1072 "reservations": all_reservations, 

1073 } 

1074 

1075 def check_reservation_availability( 

1076 self, 

1077 instance_type: str, 

1078 region: str | None = None, 

1079 min_count: int = 1, 

1080 include_capacity_blocks: bool = True, 

1081 block_duration_hours: int = 24, 

1082 ) -> dict[str, Any]: 

1083 """ 

1084 Check if capacity reservations or blocks have available instances. 

1085 

1086 Checks both ODCRs (existing reservations) and Capacity Block offerings 

1087 (purchasable guaranteed capacity) for a given instance type. 

1088 

1089 Args: 

1090 instance_type: EC2 instance type to check 

1091 region: Specific region (or None to check all deployed regions) 

1092 min_count: Minimum number of available instances needed 

1093 include_capacity_blocks: Also check Capacity Block offerings 

1094 block_duration_hours: Duration for capacity block search 

1095 

1096 Returns: 

1097 Dictionary with ODCR availability and capacity block offerings 

1098 """ 

1099 if region: 1099 ↛ 1102line 1099 didn't jump to line 1102 because the condition on line 1099 was always true

1100 regions = [region] 

1101 else: 

1102 from cli.aws_client import get_aws_client 

1103 

1104 aws_client = get_aws_client(self.config) 

1105 stacks = aws_client.discover_regional_stacks() 

1106 regions = list(stacks.keys()) if stacks else [self.config.default_region] 

1107 

1108 # Check ODCRs 

1109 odcr_results: list[dict[str, Any]] = [] 

1110 total_available = 0 

1111 total_reserved = 0 

1112 

1113 for r in regions: 

1114 reservations = self.list_capacity_reservations(r, instance_type=instance_type) 

1115 for res in reservations: 

1116 avail = res["available_instances"] 

1117 total_available += avail 

1118 total_reserved += res["total_instances"] 

1119 if avail > 0: 1119 ↛ 1115line 1119 didn't jump to line 1115 because the condition on line 1119 was always true

1120 odcr_results.append(res) 

1121 

1122 # Check Capacity Block offerings 

1123 block_offerings: list[dict[str, Any]] = [] 

1124 if include_capacity_blocks: 

1125 for r in regions: 

1126 offerings = self.list_capacity_block_offerings( 

1127 r, 

1128 instance_type=instance_type, 

1129 instance_count=min_count, 

1130 duration_hours=block_duration_hours, 

1131 ) 

1132 block_offerings.extend(offerings) 

1133 

1134 has_odcr = total_available >= min_count 

1135 has_blocks = len(block_offerings) > 0 

1136 

1137 # Build recommendation 

1138 if has_odcr: 

1139 recommendation = ( 

1140 f"ODCR capacity available: {total_available} instances " 

1141 f"across {len(odcr_results)} reservation(s)" 

1142 ) 

1143 elif has_blocks: 

1144 cheapest = min(block_offerings, key=lambda x: x.get("upfront_fee") or float("inf")) 

1145 recommendation = ( 

1146 f"No ODCR capacity, but {len(block_offerings)} Capacity Block offering(s) " 

1147 f"available (from ${cheapest.get('upfront_fee', '?')} " 

1148 f"for {block_duration_hours}h)" 

1149 ) 

1150 else: 

1151 recommendation = ( 

1152 "No reserved capacity or block offerings found. " 

1153 "Consider on-demand or spot, or request a Capacity Block " 

1154 "for a different duration/region." 

1155 ) 

1156 

1157 return { 

1158 "instance_type": instance_type, 

1159 "min_count_requested": min_count, 

1160 "regions_checked": regions, 

1161 "odcr": { 

1162 "total_reserved_instances": total_reserved, 

1163 "total_available_instances": total_available, 

1164 "has_availability": has_odcr, 

1165 "reservations": odcr_results, 

1166 }, 

1167 "capacity_blocks": { 

1168 "offerings_found": len(block_offerings), 

1169 "has_offerings": has_blocks, 

1170 "duration_hours": block_duration_hours, 

1171 "offerings": block_offerings, 

1172 }, 

1173 "recommendation": recommendation, 

1174 } 

1175 

1176 def purchase_capacity_block( 

1177 self, 

1178 offering_id: str, 

1179 region: str, 

1180 dry_run: bool = False, 

1181 ) -> dict[str, Any]: 

1182 """ 

1183 Purchase a Capacity Block offering by its ID. 

1184 

1185 Args: 

1186 offering_id: Capacity Block offering ID (cb-xxx) from list_capacity_block_offerings 

1187 region: AWS region where the offering exists 

1188 dry_run: If True, validate the offering without purchasing 

1189 

1190 Returns: 

1191 Dictionary with the created capacity reservation details 

1192 """ 

1193 ec2 = self._session.client("ec2", region_name=region) 

1194 

1195 if dry_run: 

1196 # Validate the offering exists by describing capacity block offerings 

1197 # and matching the ID 

1198 try: 

1199 # Use EC2 DryRun to validate permissions without purchasing 

1200 ec2.purchase_capacity_block( 

1201 CapacityBlockOfferingId=offering_id, 

1202 InstancePlatform="Linux/UNIX", 

1203 DryRun=True, 

1204 ) 

1205 except ClientError as e: 

1206 error_code = e.response.get("Error", {}).get("Code", "") 

1207 if error_code == "DryRunOperation": 

1208 # DryRunOperation means the request would have succeeded 

1209 return { 

1210 "success": True, 

1211 "dry_run": True, 

1212 "offering_id": offering_id, 

1213 "region": region, 

1214 "message": "Dry run succeeded — offering is valid and purchasable", 

1215 } 

1216 error_msg = e.response.get("Error", {}).get("Message", str(e)) 

1217 return { 

1218 "success": False, 

1219 "dry_run": True, 

1220 "offering_id": offering_id, 

1221 "region": region, 

1222 "error_code": error_code, 

1223 "error": error_msg, 

1224 } 

1225 

1226 try: 

1227 response = ec2.purchase_capacity_block( 

1228 CapacityBlockOfferingId=offering_id, 

1229 InstancePlatform="Linux/UNIX", 

1230 ) 

1231 

1232 reservation = response.get("CapacityReservation", {}) 

1233 reservation_id = reservation.get("CapacityReservationId", "") 

1234 instance_type = reservation.get("InstanceType", "") 

1235 az = reservation.get("AvailabilityZone", "") 

1236 total = reservation.get("TotalInstanceCount", 0) 

1237 start = reservation.get("StartDate") 

1238 end = reservation.get("EndDate") 

1239 

1240 return { 

1241 "success": True, 

1242 "dry_run": False, 

1243 "reservation_id": reservation_id, 

1244 "offering_id": offering_id, 

1245 "instance_type": instance_type, 

1246 "availability_zone": az, 

1247 "region": region, 

1248 "total_instances": total, 

1249 "start_date": start.isoformat() if start else None, 

1250 "end_date": end.isoformat() if end else None, 

1251 "state": reservation.get("State", ""), 

1252 } 

1253 except ClientError as e: 

1254 error_code = e.response.get("Error", {}).get("Code", "") 

1255 error_msg = e.response.get("Error", {}).get("Message", str(e)) 

1256 return { 

1257 "success": False, 

1258 "dry_run": False, 

1259 "offering_id": offering_id, 

1260 "region": region, 

1261 "error_code": error_code, 

1262 "error": error_msg, 

1263 } 

1264 

1265 def recommend_region_for_job( 

1266 self, 

1267 gpu_required: bool = False, 

1268 min_gpus: int = 0, 

1269 instance_type: str | None = None, 

1270 gpu_count: int = 0, 

1271 ) -> dict[str, Any]: 

1272 """ 

1273 Recommend the optimal region for job placement. 

1274 

1275 Delegates to MultiRegionCapacityChecker for cross-region analysis. 

1276 

1277 Args: 

1278 gpu_required: Whether the job requires GPUs 

1279 min_gpus: Minimum number of GPUs required 

1280 instance_type: Specific instance type for workload-aware scoring 

1281 gpu_count: Number of GPUs required 

1282 

1283 Returns: 

1284 Dictionary with recommended region and justification 

1285 """ 

1286 from .multi_region import MultiRegionCapacityChecker 

1287 

1288 checker = MultiRegionCapacityChecker(self.config) 

1289 return checker.recommend_region_for_job( 

1290 gpu_required, min_gpus, instance_type=instance_type, gpu_count=gpu_count 

1291 ) 

1292 

1293 

1294def get_capacity_checker(config: GCOConfig | None = None) -> CapacityChecker: 

1295 """Get a configured capacity checker instance.""" 

1296 return CapacityChecker(config)