Coverage for cli/images.py: 90%

491 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Container image registry management for GCO CLI. 

3 

4Provides ``ImageManager`` for building, pushing, and managing user 

5container images stored in per-project ECR repositories under the 

6``gco/`` prefix. Builds run through the same container runtime 

7(Docker, Finch, or Podman) used by CDK asset bundling, detected via 

8``cli._container_runtime``. 

9 

10The ECR repository layout mirrors the project naming convention: 

11``<account>.dkr.ecr.<region>.amazonaws.com/gco/<name>:<tag>``. 

12 

13Read-only methods (``list_repos``, ``list_tags``, ``describe``, 

14``get_uri``, ``replication_get``, ``replication_status``) hit ECR 

15directly via boto3 and do not invoke any container runtime. 

16 

17Administrative methods (``init``, ``lifecycle_get``, ``lifecycle_set``, 

18``replication_sync``) configure the repository surface and are 

19idempotent — re-running them is safe. 

20 

21Destructive methods (``delete_tag``, ``delete_repo``, ``cleanup``, 

22``prune``, ``orphans``) require explicit caller intent and never run 

23implicitly. 

24""" 

25 

26from __future__ import annotations 

27 

28import base64 

29import json 

30import logging 

31import os 

32import re 

33import subprocess 

34from datetime import UTC, datetime, timedelta 

35from pathlib import Path 

36from typing import Any 

37 

38import boto3 

39from botocore.exceptions import ClientError 

40 

41from ._container_runtime import detect_container_runtime 

42from ._image_uri import ( 

43 rewrite_image_uri_for_region as _rewrite_image_uri_for_region, # noqa: F401 

44) 

45from .config import GCOConfig, get_config 

46 

47# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

48# Flowchart(s) generated from this file: 

49# * ``ImageManager.build`` -> ``diagrams/code_diagrams/cli/images.ImageManager_build.html`` 

50# (PNG: ``diagrams/code_diagrams/cli/images.ImageManager_build.png``) 

51# * ``ImageManager.push`` -> ``diagrams/code_diagrams/cli/images.ImageManager_push.html`` 

52# (PNG: ``diagrams/code_diagrams/cli/images.ImageManager_push.png``) 

53# * ``ImageManager.cleanup`` -> ``diagrams/code_diagrams/cli/images.ImageManager_cleanup.html`` 

54# (PNG: ``diagrams/code_diagrams/cli/images.ImageManager_cleanup.png``) 

55# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

56# <pyflowchart-code-diagram> END 

57 

58 

logger = logging.getLogger(__name__)

# Image name and tag validation regexes.
#
# Names: short, DNS-friendly. A lowercase letter first, then lowercase
# alphanumerics separated by single dashes; max 63 characters total.
# A single letter (``^[a-z]$``) is accepted; any longer name must end
# in an alphanumeric, so dangling dashes ("app-") and doubled dashes
# ("a--b") are rejected. ECR's own repository-name grammar rejects both,
# so catching them here avoids a late CreateRepository failure.
# The length bound lives in the leading lookahead. ``\Z`` (not ``$``)
# anchors at the true end of the string: ``$`` also matches just before
# a trailing newline, which would let "app\n" slip through ``re.match``.
_NAME_RE = re.compile(r"^(?=[a-z0-9-]{1,63}\Z)[a-z][a-z0-9]*(?:-[a-z0-9]+)*\Z")

# Tags: docker reference grammar. First character must be alnum or
# underscore; subsequent characters allow dot, dash, underscore.
# 128 chars max. ``\Z`` for the same trailing-newline reason as above.
_TAG_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.\-]{0,127}\Z")

# Project repository prefix. Every repo this manager creates lives
# under ``gco/`` so a single replication / lifecycle / removal policy
# rule can target the whole project.
_REPO_PREFIX = "gco"

# Default lifecycle policy parameters.
_DEFAULT_KEEP_TAGGED = 20
_DEFAULT_EXPIRE_UNTAGGED_DAYS = 7

# Digest extraction from ``docker push`` stdout/stderr. The runtime
# emits a line of the form ``... digest: sha256:... size: ...``.
_DIGEST_RE = re.compile(r"sha256:[a-f0-9]{64}")

86 

87 

88class ImageManager: 

89 """Manages user container images in ECR. 

90 

91 Construction is cheap — no AWS calls happen until a method is 

92 invoked. The account ID and target region are resolved lazily. 

93 """ 

94 

95 def __init__(self, config: GCOConfig | None = None, region: str | None = None): 

96 self.config = config or get_config() 

97 self.region = self._resolve_region(region) 

98 self._account_id_cache: str | None = None 

99 

100 # ------------------------------------------------------------------ 

101 # Region / account helpers 

102 # ------------------------------------------------------------------ 

103 def _resolve_region(self, region: str | None) -> str: 

104 """Pick a region for ECR API calls. 

105 

106 Priority: explicit argument, ``config.regions[0]`` if the 

107 config exposes a regions list, ``AWS_DEFAULT_REGION``, then 

108 ``config.global_region``. 

109 """ 

110 if region: 

111 return region 

112 cfg_regions = getattr(self.config, "regions", None) 

113 if cfg_regions: 

114 return str(cfg_regions[0]) 

115 env_region = os.environ.get("AWS_DEFAULT_REGION") 

116 if env_region: 

117 return env_region 

118 return str(self.config.global_region) 

119 

120 def _account_id(self) -> str: 

121 """Return the AWS account ID via STS GetCallerIdentity (cached).""" 

122 if self._account_id_cache is None: 

123 sts = boto3.client("sts") 

124 self._account_id_cache = sts.get_caller_identity()["Account"] 

125 return self._account_id_cache 

126 

127 def _registry_host(self) -> str: 

128 """Return the ECR registry host for the manager's region.""" 

129 return f"{self._account_id()}.dkr.ecr.{self.region}.amazonaws.com" 

130 

131 def _repo_arn(self, name: str) -> str: 

132 """Return the full ARN of the repository under the project prefix.""" 

133 return f"arn:aws:ecr:{self.region}:{self._account_id()}:repository/{_REPO_PREFIX}/{name}" 

134 

135 def _ecr_client(self) -> Any: 

136 """Return a boto3 ECR client targeting the manager's region.""" 

137 return boto3.client("ecr", region_name=self.region) 

138 

139 # ------------------------------------------------------------------ 

140 # Validation helpers 

141 # ------------------------------------------------------------------ 

142 def _validate_context(self, context: str) -> Path: 

143 """Validate the build context path. 

144 

145 The path must exist on disk and resolve to a directory. Raw 

146 ``..`` segments in the supplied string are rejected outright 

147 so the caller can't trick the manager into reaching outside an 

148 intended workspace; the resolved path is then returned for use 

149 as ``cwd`` of the build. 

150 """ 

151 # Reject string-level traversal segments BEFORE resolving the 

152 # path so callers receive a clear error rather than a silent 

153 # rewrite up the tree. 

154 parts = Path(context).parts 

155 if ".." in parts: 

156 raise ValueError(f"Invalid build context: path traversal not allowed: {context}") 

157 resolved = Path(context).resolve() 

158 if not resolved.exists(): 

159 raise FileNotFoundError(f"Build context not found: {context}") 

160 if not resolved.is_dir(): 

161 raise ValueError(f"Build context is not a directory: {context}") 

162 return resolved 

163 

164 def _validate_name(self, name: str) -> str: 

165 """Validate an image name against ``_NAME_RE``.""" 

166 if not _NAME_RE.match(name): 

167 raise ValueError( 

168 f"Invalid image name: {name!r}. Expected lowercase letters, " 

169 "digits, and dashes; must start with a letter; max 63 chars." 

170 ) 

171 return name 

172 

173 def _validate_tag(self, tag: str) -> str: 

174 """Validate an image tag against ``_TAG_RE``.""" 

175 if not _TAG_RE.match(tag): 

176 raise ValueError( 

177 f"Invalid image tag: {tag!r}. Expected alphanumerics, dots, " 

178 "dashes, and underscores; max 128 chars." 

179 ) 

180 return tag 

181 

182 # ------------------------------------------------------------------ 

183 # Default-value helpers 

184 # ------------------------------------------------------------------ 

185 def _git_short_sha(self) -> str | None: 

186 """Return the current short git SHA, or ``None`` when unavailable.""" 

187 try: 

188 result = subprocess.run( 

189 ["git", "rev-parse", "--short", "HEAD"], 

190 capture_output=True, 

191 text=True, 

192 timeout=5, 

193 check=False, 

194 ) 

195 if result.returncode == 0 and result.stdout.strip(): 

196 return result.stdout.strip() 

197 except (FileNotFoundError, subprocess.TimeoutExpired, OSError) as e: 

198 logger.debug("git rev-parse failed: %s", e) 

199 return None 

200 

201 def _default_tag(self) -> str: 

202 """Return ``_git_short_sha()`` when available, else ``"latest"``.""" 

203 sha = self._git_short_sha() 

204 return sha if sha else "latest" 

205 

206 def _default_lifecycle_policy(self) -> dict[str, Any]: 

207 """Return the default ECR lifecycle policy as a dict. 

208 

209 The policy keeps the most recent ``_DEFAULT_KEEP_TAGGED`` tagged 

210 images and expires untagged images after 

211 ``_DEFAULT_EXPIRE_UNTAGGED_DAYS`` days. The structure matches 

212 the JSON shape that ``ecr.put_lifecycle_policy`` accepts after 

213 being JSON-stringified at the call site. 

214 """ 

215 return { 

216 "rules": [ 

217 { 

218 "rulePriority": 1, 

219 "description": (f"Keep last {_DEFAULT_KEEP_TAGGED} tagged images"), 

220 "selection": { 

221 "tagStatus": "tagged", 

222 "countType": "imageCountMoreThan", 

223 "countNumber": _DEFAULT_KEEP_TAGGED, 

224 "tagPatternList": ["*"], 

225 }, 

226 "action": {"type": "expire"}, 

227 }, 

228 { 

229 "rulePriority": 2, 

230 "description": (f"Expire untagged after {_DEFAULT_EXPIRE_UNTAGGED_DAYS} days"), 

231 "selection": { 

232 "tagStatus": "untagged", 

233 "countType": "sinceImagePushed", 

234 "countUnit": "days", 

235 "countNumber": _DEFAULT_EXPIRE_UNTAGGED_DAYS, 

236 }, 

237 "action": {"type": "expire"}, 

238 }, 

239 ], 

240 } 

241 

242 # ------------------------------------------------------------------ 

243 # Output helpers 

244 # ------------------------------------------------------------------ 

245 def _extract_digest(self, push_output: str) -> str | None: 

246 """Pull the first ``sha256:...`` digest out of push stdout/stderr.""" 

247 match = _DIGEST_RE.search(push_output) 

248 return match.group(0) if match else None 

249 

250 # ------------------------------------------------------------------ 

251 # ECR repository helpers (used by build/push) 

252 # ------------------------------------------------------------------ 

253 def _runtime_or_error(self) -> str: 

254 """Return the detected container runtime, or raise a friendly error.""" 

255 runtime = detect_container_runtime() 

256 if not runtime: 

257 raise RuntimeError( 

258 "No container runtime found. Install Docker, Finch, or " 

259 "Podman, or set CDK_DOCKER=<path>.\n" 

260 " - Docker: https://docs.docker.com/get-docker/\n" 

261 " - Finch: brew install finch && finch vm init\n" 

262 " - Podman: https://podman.io/getting-started/installation" 

263 ) 

264 return runtime 

265 

266 def _ecr_login(self, runtime: str) -> None: 

267 """Authenticate the runtime against the ECR registry.""" 

268 ecr = self._ecr_client() 

269 token = ecr.get_authorization_token()["authorizationData"][0]["authorizationToken"] 

270 username, password = base64.b64decode(token).decode().split(":", 1) 

271 registry = self._registry_host() 

272 result = subprocess.run( 

273 [runtime, "login", "-u", username, "--password-stdin", registry], 

274 input=password.encode(), 

275 capture_output=True, 

276 check=False, 

277 ) 

278 if result.returncode != 0: 

279 raise RuntimeError( 

280 f"{runtime} login to {registry} failed: " 

281 f"{result.stderr.decode(errors='replace').strip()}" 

282 ) 

283 

284 def _check_tag_immutable_collision(self, name: str, tag: str) -> None: 

285 """Block re-pushing a tag when the repo is immutable. 

286 

287 ECR repos can be configured with ``imageTagMutability=IMMUTABLE``, 

288 in which case attempting to overwrite an existing tag silently 

289 succeeds at build time but fails at push time with a confusing 

290 error. Catch this earlier and surface a helpful message. 

291 """ 

292 ecr = self._ecr_client() 

293 repo_name = f"{_REPO_PREFIX}/{name}" 

294 try: 

295 repo_resp = ecr.describe_repositories(repositoryNames=[repo_name]) 

296 except ecr.exceptions.RepositoryNotFoundException: 

297 return 

298 except ClientError as e: 

299 code = e.response.get("Error", {}).get("Code", "") 

300 if code == "RepositoryNotFoundException": 

301 return 

302 raise 

303 

304 repos = repo_resp.get("repositories", []) 

305 if not repos: 

306 return 

307 mutability = repos[0].get("imageTagMutability", "MUTABLE") 

308 if mutability != "IMMUTABLE": 

309 return 

310 

311 try: 

312 existing = ecr.describe_images( 

313 repositoryName=repo_name, 

314 imageIds=[{"imageTag": tag}], 

315 ) 

316 except ecr.exceptions.ImageNotFoundException: 

317 return 

318 except ClientError as e: 

319 code = e.response.get("Error", {}).get("Code", "") 

320 if code == "ImageNotFoundException": 

321 return 

322 raise 

323 

324 if existing.get("imageDetails"): 324 ↛ exitline 324 didn't return from function '_check_tag_immutable_collision' because the condition on line 324 was always true

325 raise RuntimeError( 

326 f"Tag {tag!r} already exists on immutable repo " 

327 f"{repo_name!r}. Re-run with a different tag, e.g. " 

328 f"--tag <new_tag>." 

329 ) 

330 

331 def _apply_retain_tag(self, name: str) -> None: 

332 """Apply the ``gco:retain=true`` resource tag to the repository.""" 

333 ecr = self._ecr_client() 

334 ecr.tag_resource( 

335 resourceArn=self._repo_arn(name), 

336 tags=[{"Key": "gco:retain", "Value": "true"}], 

337 ) 

338 

339 # ------------------------------------------------------------------ 

340 # build / push 

341 # ------------------------------------------------------------------ 

342 def build( 

343 self, 

344 context: str, 

345 name: str, 

346 tag: str | None = None, 

347 dockerfile: str = "Dockerfile", 

348 build_args: dict[str, str] | None = None, 

349 platform: str = "linux/amd64", 

350 retain: bool = False, 

351 ) -> dict[str, Any]: 

352 """Build a container image and push it to the project's ECR repo. 

353 

354 Args: 

355 context: Build context directory. 

356 name: Image name (validated; lowercase letters, digits, dashes). 

357 tag: Image tag (defaults to git short SHA, else ``latest``). 

358 dockerfile: Path to the Dockerfile, relative to ``context``. 

359 build_args: Optional ``KEY=value`` build args. 

360 platform: ``--platform`` argument for the build (default 

361 ``linux/amd64``). 

362 retain: When True, mark the repository with ``gco:retain=true`` 

363 so it survives stack destroys. 

364 

365 Returns: 

366 ``{"image_uri", "digest", "size_bytes", ...}``. 

367 """ 

368 ctx = self._validate_context(context) 

369 validated_name = self._validate_name(name) 

370 validated_tag = self._validate_tag(tag if tag is not None else self._default_tag()) 

371 

372 df_path = (ctx / dockerfile).resolve() 

373 if not df_path.exists() or not df_path.is_file(): 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true

374 raise FileNotFoundError(f"Dockerfile not found: {df_path} (relative to {ctx})") 

375 # Confine the Dockerfile to the build context. 

376 if not str(df_path).startswith(str(ctx)): 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 raise ValueError(f"Dockerfile must live inside the build context: {df_path}") 

378 

379 runtime = self._runtime_or_error() 

380 self.init(name, retain=retain) 

381 self._check_tag_immutable_collision(validated_name, validated_tag) 

382 self._ecr_login(runtime) 

383 

384 full_uri = f"{self._registry_host()}/{_REPO_PREFIX}/{validated_name}:{validated_tag}" 

385 

386 build_cmd: list[str] = [ 

387 runtime, 

388 "build", 

389 "-t", 

390 full_uri, 

391 "--platform", 

392 platform, 

393 "-f", 

394 str(df_path), 

395 ] 

396 for key, value in (build_args or {}).items(): 396 ↛ 397line 396 didn't jump to line 397 because the loop on line 396 never started

397 build_cmd.extend(["--build-arg", f"{key}={value}"]) 

398 build_cmd.append(str(ctx)) 

399 

400 logger.info("Building image: %s", " ".join(build_cmd)) 

401 subprocess.run(build_cmd, check=True, cwd=str(ctx)) 

402 

403 push_result = subprocess.run( 

404 [runtime, "push", full_uri], 

405 capture_output=True, 

406 text=True, 

407 check=True, 

408 cwd=str(ctx), 

409 ) 

410 digest = self._extract_digest((push_result.stdout or "") + (push_result.stderr or "")) 

411 

412 if retain: 412 ↛ 413line 412 didn't jump to line 413 because the condition on line 412 was never true

413 self._apply_retain_tag(validated_name) 

414 

415 size_bytes = self._image_size_bytes(validated_name, validated_tag) 

416 

417 return { 

418 "image_uri": full_uri, 

419 "digest": digest, 

420 "size_bytes": size_bytes, 

421 "runtime": runtime, 

422 "repository": f"{_REPO_PREFIX}/{validated_name}", 

423 "tag": validated_tag, 

424 "region": self.region, 

425 "retain": retain, 

426 } 

427 

428 def push( 

429 self, 

430 name: str, 

431 tag: str, 

432 local_image: str, 

433 retain: bool = False, 

434 ) -> dict[str, Any]: 

435 """Push an already-built local image to the project's ECR repo. 

436 

437 Tags ``local_image`` as the project URI before invoking 

438 ``<runtime> push``. Skips the build step but otherwise mirrors 

439 ``build`` (init repo, login, push, optional retain tag). 

440 """ 

441 validated_name = self._validate_name(name) 

442 validated_tag = self._validate_tag(tag) 

443 if not local_image: 

444 raise ValueError("local_image must be a non-empty image reference") 

445 

446 runtime = self._runtime_or_error() 

447 self.init(name, retain=retain) 

448 self._check_tag_immutable_collision(validated_name, validated_tag) 

449 self._ecr_login(runtime) 

450 

451 full_uri = f"{self._registry_host()}/{_REPO_PREFIX}/{validated_name}:{validated_tag}" 

452 

453 subprocess.run([runtime, "tag", local_image, full_uri], check=True) 

454 push_result = subprocess.run( 

455 [runtime, "push", full_uri], 

456 capture_output=True, 

457 text=True, 

458 check=True, 

459 ) 

460 digest = self._extract_digest((push_result.stdout or "") + (push_result.stderr or "")) 

461 

462 if retain: 462 ↛ 463line 462 didn't jump to line 463 because the condition on line 462 was never true

463 self._apply_retain_tag(validated_name) 

464 

465 size_bytes = self._image_size_bytes(validated_name, validated_tag) 

466 

467 return { 

468 "image_uri": full_uri, 

469 "digest": digest, 

470 "size_bytes": size_bytes, 

471 "runtime": runtime, 

472 "repository": f"{_REPO_PREFIX}/{validated_name}", 

473 "tag": validated_tag, 

474 "region": self.region, 

475 "retain": retain, 

476 } 

477 

478 def _image_size_bytes(self, name: str, tag: str) -> int | None: 

479 """Best-effort ECR lookup for the pushed image size.""" 

480 ecr = self._ecr_client() 

481 try: 

482 resp = ecr.describe_images( 

483 repositoryName=f"{_REPO_PREFIX}/{name}", 

484 imageIds=[{"imageTag": tag}], 

485 ) 

486 details = resp.get("imageDetails", []) 

487 if details: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true

488 size = details[0].get("imageSizeInBytes") 

489 if isinstance(size, int): 

490 return size 

491 except Exception as e: # noqa: BLE001 

492 logger.debug("describe_images for size lookup failed: %s", e) 

493 return None 

494 

495 # ------------------------------------------------------------------ 

496 # Read-only methods 

497 # ------------------------------------------------------------------ 

498 def list_repos(self) -> list[dict[str, Any]]: 

499 """List every repository under the project's ``gco/`` prefix.""" 

500 ecr = self._ecr_client() 

501 repos: list[dict[str, Any]] = [] 

502 paginator = ecr.get_paginator("describe_repositories") 

503 for page in paginator.paginate(): 

504 for repo in page.get("repositories", []): 

505 repo_name = repo.get("repositoryName", "") 

506 if not repo_name.startswith(f"{_REPO_PREFIX}/"): 

507 continue 

508 image_count = self._image_count(repo_name) 

509 repos.append( 

510 { 

511 "name": repo_name, 

512 "arn": repo.get("repositoryArn"), 

513 "uri": repo.get("repositoryUri"), 

514 "created_at": _isoformat(repo.get("createdAt")), 

515 "image_count": image_count, 

516 "tag_mutability": repo.get("imageTagMutability"), 

517 } 

518 ) 

519 return repos 

520 

521 def _image_count(self, repository_name: str) -> int: 

522 """Best-effort count of images in a repository.""" 

523 ecr = self._ecr_client() 

524 try: 

525 count = 0 

526 paginator = ecr.get_paginator("describe_images") 

527 for page in paginator.paginate(repositoryName=repository_name): 

528 count += len(page.get("imageDetails", [])) 

529 return count 

530 except Exception as e: # noqa: BLE001 

531 logger.debug("describe_images count for %s failed: %s", repository_name, e) 

532 return 0 

533 

534 def list_tags(self, name: str) -> list[dict[str, Any]]: 

535 """List every tag (with digest, pushed date, size) on a repository.""" 

536 validated = self._validate_name(name) 

537 ecr = self._ecr_client() 

538 rows: list[dict[str, Any]] = [] 

539 paginator = ecr.get_paginator("describe_images") 

540 for page in paginator.paginate( 

541 repositoryName=f"{_REPO_PREFIX}/{validated}", 

542 ): 

543 for detail in page.get("imageDetails", []): 

544 for tag in detail.get("imageTags", []) or [None]: 

545 rows.append( 

546 { 

547 "tag": tag, 

548 "digest": detail.get("imageDigest"), 

549 "pushed_at": _isoformat(detail.get("imagePushedAt")), 

550 "size_bytes": detail.get("imageSizeInBytes"), 

551 } 

552 ) 

553 return rows 

554 

555 def describe(self, name: str, tag: str) -> dict[str, Any]: 

556 """Return the full ECR image details for a single tag.""" 

557 validated_name = self._validate_name(name) 

558 validated_tag = self._validate_tag(tag) 

559 ecr = self._ecr_client() 

560 resp = ecr.describe_images( 

561 repositoryName=f"{_REPO_PREFIX}/{validated_name}", 

562 imageIds=[{"imageTag": validated_tag}], 

563 ) 

564 details = resp.get("imageDetails", []) 

565 if not details: 

566 return {} 

567 detail = details[0] 

568 return { 

569 "name": f"{_REPO_PREFIX}/{validated_name}", 

570 "tag": validated_tag, 

571 "digest": detail.get("imageDigest"), 

572 "pushed_at": _isoformat(detail.get("imagePushedAt")), 

573 "size_bytes": detail.get("imageSizeInBytes"), 

574 "tags": detail.get("imageTags", []), 

575 "scan_findings_summary": detail.get("imageScanFindingsSummary"), 

576 } 

577 

578 def get_uri(self, name: str, tag: str = "latest") -> str: 

579 """Return the full registry URI for ``name:tag``. No API call.""" 

580 validated_name = self._validate_name(name) 

581 validated_tag = self._validate_tag(tag) 

582 return f"{self._registry_host()}/{_REPO_PREFIX}/{validated_name}:{validated_tag}" 

583 

584 def replication_get(self) -> dict[str, Any]: 

585 """Return the current ECR replication configuration, or ``{}``.""" 

586 ecr = self._ecr_client() 

587 try: 

588 resp = ecr.get_registry_policy() 

589 policy_text = resp.get("policyText") 

590 if policy_text: 590 ↛ 600line 590 didn't jump to line 600 because the condition on line 590 was always true

591 return { 

592 "registryId": resp.get("registryId"), 

593 "policy": json.loads(policy_text), 

594 } 

595 except ClientError as e: 

596 code = e.response.get("Error", {}).get("Code", "") 

597 if code in ("RegistryPolicyNotFoundException",): 

598 return {} 

599 raise 

600 return {} 

601 

602 def replication_status(self) -> list[dict[str, Any]]: 

603 """Per-repo replication status across the project repos.""" 

604 ecr = self._ecr_client() 

605 rows: list[dict[str, Any]] = [] 

606 for repo in self.list_repos(): 

607 repo_name = repo["name"] 

608 paginator = ecr.get_paginator("describe_images") 

609 try: 

610 for page in paginator.paginate(repositoryName=repo_name): 

611 for detail in page.get("imageDetails", []): 

612 digest = detail.get("imageDigest") 

613 try: 

614 status = ecr.describe_image_replication_status( 

615 repositoryName=repo_name, 

616 imageId={"imageDigest": digest}, 

617 ) 

618 for entry in status.get("replicationStatuses", []): 

619 rows.append( 

620 { 

621 "repository": repo_name, 

622 "digest": digest, 

623 "region": entry.get("region"), 

624 "status": entry.get("status"), 

625 "registry_id": entry.get("registryId"), 

626 } 

627 ) 

628 except (ClientError, AttributeError) as e: 

629 logger.debug( 

630 "describe_image_replication_status failed for %s %s: %s", 

631 repo_name, 

632 digest, 

633 e, 

634 ) 

635 except ClientError as e: 

636 logger.debug("describe_images failed for %s: %s", repo_name, e) 

637 return rows 

638 

639 # ------------------------------------------------------------------ 

640 # Administrative methods 

641 # ------------------------------------------------------------------ 

642 def init(self, name: str, retain: bool = False) -> dict[str, Any]: 

643 """Create the project repository idempotently with default lifecycle. 

644 

645 ``CreateRepository`` is invoked with ``imageTagMutability=MUTABLE`` 

646 and ``scanOnPush=True``. If the repository already exists, the 

647 method becomes a no-op for repository creation but still applies 

648 the default lifecycle policy and the optional ``gco:retain`` tag. 

649 """ 

650 validated = self._validate_name(name) 

651 repo_name = f"{_REPO_PREFIX}/{validated}" 

652 ecr = self._ecr_client() 

653 

654 created = False 

655 try: 

656 ecr.create_repository( 

657 repositoryName=repo_name, 

658 imageTagMutability="MUTABLE", 

659 imageScanningConfiguration={"scanOnPush": True}, 

660 tags=[ 

661 {"Key": "Project", "Value": self.config.project_name}, 

662 ], 

663 ) 

664 created = True 

665 except ecr.exceptions.RepositoryAlreadyExistsException: 

666 # Idempotent init — re-running ``gco images init`` against an 

667 # already-provisioned repo is a no-op for create_repository. 

668 # We still flow through the lifecycle/retain blocks below so 

669 # any drift in policy is healed on every call. 

670 logger.debug("repository %s already exists; skipping create", repo_name) 

671 except ClientError as e: 

672 code = e.response.get("Error", {}).get("Code", "") 

673 if code != "RepositoryAlreadyExistsException": 673 ↛ 676line 673 didn't jump to line 676 because the condition on line 673 was always true

674 raise 

675 

676 try: 

677 ecr.put_lifecycle_policy( 

678 repositoryName=repo_name, 

679 lifecyclePolicyText=json.dumps(self._default_lifecycle_policy()), 

680 ) 

681 except ClientError as e: 

682 logger.debug("put_lifecycle_policy on %s failed: %s", repo_name, e) 

683 

684 if retain: 

685 try: 

686 self._apply_retain_tag(validated) 

687 except ClientError as e: 

688 logger.debug("apply retain tag on %s failed: %s", repo_name, e) 

689 

690 return { 

691 "name": repo_name, 

692 "created": created, 

693 "retain": retain, 

694 } 

695 

696 def lifecycle_get(self, name: str) -> dict[str, Any]: 

697 """Return the lifecycle policy on a repository, or ``{}``.""" 

698 validated = self._validate_name(name) 

699 ecr = self._ecr_client() 

700 try: 

701 resp = ecr.get_lifecycle_policy( 

702 repositoryName=f"{_REPO_PREFIX}/{validated}", 

703 ) 

704 policy_text = resp.get("lifecyclePolicyText") 

705 if policy_text: 705 ↛ 717line 705 didn't jump to line 717 because the condition on line 705 was always true

706 return { 

707 "name": f"{_REPO_PREFIX}/{validated}", 

708 "policy": json.loads(policy_text), 

709 } 

710 except ecr.exceptions.LifecyclePolicyNotFoundException: 

711 return {} 

712 except ClientError as e: 

713 code = e.response.get("Error", {}).get("Code", "") 

714 if code == "LifecyclePolicyNotFoundException": 714 ↛ 715line 714 didn't jump to line 715 because the condition on line 714 was never true

715 return {} 

716 raise 

717 return {} 

718 

719 def lifecycle_set(self, name: str, policy: dict[str, Any]) -> dict[str, Any]: 

720 """Replace the lifecycle policy on a repository.""" 

721 validated = self._validate_name(name) 

722 ecr = self._ecr_client() 

723 resp = ecr.put_lifecycle_policy( 

724 repositoryName=f"{_REPO_PREFIX}/{validated}", 

725 lifecyclePolicyText=json.dumps(policy), 

726 ) 

727 return { 

728 "name": f"{_REPO_PREFIX}/{validated}", 

729 "registry_id": resp.get("registryId"), 

730 "policy": policy, 

731 } 

732 

733 def replication_sync(self) -> dict[str, Any]: 

734 """Apply the project's standard replication rule. 

735 

736 Replicates ``gco/*`` to every region in ``config.regions`` (when 

737 the config exposes one) — the rule object mirrors what the 

738 global stack provisions so the two stay aligned. 

739 """ 

740 ecr = self._ecr_client() 

741 regions = list(getattr(self.config, "regions", []) or []) 

742 # Don't replicate to the source region itself. 

743 destinations = [r for r in regions if r != self.region] 

744 

745 account = self._account_id() 

746 rule = { 

747 "destinations": [{"region": r, "registryId": account} for r in destinations], 

748 "repositoryFilters": [{"filter": f"{_REPO_PREFIX}/", "filterType": "PREFIX_MATCH"}], 

749 } 

750 config = {"rules": [rule]} if destinations else {"rules": []} 

751 resp = ecr.put_replication_configuration(replicationConfiguration=config) 

752 return { 

753 "configuration": config, 

754 "destinations": destinations, 

755 "registry_id": resp.get("replicationConfiguration", {}), 

756 } 

757 

758 # ------------------------------------------------------------------ 

759 # Destructive methods 

760 # ------------------------------------------------------------------ 

761 def delete_tag(self, name: str, tag: str) -> dict[str, Any]: 

762 """Delete a single tag from a repository.""" 

763 validated_name = self._validate_name(name) 

764 validated_tag = self._validate_tag(tag) 

765 ecr = self._ecr_client() 

766 resp = ecr.batch_delete_image( 

767 repositoryName=f"{_REPO_PREFIX}/{validated_name}", 

768 imageIds=[{"imageTag": validated_tag}], 

769 ) 

770 return { 

771 "name": f"{_REPO_PREFIX}/{validated_name}", 

772 "tag": validated_tag, 

773 "deleted": [ 

774 {"digest": d.get("imageDigest"), "tag": d.get("imageTag")} 

775 for d in resp.get("imageIds", []) 

776 ], 

777 "failures": resp.get("failures", []), 

778 } 

779 

780 def delete_repo(self, name: str, force: bool = False) -> dict[str, Any]: 

781 """Delete a repository (optionally including its images).""" 

782 validated = self._validate_name(name) 

783 ecr = self._ecr_client() 

784 resp = ecr.delete_repository( 

785 repositoryName=f"{_REPO_PREFIX}/{validated}", 

786 force=force, 

787 ) 

788 return { 

789 "name": f"{_REPO_PREFIX}/{validated}", 

790 "deleted": True, 

791 "registry_id": resp.get("repository", {}).get("registryId"), 

792 } 

793 

def cleanup(
    self,
    name: str | None = None,
    all: bool = False,
) -> dict[str, Any]:
    """Delete every untagged image across one or all project repos.

    Args:
        name: short repository name (without the project prefix). When
            given, only that repository is cleaned.
        all: when True and ``name`` is not given, clean every repository
            returned by ``list_repos()``.

    Returns:
        ``{"repos_touched", "tags_deleted", "bytes_freed"}``:

        * ``repos_touched`` counts repositories that had untagged images.
        * ``tags_deleted`` counts the image ids ECR reported as deleted.
        * ``bytes_freed`` sums ``imageSizeInBytes`` for the deleted images
          only. Images ECR lists under ``failures`` are not counted.

    Raises:
        ValueError: if neither ``name`` nor ``all`` is set.

    If ``describe_images`` raises ``ClientError`` for a repository, the
    error is logged at debug level and that repository is skipped. Errors
    raised by ``batch_delete_image`` propagate to the caller.
    """
    if not name and not all:
        raise ValueError("cleanup() requires either a name or all=True")

    repos: list[str]
    if name:
        validated = self._validate_name(name)
        repos = [f"{_REPO_PREFIX}/{validated}"]
    else:
        repos = [r["name"] for r in self.list_repos()]

    ecr = self._ecr_client()
    repos_touched = 0
    tags_deleted = 0
    bytes_freed = 0

    for repo_name in repos:
        # Map digest -> size so freed bytes can be attributed to the images
        # ECR actually deleted, rather than to every candidate.
        untagged_sizes: dict[str, int] = {}
        try:
            paginator = ecr.get_paginator("describe_images")
            for page in paginator.paginate(
                repositoryName=repo_name,
                filter={"tagStatus": "UNTAGGED"},
            ):
                for detail in page.get("imageDetails", []):
                    digest = detail.get("imageDigest")
                    if not digest:
                        continue
                    size = detail.get("imageSizeInBytes") or 0
                    untagged_sizes[digest] = size if isinstance(size, int) else 0
        except ClientError as e:
            logger.debug("describe_images for cleanup of %s failed: %s", repo_name, e)
            continue

        if not untagged_sizes:
            continue
        repos_touched += 1
        digests = list(untagged_sizes)
        # batch_delete_image accepts up to 100 ids per call.
        for chunk_start in range(0, len(digests), 100):
            chunk = [{"imageDigest": d} for d in digests[chunk_start : chunk_start + 100]]
            resp = ecr.batch_delete_image(
                repositoryName=repo_name,
                imageIds=chunk,
            )
            deleted = resp.get("imageIds", [])
            tags_deleted += len(deleted)
            # Count each deleted digest once; failures are excluded.
            deleted_digests = {item.get("imageDigest") for item in deleted}
            bytes_freed += sum(untagged_sizes.get(d, 0) for d in deleted_digests if d)

    return {
        "repos_touched": repos_touched,
        "tags_deleted": tags_deleted,
        "bytes_freed": bytes_freed,
    }

854 

855 def prune(self, dry_run: bool = True) -> dict[str, Any]: 

856 """Remove untagged images older than 30 days. 

857 

858 Returns the same shape as ``cleanup``; when ``dry_run`` is True 

859 (the default), no images are deleted. 

860 """ 

861 cutoff = datetime.now(UTC) - timedelta(days=30) 

862 ecr = self._ecr_client() 

863 repos_touched = 0 

864 tags_deleted = 0 

865 bytes_freed = 0 

866 

867 for repo in self.list_repos(): 

868 repo_name = repo["name"] 

869 stale_ids: list[dict[str, str]] = [] 

870 stale_size = 0 

871 try: 

872 paginator = ecr.get_paginator("describe_images") 

873 for page in paginator.paginate( 

874 repositoryName=repo_name, 

875 filter={"tagStatus": "UNTAGGED"}, 

876 ): 

877 for detail in page.get("imageDetails", []): 

878 pushed = detail.get("imagePushedAt") 

879 if pushed and pushed >= cutoff: 

880 continue 

881 digest = detail.get("imageDigest") 

882 if not digest: 882 ↛ 883line 882 didn't jump to line 883 because the condition on line 882 was never true

883 continue 

884 stale_ids.append({"imageDigest": digest}) 

885 size = detail.get("imageSizeInBytes") or 0 

886 if isinstance(size, int): 886 ↛ 877line 886 didn't jump to line 877 because the condition on line 886 was always true

887 stale_size += size 

888 except ClientError as e: 

889 logger.debug("describe_images for prune of %s failed: %s", repo_name, e) 

890 continue 

891 

892 if not stale_ids: 

893 continue 

894 repos_touched += 1 

895 tags_deleted += len(stale_ids) 

896 bytes_freed += stale_size 

897 if dry_run: 

898 continue 

899 for chunk_start in range(0, len(stale_ids), 100): 

900 chunk = stale_ids[chunk_start : chunk_start + 100] 

901 ecr.batch_delete_image( 

902 repositoryName=repo_name, 

903 imageIds=chunk, 

904 ) 

905 

906 return { 

907 "dry_run": dry_run, 

908 "repos_touched": repos_touched, 

909 "tags_deleted": tags_deleted, 

910 "bytes_freed": bytes_freed, 

911 } 

912 

def orphans(self, threshold_days: int = 30) -> list[dict[str, Any]]:
    """List ``gco/*`` tags older than ``threshold_days`` with no references.

    Cross-references against:

    * inference endpoint specs (via :class:`cli.inference.InferenceManager`);
    * recent jobs (best-effort via :class:`cli.jobs.JobManager`). The jobs
      side contributes nothing when job listing is unavailable or fails.

    A tag counts as referenced when any collected image reference names the
    same repository with either the same tag (``repo:tag``) or the same
    digest (``repo@sha256:...``). The registry host part of each reference
    is ignored for this comparison, e.g. a reference through another
    registry host still protects the tag. Tags whose ``pushed_at`` cannot
    be parsed are treated as old enough.

    Args:
        threshold_days: minimum tag age, in days, before it can be listed.

    Returns:
        One dict per orphaned tag with ``repository``, ``tag``, ``digest``,
        ``pushed_at`` and ``uri`` keys.
    """
    cutoff = datetime.now(UTC) - timedelta(days=threshold_days)
    referenced: set[str] = set()
    referenced.update(self._collect_inference_image_refs())
    referenced.update(self._collect_recent_job_image_refs(threshold_days))

    def _reference_keys(ref: str) -> set[str]:
        """Return host-agnostic lookup keys (``repo:tag`` / ``repo@digest``) for *ref*."""
        # Docker's reference grammar treats the first path component as a
        # registry host when it contains "." or ":" or is "localhost".
        first, sep, rest = ref.partition("/")
        path = rest if sep and ("." in first or ":" in first or first == "localhost") else ref
        keys = {path}
        if "@" in path:
            # "repo[:tag]@digest": index both the tag form and "repo@digest".
            named, _, digest = path.partition("@")
            keys.add(named)
            repo_part, colon, maybe_tag = named.rpartition(":")
            repo = repo_part if colon and "/" not in maybe_tag else named
            keys.add(f"{repo}@{digest}")
        return keys

    referenced_keys: set[str] = set()
    for ref in referenced:
        if isinstance(ref, str) and ref:
            referenced_keys |= _reference_keys(ref)

    # Resolved lazily and only once, instead of once per candidate tag.
    host: str | None = None
    rows: list[dict[str, Any]] = []
    for repo in self.list_repos():
        repo_name = repo["name"]
        for tag_row in self.list_tags(repo_name.removeprefix(f"{_REPO_PREFIX}/")):
            tag = tag_row.get("tag")
            if not tag:
                continue
            pushed = self._parse_iso(tag_row.get("pushed_at"))
            if pushed and pushed >= cutoff:
                continue
            digest = tag_row.get("digest")
            if f"{repo_name}:{tag}" in referenced_keys:
                continue
            if digest and f"{repo_name}@{digest}" in referenced_keys:
                continue
            if host is None:
                host = self._registry_host()
            rows.append(
                {
                    "repository": repo_name,
                    "tag": tag,
                    "digest": digest,
                    "pushed_at": tag_row.get("pushed_at"),
                    "uri": f"{host}/{repo_name}:{tag}",
                }
            )
    return rows

949 

def _collect_inference_image_refs(self) -> set[str]:
    """Gather image URIs used by registered inference endpoints.

    For each endpoint whose ``spec`` is a dict, this collects:

    * the truthy ``spec["image"]``;
    * ``spec["canary"]["image"]``, when ``canary`` is a dict with a truthy
      ``image``.

    Best-effort: if importing the inference module, constructing the
    manager, or listing endpoints raises, the error is logged at debug
    level and an empty set is returned.
    """
    try:
        from .inference import InferenceManager
    except Exception as exc:  # noqa: BLE001
        logger.debug("InferenceManager unavailable: %s", exc)
        return set()
    try:
        endpoint_list = InferenceManager(self.config).list_endpoints()
    except Exception as exc:  # noqa: BLE001
        logger.debug("list_endpoints failed: %s", exc)
        return set()

    found: set[str] = set()
    for endpoint in endpoint_list or []:
        spec = endpoint.get("spec") or {}
        if not isinstance(spec, dict):
            continue
        primary = spec.get("image")
        if primary:
            found.add(primary)
        canary = spec.get("canary")
        if isinstance(canary, dict) and canary.get("image"):
            found.add(canary["image"])
    return found

973 

def _collect_recent_job_image_refs(self, threshold_days: int = 30) -> set[str]:
    """Union the ``image_refs`` of jobs created within ``threshold_days``.

    Jobs are listed across all regions via ``JobManager.list_jobs(all_regions=True)``.

    * A job is in the window when its ``created_time`` is not a ``datetime``
      or is no older than the cutoff. A naive ``created_time`` is read as
      UTC. Jobs with no timestamp therefore still count, so a just-submitted
      job is not mistaken for stale.
    * Only non-empty string entries of ``image_refs`` are kept.

    Best-effort: if importing ``JobManager``, constructing it, or listing
    jobs raises, the error is logged at debug level and an empty set is
    returned. The import is deferred to avoid a circular
    ``cli.images`` <-> ``cli.jobs`` import.
    """
    try:
        from .jobs import JobManager
    except Exception as exc:  # noqa: BLE001
        logger.debug("JobManager unavailable: %s", exc)
        return set()

    try:
        job_manager = JobManager(self.config)
    except Exception as exc:  # noqa: BLE001
        logger.debug("JobManager init failed: %s", exc)
        return set()

    try:
        listed = job_manager.list_jobs(all_regions=True)
    except Exception as exc:  # noqa: BLE001
        logger.debug("list_jobs(all_regions=True) failed: %s", exc)
        return set()

    oldest_allowed = datetime.now(UTC) - timedelta(days=threshold_days)

    def _in_window(job: Any) -> bool:
        # Jobs without a datetime timestamp are kept, to be conservative.
        created = getattr(job, "created_time", None)
        if not isinstance(created, datetime):
            return True
        if not created.tzinfo:
            created = created.replace(tzinfo=UTC)
        return created >= oldest_allowed

    return {
        ref
        for job in (listed or [])
        if _in_window(job)
        for ref in (getattr(job, "image_refs", None) or [])
        if isinstance(ref, str) and ref
    }

1020 

1021 @staticmethod 

1022 def _parse_iso(value: Any) -> datetime | None: 

1023 """Parse an ISO-8601 string into a tz-aware datetime, else None.""" 

1024 if isinstance(value, datetime): 

1025 return value if value.tzinfo else value.replace(tzinfo=UTC) 

1026 if not isinstance(value, str): 

1027 return None 

1028 try: 

1029 parsed = datetime.fromisoformat(value) 

1030 except ValueError: 

1031 return None 

1032 return parsed if parsed.tzinfo else parsed.replace(tzinfo=UTC) 

1033 

1034 

1035def _isoformat(value: Any) -> str | None: 

1036 """Return ISO-8601 form of a datetime, or pass-through for strings.""" 

1037 if value is None: 

1038 return None 

1039 if isinstance(value, datetime): 

1040 return value.isoformat() 

1041 return str(value) 

1042 

1043 

def get_image_manager(config: GCOConfig | None = None, region: str | None = None) -> ImageManager:
    """Construct an :class:`ImageManager`.

    Convenience factory: ``config`` and ``region`` are forwarded unchanged,
    as keyword arguments, to the ``ImageManager`` constructor.
    """
    manager = ImageManager(config=config, region=region)
    return manager