Coverage for mcp/tools/images.py: 78%
114 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Container image registry MCP tools.
3All tools wrap ``cli/images.py::ImageManager`` so the MCP layer never
4re-implements the underlying ECR/runtime logic. Read-only and
5administrative tools are unconditional. Build/push tools register only
6when ``GCO_ENABLE_IMAGE_PUBLISH`` is set; destructive tools register
7only when ``GCO_ENABLE_DESTRUCTIVE_OPERATIONS`` is set.
8"""
10from __future__ import annotations
12import asyncio
13import json
14from typing import Any
16from audit import audit_logged
17from feature_flags import (
18 FLAG_DESTRUCTIVE_OPERATIONS,
19 FLAG_IMAGE_PUBLISH,
20 is_enabled,
21)
22from server import mcp
24from tools._long_task import _run_long_task
26# FastMCP's Progress / Context dependencies are optional from this
27# module's perspective — when ``fastmcp[tasks]`` is reachable they
28# inject real instances per call; otherwise the gated build/push tools
29# still register but rely on caller-provided fakes (the test path).
30try:
31 from fastmcp.server.dependencies import CurrentContext, Progress
32except ImportError: # pragma: no cover - degraded fastmcp install
33 CurrentContext = None # type: ignore[assignment]
34 Progress = None # type: ignore[misc,assignment]
36# TaskConfig is best-effort wired so MCP clients that opt into the task
37# protocol can run build/push as background tasks. If the import path
38# moves between fastmcp versions, the tools register without it.
39try:
40 from fastmcp.server.tasks.config import TaskConfig
42 _TASK_CONFIG_OPTIONAL: Any = TaskConfig(mode="optional")
43except ImportError: # pragma: no cover - degraded fastmcp install
44 _TASK_CONFIG_OPTIONAL = None
47def _get_manager() -> Any:
48 """Lazy-import ``cli.images.get_image_manager`` so MCP server
49 import doesn't pull boto3 prematurely.
50 """
51 from cli.images import get_image_manager
53 return get_image_manager()
56# =============================================================================
57# Read-only tools — Risk_Tier "safe"
58# =============================================================================
61@mcp.tool(tags={"safe", "images"})
62@audit_logged
63async def images_list() -> str:
64 """`gco images list` — list every gco/* repository in ECR."""
65 return await asyncio.to_thread(lambda: json.dumps(_get_manager().list_repos()))
68@mcp.tool(tags={"safe", "images"})
69@audit_logged
70async def images_tags(name: str) -> str:
71 """`gco images tags` — list tags within a repository.
73 Args:
74 name: Repository name (without the ``gco/`` prefix).
75 """
76 return await asyncio.to_thread(lambda: json.dumps(_get_manager().list_tags(name)))
79@mcp.tool(tags={"safe", "images"})
80@audit_logged
81async def images_describe(name: str, tag: str) -> str:
82 """`gco images describe` — full ECR details for a single image tag.
84 Args:
85 name: Repository name (without the ``gco/`` prefix).
86 tag: Image tag.
87 """
88 return await asyncio.to_thread(lambda: json.dumps(_get_manager().describe(name, tag)))
91@mcp.tool(tags={"safe", "images"})
92@audit_logged
93async def images_uri(name: str, tag: str = "latest") -> str:
94 """`gco images uri` — return the registry URI for an image. No AWS calls.
96 Args:
97 name: Repository name (without the ``gco/`` prefix).
98 tag: Image tag. Defaults to ``latest``.
99 """
100 return await asyncio.to_thread(
101 lambda: json.dumps({"uri": _get_manager().get_uri(name, tag=tag)})
102 )
105@mcp.tool(tags={"safe", "images"})
106@audit_logged
107async def images_replication_get() -> str:
108 """`gco images replication get` — current ECR replication configuration."""
109 return await asyncio.to_thread(lambda: json.dumps(_get_manager().replication_get()))
112@mcp.tool(tags={"safe", "images"})
113@audit_logged
114async def images_replication_status() -> str:
115 """`gco images replication status` — per-image replication status across project repos."""
116 return await asyncio.to_thread(lambda: json.dumps(_get_manager().replication_status()))
119@mcp.tool(tags={"safe", "images"})
120@audit_logged
121async def images_orphans(threshold_days: int = 30) -> str:
122 """`gco images orphans` — list gco/* tags older than ``threshold_days`` with no references.
124 Args:
125 threshold_days: Age threshold in days. Defaults to 30.
126 """
127 return await asyncio.to_thread(
128 lambda: json.dumps(_get_manager().orphans(threshold_days=threshold_days))
129 )
132# =============================================================================
133# Administrative tools — Risk_Tier "low-risk"
134# =============================================================================
137@mcp.tool(tags={"low-risk", "images"})
138@audit_logged
139async def images_init(name: str, retain: bool = False) -> str:
140 """`gco images init` — create the project ECR repo idempotently with default lifecycle.
142 Args:
143 name: Repository name (without the ``gco/`` prefix).
144 retain: When True, mark the repository with ``gco:retain=true`` so it
145 survives stack destroys.
146 """
147 return await asyncio.to_thread(lambda: json.dumps(_get_manager().init(name, retain=retain)))
150@mcp.tool(tags={"low-risk", "images"})
151@audit_logged
152async def images_lifecycle_get(name: str) -> str:
153 """`gco images lifecycle get` — print the lifecycle policy on a repository.
155 Args:
156 name: Repository name (without the ``gco/`` prefix).
157 """
158 return await asyncio.to_thread(lambda: json.dumps(_get_manager().lifecycle_get(name)))
161@mcp.tool(tags={"low-risk", "images"})
162@audit_logged
163async def images_lifecycle_set(name: str, policy: dict[str, Any]) -> str:
164 """`gco images lifecycle set` — replace the lifecycle policy on a repository.
166 Args:
167 name: Repository name (without the ``gco/`` prefix).
168 policy: ECR lifecycle policy document as a dict.
169 """
170 return await asyncio.to_thread(lambda: json.dumps(_get_manager().lifecycle_set(name, policy)))
173@mcp.tool(tags={"low-risk", "images"})
174@audit_logged
175async def images_replication_sync() -> str:
176 """`gco images replication sync` — apply the standard gco/* replication rule."""
177 return await asyncio.to_thread(lambda: json.dumps(_get_manager().replication_sync()))
180# =============================================================================
181# Image publish — gated by GCO_ENABLE_IMAGE_PUBLISH
182# =============================================================================
183#
184# build/push are long-running data-upload operations. They run via
185# ``_run_long_task`` so progress messages stream back through the
186# FastMCP Progress dependency.
188if is_enabled(FLAG_IMAGE_PUBLISH):
189 # Build the decorator kwargs dict so we only pass ``task=...`` when
190 # TaskConfig was importable on this fastmcp version.
191 _publish_decorator_kwargs: dict[str, Any] = {"tags": {"image", "images"}}
192 if _TASK_CONFIG_OPTIONAL is not None: 192 ↛ 195line 192 didn't jump to line 195 because the condition on line 192 was always true
193 _publish_decorator_kwargs["task"] = _TASK_CONFIG_OPTIONAL
195 if Progress is not None and CurrentContext is not None: 195 ↛ 273line 195 didn't jump to line 273 because the condition on line 195 was always true
197 @mcp.tool(**_publish_decorator_kwargs) # type: ignore[untyped-decorator]
198 @audit_logged
199 async def images_build(
200 context: str,
201 name: str,
202 tag: str | None = None,
203 dockerfile: str = "Dockerfile",
204 platform: str = "linux/amd64",
205 retain: bool = False,
206 *,
207 ctx: Any = CurrentContext(),
208 progress: Any = Progress(),
209 ) -> str:
210 """[gated by GCO_ENABLE_IMAGE_PUBLISH] long-running, data-upload.
212 `gco images build` — build a container image and push to ECR.
214 Args:
215 context: Build context directory.
216 name: Image name (lowercase letters, digits, dashes; max 63 chars).
217 tag: Image tag (defaults to git short SHA, else ``latest``).
218 dockerfile: Path to the Dockerfile, relative to ``context``.
219 platform: ``--platform`` argument for the build.
220 retain: When True, mark the repository with ``gco:retain=true``
221 so it survives stack destroys.
222 """
223 argv = ["gco", "images", "build", context, "--name", name]
224 if tag:
225 argv += ["--tag", tag]
226 argv += ["--dockerfile", dockerfile, "--platform", platform]
227 if retain:
228 argv.append("--retain")
229 return await _run_long_task(argv, ctx=ctx, progress=progress, is_stack_op=False)
231 @mcp.tool(**_publish_decorator_kwargs) # type: ignore[untyped-decorator]
232 @audit_logged
233 async def images_push(
234 name: str,
235 tag: str,
236 local_image: str,
237 retain: bool = False,
238 *,
239 ctx: Any = CurrentContext(),
240 progress: Any = Progress(),
241 ) -> str:
242 """[gated by GCO_ENABLE_IMAGE_PUBLISH] long-running, data-upload.
244 `gco images push` — push an already-built local image to the project ECR repo.
246 Args:
247 name: Image name (lowercase letters, digits, dashes; max 63 chars).
248 tag: Image tag.
249 local_image: Source image reference on the local container runtime.
250 retain: When True, mark the repository with ``gco:retain=true``
251 so it survives stack destroys.
252 """
253 argv = [
254 "gco",
255 "images",
256 "push",
257 name,
258 "--tag",
259 tag,
260 "--local-image",
261 local_image,
262 ]
263 if retain:
264 argv.append("--retain")
265 return await _run_long_task(argv, ctx=ctx, progress=progress, is_stack_op=False)
268# =============================================================================
269# Destructive image tools — gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS
270# =============================================================================
273async def _ctx_warning(message: str) -> None:
274 """Emit ``ctx.warning(...)`` from inside a tool body, no-op when no Context.
276 Tools wrapped here are short-lived enough that we don't need the full
277 ``_run_long_task`` stack — we just want operators (and the audit log)
278 to see a warning when destructive work runs.
279 """
280 import contextlib as _contextlib
282 try:
283 from fastmcp.server.dependencies import get_context
285 ctx = get_context()
286 except Exception:
287 return
288 with _contextlib.suppress(Exception):
289 await ctx.warning(message)
292if is_enabled(FLAG_DESTRUCTIVE_OPERATIONS):
294 @mcp.tool(tags={"destructive", "images"})
295 @audit_logged
296 async def images_delete_tag(name: str, tag: str) -> str:
297 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive.
299 `gco images delete-tag` — delete a single tag from a repository.
300 Cannot be undone — the image manifest is removed from ECR.
302 Args:
303 name: Repository name (without the ``gco/`` prefix).
304 tag: Image tag to delete.
305 """
306 await _ctx_warning(f"Deleting tag {tag!r} from gco/{name} — this cannot be undone.")
307 return await asyncio.to_thread(lambda: json.dumps(_get_manager().delete_tag(name, tag)))
309 @mcp.tool(tags={"destructive", "images"})
310 @audit_logged
311 async def images_delete_repo(name: str, force: bool = False) -> str:
312 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive.
314 `gco images delete-repo` — delete a whole repository.
315 Cannot be undone — the repo and (when ``force=True``) every image
316 inside it are permanently removed from ECR.
318 Args:
319 name: Repository name (without the ``gco/`` prefix).
320 force: When True, also delete every image inside the repo.
321 """
322 await _ctx_warning(
323 f"Deleting repository gco/{name} (force={force}) — this cannot be undone."
324 )
325 return await asyncio.to_thread(
326 lambda: json.dumps(_get_manager().delete_repo(name, force=force))
327 )
329 @mcp.tool(tags={"destructive", "images"})
330 @audit_logged
331 async def images_cleanup(name: str | None = None, all: bool = False) -> str:
332 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive.
334 `gco images cleanup` — remove every untagged image across one or all project repos.
335 Cannot be undone — untagged image manifests are permanently deleted.
337 Args:
338 name: Repository name to clean (without the ``gco/`` prefix). Required
339 unless ``all=True``.
340 all: When True, clean every project repository.
341 """
342 scope = "all repos" if all else f"gco/{name}"
343 await _ctx_warning(f"Cleaning untagged images from {scope} — this cannot be undone.")
344 return await asyncio.to_thread(
345 lambda: json.dumps(_get_manager().cleanup(name=name, all=all))
346 )
348 @mcp.tool(tags={"destructive", "images"})
349 @audit_logged
350 async def images_prune(dry_run: bool = True) -> str:
351 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive.
353 `gco images prune` — remove untagged images older than 30 days.
354 Cannot be undone when ``dry_run=False``; the matching image manifests
355 are permanently deleted.
357 Args:
358 dry_run: When True (default), report what would be deleted without
359 deleting anything.
360 """
361 if not dry_run:
362 await _ctx_warning(
363 "Pruning untagged images older than 30 days — this cannot be undone."
364 )
365 return await asyncio.to_thread(lambda: json.dumps(_get_manager().prune(dry_run=dry_run)))