Coverage for cli/images.py: 90%
491 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Container image registry management for GCO CLI.
4Provides ``ImageManager`` for building, pushing, and managing user
5container images stored in per-project ECR repositories under the
6``gco/`` prefix. Builds run through the same container runtime
7(Docker, Finch, or Podman) used by CDK asset bundling, detected via
8``cli._container_runtime``.
10The ECR repository layout mirrors the project naming convention:
11``<account>.dkr.ecr.<region>.amazonaws.com/gco/<name>:<tag>``.
13Read-only methods (``list_repos``, ``list_tags``, ``describe``,
14``get_uri``, ``replication_get``, ``replication_status``) hit ECR
15directly via boto3 and do not invoke any container runtime.
17Administrative methods (``init``, ``lifecycle_get``, ``lifecycle_set``,
18``replication_sync``) configure the repository surface and are
19idempotent — re-running them is safe.
Destructive methods (``delete_tag``, ``delete_repo``, ``cleanup``,
``prune``) require explicit caller intent and never run implicitly.
``orphans`` lists deletion candidates (old tags with no known
references) for the same clean-up workflow.
24"""
26from __future__ import annotations
28import base64
29import json
30import logging
31import os
32import re
33import subprocess
34from datetime import UTC, datetime, timedelta
35from pathlib import Path
36from typing import Any
38import boto3
39from botocore.exceptions import ClientError
41from ._container_runtime import detect_container_runtime
42from ._image_uri import (
43 rewrite_image_uri_for_region as _rewrite_image_uri_for_region, # noqa: F401
44)
45from .config import GCOConfig, get_config
47# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
48# Flowchart(s) generated from this file:
49# * ``ImageManager.build`` -> ``diagrams/code_diagrams/cli/images.ImageManager_build.html``
50# (PNG: ``diagrams/code_diagrams/cli/images.ImageManager_build.png``)
51# * ``ImageManager.push`` -> ``diagrams/code_diagrams/cli/images.ImageManager_push.html``
52# (PNG: ``diagrams/code_diagrams/cli/images.ImageManager_push.png``)
53# * ``ImageManager.cleanup`` -> ``diagrams/code_diagrams/cli/images.ImageManager_cleanup.html``
54# (PNG: ``diagrams/code_diagrams/cli/images.ImageManager_cleanup.png``)
55# Regenerate with ``python diagrams/code_diagrams/generate.py``.
56# <pyflowchart-code-diagram> END
logger = logging.getLogger(__name__)

# Image name and tag validation regexes.
#
# Both patterns end in ``\Z`` rather than ``$``: with ``re.match``, ``$``
# also matches just before a trailing newline, so ``"app\n"`` would
# otherwise pass validation and reach ECR / the container CLI.
#
# Names: short, dns-friendly. Lowercase letter start, lowercase
# alphanumerics and dashes after, max 63 characters total. A single
# letter is accepted; any longer name must end in an alphanumeric so
# dangling dashes (``"app-"``) are rejected (ECR's repository-name
# pattern does not allow a trailing separator either).
_NAME_RE = re.compile(r"^[a-z](?:[a-z0-9-]{0,61}[a-z0-9])?\Z")

# Tags: docker reference grammar. First character must be alnum or
# underscore; subsequent characters allow dot, dash, underscore.
# 128 chars max.
_TAG_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.\-]{0,127}\Z")

# Project repository prefix. Every repo this manager creates lives
# under ``gco/`` so a single replication / lifecycle / removal policy
# rule can target the whole project.
_REPO_PREFIX = "gco"

# Default lifecycle policy parameters.
_DEFAULT_KEEP_TAGGED = 20  # number of tagged images kept per repository
_DEFAULT_EXPIRE_UNTAGGED_DAYS = 7  # age in days before untagged images expire

# Digest extraction from ``docker push`` stdout/stderr. The runtime
# emits a line of the form ``... digest: sha256:... size: ...``.
_DIGEST_RE = re.compile(r"sha256:[a-f0-9]{64}")
88class ImageManager:
89 """Manages user container images in ECR.
91 Construction is cheap — no AWS calls happen until a method is
92 invoked. The account ID and target region are resolved lazily.
93 """
def __init__(self, config: GCOConfig | None = None, region: str | None = None):
    """Create a manager bound to a project config and an ECR region.

    Args:
        config: Project configuration; when omitted (or falsy) the
            result of ``get_config()`` is used instead.
        region: Explicit region override; ``_resolve_region`` decides the
            fallback order when it is not given.

    No AWS call happens here: the account ID is fetched on first use and
    kept in ``_account_id_cache``.
    """
    # The config must be set first because region resolution reads it.
    if config:
        self.config = config
    else:
        self.config = get_config()
    self.region = self._resolve_region(region)
    # Filled in lazily by ``_account_id``.
    self._account_id_cache: str | None = None
100 # ------------------------------------------------------------------
101 # Region / account helpers
102 # ------------------------------------------------------------------
103 def _resolve_region(self, region: str | None) -> str:
104 """Pick a region for ECR API calls.
106 Priority: explicit argument, ``config.regions[0]`` if the
107 config exposes a regions list, ``AWS_DEFAULT_REGION``, then
108 ``config.global_region``.
109 """
110 if region:
111 return region
112 cfg_regions = getattr(self.config, "regions", None)
113 if cfg_regions:
114 return str(cfg_regions[0])
115 env_region = os.environ.get("AWS_DEFAULT_REGION")
116 if env_region:
117 return env_region
118 return str(self.config.global_region)
def _account_id(self) -> str:
    """Return the caller's AWS account ID.

    The first call asks STS (``GetCallerIdentity``). The answer is stored
    in ``self._account_id_cache``, so later calls make no request.
    """
    if self._account_id_cache is not None:
        return self._account_id_cache
    identity = boto3.client("sts").get_caller_identity()
    self._account_id_cache = identity["Account"]
    return self._account_id_cache
def _registry_host(self) -> str:
    """Return ``<account>.dkr.ecr.<region>.amazonaws.com`` for this manager."""
    account = self._account_id()
    region = self.region
    return f"{account}.dkr.ecr.{region}.amazonaws.com"
def _repo_arn(self, name: str) -> str:
    """Return the ARN of repository ``gco/<name>`` in this account/region.

    NOTE(review): the partition is fixed to ``aws``; confirm whether
    other partitions need to be supported.
    """
    account = self._account_id()
    repository = f"{_REPO_PREFIX}/{name}"
    return f"arn:aws:ecr:{self.region}:{account}:repository/{repository}"
def _ecr_client(self) -> Any:
    """Create a boto3 ECR client for ``self.region``.

    A fresh client is built on every call; nothing is cached here.
    """
    target_region = self.region
    return boto3.client("ecr", region_name=target_region)
139 # ------------------------------------------------------------------
140 # Validation helpers
141 # ------------------------------------------------------------------
def _validate_context(self, context: str) -> Path:
    """Check a build-context path and return it fully resolved.

    Any literal ``..`` component is rejected before resolution, so a
    caller cannot climb out of the intended workspace and get a silently
    rewritten path back. The resolved path must exist and be a directory;
    it is later used as the build's ``cwd``.

    Raises:
        ValueError: ``context`` contains ``..`` or is not a directory.
        FileNotFoundError: ``context`` does not exist.
    """
    candidate = Path(context)
    if any(part == ".." for part in candidate.parts):
        raise ValueError(f"Invalid build context: path traversal not allowed: {context}")
    resolved = candidate.resolve()
    if not resolved.exists():
        raise FileNotFoundError(f"Build context not found: {context}")
    if resolved.is_dir():
        return resolved
    raise ValueError(f"Build context is not a directory: {context}")
def _validate_name(self, name: str) -> str:
    """Return ``name`` unchanged when it matches ``_NAME_RE``.

    Raises:
        ValueError: ``name`` is not a valid image name.
    """
    if _NAME_RE.match(name) is None:
        raise ValueError(
            f"Invalid image name: {name!r}. Expected lowercase letters, "
            "digits, and dashes; must start with a letter; max 63 chars."
        )
    return name
def _validate_tag(self, tag: str) -> str:
    """Return ``tag`` unchanged when it matches ``_TAG_RE``.

    Raises:
        ValueError: ``tag`` is not a valid image tag.
    """
    if _TAG_RE.match(tag) is None:
        raise ValueError(
            f"Invalid image tag: {tag!r}. Expected alphanumerics, dots, "
            "dashes, and underscores; max 128 chars."
        )
    return tag
182 # ------------------------------------------------------------------
183 # Default-value helpers
184 # ------------------------------------------------------------------
185 def _git_short_sha(self) -> str | None:
186 """Return the current short git SHA, or ``None`` when unavailable."""
187 try:
188 result = subprocess.run(
189 ["git", "rev-parse", "--short", "HEAD"],
190 capture_output=True,
191 text=True,
192 timeout=5,
193 check=False,
194 )
195 if result.returncode == 0 and result.stdout.strip():
196 return result.stdout.strip()
197 except (FileNotFoundError, subprocess.TimeoutExpired, OSError) as e:
198 logger.debug("git rev-parse failed: %s", e)
199 return None
def _default_tag(self) -> str:
    """Return the short git SHA, falling back to ``"latest"`` when unavailable."""
    return self._git_short_sha() or "latest"
def _default_lifecycle_policy(self) -> dict[str, Any]:
    """Build the default ECR lifecycle policy document as a dict.

    Rule 1 expires tagged images beyond the newest
    ``_DEFAULT_KEEP_TAGGED``. Rule 2 expires untagged images
    ``_DEFAULT_EXPIRE_UNTAGGED_DAYS`` days after push. Callers
    JSON-encode the result before handing it to ``put_lifecycle_policy``.
    """
    keep_recent_tagged = {
        "rulePriority": 1,
        "description": f"Keep last {_DEFAULT_KEEP_TAGGED} tagged images",
        "selection": {
            "tagStatus": "tagged",
            "countType": "imageCountMoreThan",
            "countNumber": _DEFAULT_KEEP_TAGGED,
            "tagPatternList": ["*"],
        },
        "action": {"type": "expire"},
    }
    expire_old_untagged = {
        "rulePriority": 2,
        "description": f"Expire untagged after {_DEFAULT_EXPIRE_UNTAGGED_DAYS} days",
        "selection": {
            "tagStatus": "untagged",
            "countType": "sinceImagePushed",
            "countUnit": "days",
            "countNumber": _DEFAULT_EXPIRE_UNTAGGED_DAYS,
        },
        "action": {"type": "expire"},
    }
    return {"rules": [keep_recent_tagged, expire_old_untagged]}
242 # ------------------------------------------------------------------
243 # Output helpers
244 # ------------------------------------------------------------------
245 def _extract_digest(self, push_output: str) -> str | None:
246 """Pull the first ``sha256:...`` digest out of push stdout/stderr."""
247 match = _DIGEST_RE.search(push_output)
248 return match.group(0) if match else None
250 # ------------------------------------------------------------------
251 # ECR repository helpers (used by build/push)
252 # ------------------------------------------------------------------
def _runtime_or_error(self) -> str:
    """Return the container runtime CLI reported by ``detect_container_runtime``.

    Raises:
        RuntimeError: no runtime was detected. The message lists install
            options and the ``CDK_DOCKER`` override.
    """
    runtime = detect_container_runtime()
    if runtime:
        return runtime
    install_help = (
        "No container runtime found. Install Docker, Finch, or "
        "Podman, or set CDK_DOCKER=<path>.\n"
        " - Docker: https://docs.docker.com/get-docker/\n"
        " - Finch: brew install finch && finch vm init\n"
        " - Podman: https://podman.io/getting-started/installation"
    )
    raise RuntimeError(install_help)
def _ecr_login(self, runtime: str) -> None:
    """Log ``runtime`` in to this account's ECR registry.

    Fetches an ECR authorization token, base64-decodes it into
    ``username:password``, and pipes the password to
    ``<runtime> login --password-stdin`` so it is not passed as an argument.

    Raises:
        RuntimeError: the login command exited non-zero; its stderr is
            included in the message.
    """
    auth_data = self._ecr_client().get_authorization_token()["authorizationData"]
    credentials = base64.b64decode(auth_data[0]["authorizationToken"]).decode()
    username, password = credentials.split(":", 1)
    registry = self._registry_host()
    login = subprocess.run(
        [runtime, "login", "-u", username, "--password-stdin", registry],
        input=password.encode(),
        capture_output=True,
        check=False,
    )
    if login.returncode == 0:
        return
    stderr_text = login.stderr.decode(errors="replace").strip()
    raise RuntimeError(f"{runtime} login to {registry} failed: {stderr_text}")
def _check_tag_immutable_collision(self, name: str, tag: str) -> None:
    """Refuse to re-push ``tag`` when ``gco/<name>`` has immutable tags.

    On a repository with ``imageTagMutability=IMMUTABLE``, overwriting an
    existing tag only fails at push time, after the build has finished.
    This pre-flight check fails early instead. It does nothing when the
    repository or the tag does not exist yet, or when tags are mutable.

    Raises:
        RuntimeError: the tag already exists on an immutable repository.
        ClientError: any other ECR error raised by the two lookups.
    """
    ecr = self._ecr_client()
    repo_name = f"{_REPO_PREFIX}/{name}"

    def _code(err):
        # Error code carried by a botocore ClientError response.
        return err.response.get("Error", {}).get("Code", "")

    try:
        repo_resp = ecr.describe_repositories(repositoryNames=[repo_name])
    except ecr.exceptions.RepositoryNotFoundException:
        return
    except ClientError as exc:
        if _code(exc) == "RepositoryNotFoundException":
            return
        raise

    repos = repo_resp.get("repositories", [])
    is_immutable = bool(repos) and repos[0].get("imageTagMutability", "MUTABLE") == "IMMUTABLE"
    if not is_immutable:
        return

    try:
        existing = ecr.describe_images(
            repositoryName=repo_name,
            imageIds=[{"imageTag": tag}],
        )
    except ecr.exceptions.ImageNotFoundException:
        return
    except ClientError as exc:
        if _code(exc) == "ImageNotFoundException":
            return
        raise

    if existing.get("imageDetails"):
        raise RuntimeError(
            f"Tag {tag!r} already exists on immutable repo "
            f"{repo_name!r}. Re-run with a different tag, e.g. "
            f"--tag <new_tag>."
        )
def _apply_retain_tag(self, name: str) -> None:
    """Put the ``gco:retain=true`` resource tag on repository ``gco/<name>``.

    The repository is addressed by the ARN from ``_repo_arn``. ECR errors
    propagate to the caller.
    """
    retain_tag = {"Key": "gco:retain", "Value": "true"}
    arn = self._repo_arn(name)
    self._ecr_client().tag_resource(resourceArn=arn, tags=[retain_tag])
339 # ------------------------------------------------------------------
340 # build / push
341 # ------------------------------------------------------------------
342 def build(
343 self,
344 context: str,
345 name: str,
346 tag: str | None = None,
347 dockerfile: str = "Dockerfile",
348 build_args: dict[str, str] | None = None,
349 platform: str = "linux/amd64",
350 retain: bool = False,
351 ) -> dict[str, Any]:
352 """Build a container image and push it to the project's ECR repo.
354 Args:
355 context: Build context directory.
356 name: Image name (validated; lowercase letters, digits, dashes).
357 tag: Image tag (defaults to git short SHA, else ``latest``).
358 dockerfile: Path to the Dockerfile, relative to ``context``.
359 build_args: Optional ``KEY=value`` build args.
360 platform: ``--platform`` argument for the build (default
361 ``linux/amd64``).
362 retain: When True, mark the repository with ``gco:retain=true``
363 so it survives stack destroys.
365 Returns:
366 ``{"image_uri", "digest", "size_bytes", ...}``.
367 """
368 ctx = self._validate_context(context)
369 validated_name = self._validate_name(name)
370 validated_tag = self._validate_tag(tag if tag is not None else self._default_tag())
372 df_path = (ctx / dockerfile).resolve()
373 if not df_path.exists() or not df_path.is_file(): 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 raise FileNotFoundError(f"Dockerfile not found: {df_path} (relative to {ctx})")
375 # Confine the Dockerfile to the build context.
376 if not str(df_path).startswith(str(ctx)): 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 raise ValueError(f"Dockerfile must live inside the build context: {df_path}")
379 runtime = self._runtime_or_error()
380 self.init(name, retain=retain)
381 self._check_tag_immutable_collision(validated_name, validated_tag)
382 self._ecr_login(runtime)
384 full_uri = f"{self._registry_host()}/{_REPO_PREFIX}/{validated_name}:{validated_tag}"
386 build_cmd: list[str] = [
387 runtime,
388 "build",
389 "-t",
390 full_uri,
391 "--platform",
392 platform,
393 "-f",
394 str(df_path),
395 ]
396 for key, value in (build_args or {}).items(): 396 ↛ 397line 396 didn't jump to line 397 because the loop on line 396 never started
397 build_cmd.extend(["--build-arg", f"{key}={value}"])
398 build_cmd.append(str(ctx))
400 logger.info("Building image: %s", " ".join(build_cmd))
401 subprocess.run(build_cmd, check=True, cwd=str(ctx))
403 push_result = subprocess.run(
404 [runtime, "push", full_uri],
405 capture_output=True,
406 text=True,
407 check=True,
408 cwd=str(ctx),
409 )
410 digest = self._extract_digest((push_result.stdout or "") + (push_result.stderr or ""))
412 if retain: 412 ↛ 413line 412 didn't jump to line 413 because the condition on line 412 was never true
413 self._apply_retain_tag(validated_name)
415 size_bytes = self._image_size_bytes(validated_name, validated_tag)
417 return {
418 "image_uri": full_uri,
419 "digest": digest,
420 "size_bytes": size_bytes,
421 "runtime": runtime,
422 "repository": f"{_REPO_PREFIX}/{validated_name}",
423 "tag": validated_tag,
424 "region": self.region,
425 "retain": retain,
426 }
def push(
    self,
    name: str,
    tag: str,
    local_image: str,
    retain: bool = False,
) -> dict[str, Any]:
    """Push an already-built local image to the project's ECR repo.

    Tags ``local_image`` as the project URI before invoking
    ``<runtime> push``. Skips the build step but otherwise mirrors
    ``build``: init repo, immutable-tag check, login, push, optional
    retain tag.

    Args:
        name: Image name (validated; lowercase letters, digits, dashes).
        tag: Destination tag (validated).
        local_image: Reference of the local image, as accepted by
            ``<runtime> tag``. It must not be blank and must not start with
            ``-``, so it can never be parsed as a CLI option.
        retain: When True, mark the repository with ``gco:retain=true``.

    Returns:
        Same shape as ``build``: ``image_uri``, ``digest``, ``size_bytes``,
        ``runtime``, ``repository``, ``tag``, ``region``, ``retain``.

    Raises:
        ValueError: invalid name or tag, or an unusable ``local_image``.
        RuntimeError: no container runtime, failed login, or an immutable
            tag collision.
        subprocess.CalledProcessError: ``<runtime> tag`` or ``push`` failed.
    """
    validated_name = self._validate_name(name)
    validated_tag = self._validate_tag(tag)
    if not local_image or not local_image.strip():
        raise ValueError("local_image must be a non-empty image reference")
    if local_image.startswith("-"):
        # Image references never begin with "-"; such a value would be
        # interpreted as an option of ``<runtime> tag``.
        raise ValueError(f"local_image must not start with '-': {local_image!r}")

    runtime = self._runtime_or_error()
    self.init(validated_name, retain=retain)
    self._check_tag_immutable_collision(validated_name, validated_tag)
    self._ecr_login(runtime)

    full_uri = f"{self._registry_host()}/{_REPO_PREFIX}/{validated_name}:{validated_tag}"

    subprocess.run([runtime, "tag", local_image, full_uri], check=True)
    push_result = subprocess.run(
        [runtime, "push", full_uri],
        capture_output=True,
        text=True,
        check=True,
    )
    # The digest may be printed on either stream depending on the runtime.
    digest = self._extract_digest((push_result.stdout or "") + (push_result.stderr or ""))

    if retain:
        self._apply_retain_tag(validated_name)

    size_bytes = self._image_size_bytes(validated_name, validated_tag)

    return {
        "image_uri": full_uri,
        "digest": digest,
        "size_bytes": size_bytes,
        "runtime": runtime,
        "repository": f"{_REPO_PREFIX}/{validated_name}",
        "tag": validated_tag,
        "region": self.region,
        "retain": retain,
    }
478 def _image_size_bytes(self, name: str, tag: str) -> int | None:
479 """Best-effort ECR lookup for the pushed image size."""
480 ecr = self._ecr_client()
481 try:
482 resp = ecr.describe_images(
483 repositoryName=f"{_REPO_PREFIX}/{name}",
484 imageIds=[{"imageTag": tag}],
485 )
486 details = resp.get("imageDetails", [])
487 if details: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true
488 size = details[0].get("imageSizeInBytes")
489 if isinstance(size, int):
490 return size
491 except Exception as e: # noqa: BLE001
492 logger.debug("describe_images for size lookup failed: %s", e)
493 return None
495 # ------------------------------------------------------------------
496 # Read-only methods
497 # ------------------------------------------------------------------
def list_repos(self) -> list[dict[str, Any]]:
    """Summarize every ECR repository whose name starts with ``gco/``.

    Other repositories in the registry are skipped. Each summary carries
    ``name``, ``arn``, ``uri``, ``created_at``, ``image_count`` and
    ``tag_mutability``. ``image_count`` comes from ``_image_count``, which
    walks ``DescribeImages`` once per repository.
    """
    ecr = self._ecr_client()
    wanted_prefix = f"{_REPO_PREFIX}/"
    summaries: list[dict[str, Any]] = []
    for page in ecr.get_paginator("describe_repositories").paginate():
        for repository in page.get("repositories", []):
            repo_name = repository.get("repositoryName", "")
            if not repo_name.startswith(wanted_prefix):
                continue
            count = self._image_count(repo_name)
            summaries.append(
                {
                    "name": repo_name,
                    "arn": repository.get("repositoryArn"),
                    "uri": repository.get("repositoryUri"),
                    "created_at": _isoformat(repository.get("createdAt")),
                    "image_count": count,
                    "tag_mutability": repository.get("imageTagMutability"),
                }
            )
    return summaries
def _image_count(self, repository_name: str) -> int:
    """Count the images in ``repository_name``; 0 if the lookup fails.

    Any exception raised while paginating ``DescribeImages`` is logged at
    debug level and turned into a count of 0.
    """
    ecr = self._ecr_client()
    try:
        pages = ecr.get_paginator("describe_images").paginate(repositoryName=repository_name)
        return sum(len(page.get("imageDetails", [])) for page in pages)
    except Exception as e:  # noqa: BLE001
        logger.debug("describe_images count for %s failed: %s", repository_name, e)
        return 0
def list_tags(self, name: str) -> list[dict[str, Any]]:
    """Return one row per tag in ``gco/<name>``.

    Each row has ``tag``, ``digest``, ``pushed_at`` (through
    ``_isoformat``) and ``size_bytes``. An image with several tags yields
    one row per tag; an untagged image yields a single row whose ``tag``
    is ``None``.
    """
    repository = f"{_REPO_PREFIX}/{self._validate_name(name)}"
    ecr = self._ecr_client()
    pages = ecr.get_paginator("describe_images").paginate(repositoryName=repository)
    return [
        {
            "tag": image_tag,
            "digest": detail.get("imageDigest"),
            "pushed_at": _isoformat(detail.get("imagePushedAt")),
            "size_bytes": detail.get("imageSizeInBytes"),
        }
        for page in pages
        for detail in page.get("imageDetails", [])
        for image_tag in (detail.get("imageTags", []) or [None])
    ]
def describe(self, name: str, tag: str) -> dict[str, Any]:
    """Summarize the ECR image details for ``gco/<name>:<tag>``.

    Returns:
        ``name``, ``tag``, ``digest``, ``pushed_at``, ``size_bytes``,
        ``tags`` and ``scan_findings_summary`` taken from the first entry
        of ``imageDetails``, or ``{}`` when ECR returns no details. Errors
        raised by ``describe_images`` propagate unchanged.
    """
    validated_name = self._validate_name(name)
    validated_tag = self._validate_tag(tag)
    repository = f"{_REPO_PREFIX}/{validated_name}"
    resp = self._ecr_client().describe_images(
        repositoryName=repository,
        imageIds=[{"imageTag": validated_tag}],
    )
    details = resp.get("imageDetails", [])
    if details:
        first = details[0]
        return {
            "name": repository,
            "tag": validated_tag,
            "digest": first.get("imageDigest"),
            "pushed_at": _isoformat(first.get("imagePushedAt")),
            "size_bytes": first.get("imageSizeInBytes"),
            "tags": first.get("imageTags", []),
            "scan_findings_summary": first.get("imageScanFindingsSummary"),
        }
    return {}
def get_uri(self, name: str, tag: str = "latest") -> str:
    """Return ``<registry>/gco/<name>:<tag>`` without calling ECR.

    The registry host needs the account ID, so the first use may trigger
    the (cached) STS lookup performed by ``_account_id``.
    """
    reference = f"{self._validate_name(name)}:{self._validate_tag(tag)}"
    return f"{self._registry_host()}/{_REPO_PREFIX}/{reference}"
def replication_get(self) -> dict[str, Any]:
    """Return the registry's current ECR replication configuration, or ``{}``.

    Replication rules are part of the registry settings returned by
    ``DescribeRegistry``. ``GetRegistryPolicy`` is not the right call: it
    returns the registry permissions policy, which does not contain the
    rules written by ``replication_sync``.

    Returns:
        ``{"registryId": <registry id>, "policy": <replicationConfiguration>}``
        when at least one replication rule is configured, otherwise ``{}``.
        The configuration stays under the ``policy`` key for compatibility
        with existing callers.

    Raises:
        ClientError: ``DescribeRegistry`` failed.
    """
    ecr = self._ecr_client()
    resp = ecr.describe_registry()
    replication = resp.get("replicationConfiguration") or {}
    if not replication.get("rules"):
        return {}
    return {
        "registryId": resp.get("registryId"),
        "policy": replication,
    }
def replication_status(self) -> list[dict[str, Any]]:
    """Report replication status for every image in the ``gco/*`` repos.

    For each repository from ``list_repos()`` and each image digest, calls
    ``DescribeImageReplicationStatus``. Each row holds ``repository``,
    ``digest``, ``region``, ``status`` and ``registry_id``. Failed
    lookups are logged at debug level and skipped, so the result may be
    partial.
    """
    ecr = self._ecr_client()
    rows: list[dict[str, Any]] = []

    def _append_statuses(repo_name, digest):
        # A failure here only drops this digest's rows.
        try:
            reply = ecr.describe_image_replication_status(
                repositoryName=repo_name,
                imageId={"imageDigest": digest},
            )
            for entry in reply.get("replicationStatuses", []):
                rows.append(
                    {
                        "repository": repo_name,
                        "digest": digest,
                        "region": entry.get("region"),
                        "status": entry.get("status"),
                        "registry_id": entry.get("registryId"),
                    }
                )
        except (ClientError, AttributeError) as e:
            logger.debug(
                "describe_image_replication_status failed for %s %s: %s",
                repo_name,
                digest,
                e,
            )

    for repo in self.list_repos():
        repo_name = repo["name"]
        paginator = ecr.get_paginator("describe_images")
        try:
            for page in paginator.paginate(repositoryName=repo_name):
                for detail in page.get("imageDetails", []):
                    _append_statuses(repo_name, detail.get("imageDigest"))
        except ClientError as e:
            logger.debug("describe_images failed for %s: %s", repo_name, e)
    return rows
639 # ------------------------------------------------------------------
640 # Administrative methods
641 # ------------------------------------------------------------------
def init(self, name: str, retain: bool = False) -> dict[str, Any]:
    """Create ``gco/<name>`` idempotently and apply the default lifecycle.

    ``CreateRepository`` is called with ``imageTagMutability=MUTABLE``,
    ``scanOnPush=True`` and a ``Project`` tag. An "already exists" error
    counts as success (``created`` is then False). In both cases the
    default lifecycle policy is written and, when ``retain`` is True, the
    ``gco:retain=true`` tag is applied. ClientErrors from those two steps
    are logged at debug level and otherwise ignored.

    NOTE(review): the default lifecycle policy is written on every call,
    and ``build``/``push`` call ``init`` each time. A custom policy set via
    ``lifecycle_set`` is therefore replaced on the next build or push.

    Returns:
        ``{"name": "gco/<name>", "created": bool, "retain": bool}``.
    """
    validated = self._validate_name(name)
    repo_name = f"{_REPO_PREFIX}/{validated}"
    ecr = self._ecr_client()

    try:
        ecr.create_repository(
            repositoryName=repo_name,
            imageTagMutability="MUTABLE",
            imageScanningConfiguration={"scanOnPush": True},
            tags=[{"Key": "Project", "Value": self.config.project_name}],
        )
    except ecr.exceptions.RepositoryAlreadyExistsException:
        # Re-running against an existing repo skips creation but still
        # falls through to the lifecycle/retain steps below.
        created = False
        logger.debug("repository %s already exists; skipping create", repo_name)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code", "") != "RepositoryAlreadyExistsException":
            raise
        created = False
    else:
        created = True

    try:
        ecr.put_lifecycle_policy(
            repositoryName=repo_name,
            lifecyclePolicyText=json.dumps(self._default_lifecycle_policy()),
        )
    except ClientError as e:
        logger.debug("put_lifecycle_policy on %s failed: %s", repo_name, e)

    if retain:
        try:
            self._apply_retain_tag(validated)
        except ClientError as e:
            logger.debug("apply retain tag on %s failed: %s", repo_name, e)

    return {"name": repo_name, "created": created, "retain": retain}
def lifecycle_get(self, name: str) -> dict[str, Any]:
    """Return ``{"name", "policy"}`` for ``gco/<name>``'s lifecycle policy, or ``{}``.

    ``{}`` is returned when ECR reports ``LifecyclePolicyNotFoundException``
    or returns an empty policy text. Other ClientErrors propagate.
    """
    repository = f"{_REPO_PREFIX}/{self._validate_name(name)}"
    ecr = self._ecr_client()
    try:
        resp = ecr.get_lifecycle_policy(repositoryName=repository)
    except ecr.exceptions.LifecyclePolicyNotFoundException:
        return {}
    except ClientError as e:
        if e.response.get("Error", {}).get("Code", "") == "LifecyclePolicyNotFoundException":
            return {}
        raise
    policy_text = resp.get("lifecyclePolicyText")
    if not policy_text:
        return {}
    return {"name": repository, "policy": json.loads(policy_text)}
def lifecycle_set(self, name: str, policy: dict[str, Any]) -> dict[str, Any]:
    """Overwrite the lifecycle policy of ``gco/<name>`` with ``policy``.

    ``policy`` is JSON-encoded and sent as-is; ECR errors propagate.

    Returns:
        ``{"name", "registry_id", "policy"}``, where ``registry_id`` comes
        from the ``PutLifecyclePolicy`` response and ``policy`` echoes the
        input.
    """
    repository = f"{_REPO_PREFIX}/{self._validate_name(name)}"
    response = self._ecr_client().put_lifecycle_policy(
        repositoryName=repository,
        lifecyclePolicyText=json.dumps(policy),
    )
    return {
        "name": repository,
        "registry_id": response.get("registryId"),
        "policy": policy,
    }
def replication_sync(self) -> dict[str, Any]:
    """Apply the project's standard replication rule.

    Replicates ``gco/*`` (prefix match) to every region in
    ``config.regions`` except the manager's own region. Duplicate regions
    are collapsed, preserving order. With no destinations an empty rule
    list is written, which clears replication. The rule object mirrors
    what the global stack provisions, so the two stay aligned.

    Returns:
        ``{"configuration": <config sent to ECR>, "destinations": [...],
        "registry_id": <account id>}``. For a private registry the
        registry ID is the AWS account ID.

    Raises:
        ClientError: ``PutReplicationConfiguration`` failed.
    """
    ecr = self._ecr_client()
    configured = getattr(self.config, "regions", []) or []
    # De-duplicate (order-preserving) and never replicate to the source region.
    destinations = [r for r in dict.fromkeys(configured) if r != self.region]

    account = self._account_id()
    if destinations:
        rule = {
            "destinations": [{"region": r, "registryId": account} for r in destinations],
            "repositoryFilters": [{"filter": f"{_REPO_PREFIX}/", "filterType": "PREFIX_MATCH"}],
        }
        config = {"rules": [rule]}
    else:
        config = {"rules": []}
    ecr.put_replication_configuration(replicationConfiguration=config)
    return {
        "configuration": config,
        "destinations": destinations,
        # The response only echoes the configuration; the registry ID of a
        # private registry is the account ID.
        "registry_id": account,
    }
758 # ------------------------------------------------------------------
759 # Destructive methods
760 # ------------------------------------------------------------------
def delete_tag(self, name: str, tag: str) -> dict[str, Any]:
    """Delete one tag from ``gco/<name>`` via ``BatchDeleteImage``.

    Returns:
        ``{"name", "tag", "deleted", "failures"}``. ``deleted`` lists the
        ``digest``/``tag`` pairs ECR reports as removed; ``failures`` is
        passed through from the response.
    """
    validated_name = self._validate_name(name)
    validated_tag = self._validate_tag(tag)
    repository = f"{_REPO_PREFIX}/{validated_name}"
    response = self._ecr_client().batch_delete_image(
        repositoryName=repository,
        imageIds=[{"imageTag": validated_tag}],
    )
    removed = [
        {"digest": image_id.get("imageDigest"), "tag": image_id.get("imageTag")}
        for image_id in response.get("imageIds", [])
    ]
    return {
        "name": repository,
        "tag": validated_tag,
        "deleted": removed,
        "failures": response.get("failures", []),
    }
def delete_repo(self, name: str, force: bool = False) -> dict[str, Any]:
    """Delete repository ``gco/<name>``.

    ``force`` is passed straight to ``DeleteRepository``. ECR errors
    propagate.

    Returns:
        ``{"name", "deleted": True, "registry_id"}``, with ``registry_id``
        taken from the response's ``repository`` entry.
    """
    repository = f"{_REPO_PREFIX}/{self._validate_name(name)}"
    response = self._ecr_client().delete_repository(repositoryName=repository, force=force)
    registry_id = response.get("repository", {}).get("registryId")
    return {"name": repository, "deleted": True, "registry_id": registry_id}
794 def cleanup(
795 self,
796 name: str | None = None,
797 all: bool = False,
798 ) -> dict[str, Any]:
799 """Delete every untagged image across one or all project repos."""
800 if not name and not all:
801 raise ValueError("cleanup() requires either a name or all=True")
803 repos: list[str]
804 if name:
805 validated = self._validate_name(name)
806 repos = [f"{_REPO_PREFIX}/{validated}"]
807 else:
808 repos = [r["name"] for r in self.list_repos()]
810 ecr = self._ecr_client()
811 repos_touched = 0
812 tags_deleted = 0
813 bytes_freed = 0
815 for repo_name in repos:
816 untagged_ids: list[dict[str, str]] = []
817 untagged_size = 0
818 try:
819 paginator = ecr.get_paginator("describe_images")
820 for page in paginator.paginate(
821 repositoryName=repo_name,
822 filter={"tagStatus": "UNTAGGED"},
823 ):
824 for detail in page.get("imageDetails", []):
825 digest = detail.get("imageDigest")
826 if not digest:
827 continue
828 untagged_ids.append({"imageDigest": digest})
829 size = detail.get("imageSizeInBytes") or 0
830 if isinstance(size, int): 830 ↛ 824line 830 didn't jump to line 824 because the condition on line 830 was always true
831 untagged_size += size
832 except ClientError as e:
833 logger.debug("describe_images for cleanup of %s failed: %s", repo_name, e)
834 continue
836 if not untagged_ids: 836 ↛ 837line 836 didn't jump to line 837 because the condition on line 836 was never true
837 continue
838 repos_touched += 1
839 # batch_delete_image accepts up to 100 ids per call.
840 for chunk_start in range(0, len(untagged_ids), 100):
841 chunk = untagged_ids[chunk_start : chunk_start + 100]
842 resp = ecr.batch_delete_image(
843 repositoryName=repo_name,
844 imageIds=chunk,
845 )
846 tags_deleted += len(resp.get("imageIds", []))
847 bytes_freed += untagged_size
849 return {
850 "repos_touched": repos_touched,
851 "tags_deleted": tags_deleted,
852 "bytes_freed": bytes_freed,
853 }
855 def prune(self, dry_run: bool = True) -> dict[str, Any]:
856 """Remove untagged images older than 30 days.
858 Returns the same shape as ``cleanup``; when ``dry_run`` is True
859 (the default), no images are deleted.
860 """
861 cutoff = datetime.now(UTC) - timedelta(days=30)
862 ecr = self._ecr_client()
863 repos_touched = 0
864 tags_deleted = 0
865 bytes_freed = 0
867 for repo in self.list_repos():
868 repo_name = repo["name"]
869 stale_ids: list[dict[str, str]] = []
870 stale_size = 0
871 try:
872 paginator = ecr.get_paginator("describe_images")
873 for page in paginator.paginate(
874 repositoryName=repo_name,
875 filter={"tagStatus": "UNTAGGED"},
876 ):
877 for detail in page.get("imageDetails", []):
878 pushed = detail.get("imagePushedAt")
879 if pushed and pushed >= cutoff:
880 continue
881 digest = detail.get("imageDigest")
882 if not digest: 882 ↛ 883line 882 didn't jump to line 883 because the condition on line 882 was never true
883 continue
884 stale_ids.append({"imageDigest": digest})
885 size = detail.get("imageSizeInBytes") or 0
886 if isinstance(size, int): 886 ↛ 877line 886 didn't jump to line 877 because the condition on line 886 was always true
887 stale_size += size
888 except ClientError as e:
889 logger.debug("describe_images for prune of %s failed: %s", repo_name, e)
890 continue
892 if not stale_ids:
893 continue
894 repos_touched += 1
895 tags_deleted += len(stale_ids)
896 bytes_freed += stale_size
897 if dry_run:
898 continue
899 for chunk_start in range(0, len(stale_ids), 100):
900 chunk = stale_ids[chunk_start : chunk_start + 100]
901 ecr.batch_delete_image(
902 repositoryName=repo_name,
903 imageIds=chunk,
904 )
906 return {
907 "dry_run": dry_run,
908 "repos_touched": repos_touched,
909 "tags_deleted": tags_deleted,
910 "bytes_freed": bytes_freed,
911 }
913 def orphans(self, threshold_days: int = 30) -> list[dict[str, Any]]:
914 """List ``gco/*`` tags older than ``threshold_days`` with no references.
916 Cross-references against:
917 * inference endpoint specs (via :class:`cli.inference.InferenceManager`),
918 * recent jobs (best-effort; returns empty for the jobs side when
919 the queue table schema is unavailable).
920 """
921 cutoff = datetime.now(UTC) - timedelta(days=threshold_days)
922 referenced: set[str] = set()
923 referenced.update(self._collect_inference_image_refs())
924 referenced.update(self._collect_recent_job_image_refs(threshold_days))
926 rows: list[dict[str, Any]] = []
927 for repo in self.list_repos():
928 repo_name = repo["name"]
929 for tag_row in self.list_tags(repo_name.removeprefix(f"{_REPO_PREFIX}/")):
930 tag = tag_row.get("tag")
931 if not tag: 931 ↛ 932line 931 didn't jump to line 932 because the condition on line 931 was never true
932 continue
933 pushed = self._parse_iso(tag_row.get("pushed_at"))
934 if pushed and pushed >= cutoff:
935 continue
936 uri = f"{self._registry_host()}/{repo_name}:{tag}"
937 if uri in referenced:
938 continue
939 rows.append(
940 {
941 "repository": repo_name,
942 "tag": tag,
943 "digest": tag_row.get("digest"),
944 "pushed_at": tag_row.get("pushed_at"),
945 "uri": uri,
946 }
947 )
948 return rows
def _collect_inference_image_refs(self) -> set[str]:
    """Gather the image URIs named by registered inference endpoints.

    For each endpoint returned by ``InferenceManager.list_endpoints()``,
    the primary ``spec["image"]`` and, when present, the canary
    ``spec["canary"]["image"]`` are collected (only truthy values).
    Endpoints whose ``spec`` is not a dict contribute nothing.

    Best-effort: if ``cli.inference`` cannot be imported, or building the
    manager / listing endpoints raises, the error is logged at debug level
    and an empty set is returned.
    """
    try:
        from .inference import InferenceManager
    except Exception as e:  # noqa: BLE001
        logger.debug("InferenceManager unavailable: %s", e)
        return set()

    try:
        endpoints = InferenceManager(self.config).list_endpoints()
    except Exception as e:  # noqa: BLE001
        logger.debug("list_endpoints failed: %s", e)
        return set()

    found: set[str] = set()
    for endpoint in endpoints or []:
        spec = endpoint.get("spec") or {}
        if not isinstance(spec, dict):
            continue
        primary = spec.get("image")
        if primary:
            found.add(primary)
        canary = spec.get("canary")
        if isinstance(canary, dict) and canary.get("image"):
            found.add(canary["image"])
    return found
974 def _collect_recent_job_image_refs(self, threshold_days: int = 30) -> set[str]:
975 """Return image URIs referenced by jobs newer than ``threshold_days``.
977 Walks every deployed region via :class:`cli.jobs.JobManager` and
978 unions the ``image_refs`` field on each ``JobInfo`` whose
979 ``created_time`` is within the cutoff. Treats jobs without a
980 ``created_time`` as in-window so a freshly-submitted job that
981 hasn't yet been picked up by the cluster's status loop isn't
982 accidentally considered orphaned.
984 Best-effort: any per-region failure is logged at debug and
985 skipped so the orphan scan still completes against the
986 regions that did respond. The deferred import breaks an
987 otherwise-circular ``cli.images`` ↔ ``cli.jobs`` dependency.
988 """
989 try:
990 from .jobs import JobManager
991 except Exception as e: # noqa: BLE001
992 logger.debug("JobManager unavailable: %s", e)
993 return set()
995 try:
996 manager = JobManager(self.config)
997 except Exception as e: # noqa: BLE001
998 logger.debug("JobManager init failed: %s", e)
999 return set()
1001 try:
1002 jobs = manager.list_jobs(all_regions=True)
1003 except Exception as e: # noqa: BLE001
1004 logger.debug("list_jobs(all_regions=True) failed: %s", e)
1005 return set()
1007 cutoff = datetime.now(UTC) - timedelta(days=threshold_days)
1008 refs: set[str] = set()
1009 for job in jobs or []:
1010 created = getattr(job, "created_time", None)
1011 if isinstance(created, datetime):
1012 created_aware = created if created.tzinfo else created.replace(tzinfo=UTC)
1013 if created_aware < cutoff:
1014 continue
1015 image_refs = getattr(job, "image_refs", None) or []
1016 for ref in image_refs:
1017 if isinstance(ref, str) and ref:
1018 refs.add(ref)
1019 return refs
1021 @staticmethod
1022 def _parse_iso(value: Any) -> datetime | None:
1023 """Parse an ISO-8601 string into a tz-aware datetime, else None."""
1024 if isinstance(value, datetime):
1025 return value if value.tzinfo else value.replace(tzinfo=UTC)
1026 if not isinstance(value, str):
1027 return None
1028 try:
1029 parsed = datetime.fromisoformat(value)
1030 except ValueError:
1031 return None
1032 return parsed if parsed.tzinfo else parsed.replace(tzinfo=UTC)
1035def _isoformat(value: Any) -> str | None:
1036 """Return ISO-8601 form of a datetime, or pass-through for strings."""
1037 if value is None:
1038 return None
1039 if isinstance(value, datetime):
1040 return value.isoformat()
1041 return str(value)
def get_image_manager(config: GCOConfig | None = None, region: str | None = None) -> ImageManager:
    """Construct an :class:`ImageManager`.

    ``config`` and ``region`` are forwarded unchanged as keyword arguments;
    how ``None`` is handled is up to ``ImageManager`` itself.
    """
    manager = ImageManager(config=config, region=region)
    return manager