Coverage for mcp/tools/images.py: 78%

114 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Container image registry MCP tools. 

2 

3All tools wrap ``cli/images.py::ImageManager`` so the MCP layer never 

4re-implements the underlying ECR/runtime logic. Read-only and 

5administrative tools are unconditional. Build/push tools register only 

6when ``GCO_ENABLE_IMAGE_PUBLISH`` is set; destructive tools register 

7only when ``GCO_ENABLE_DESTRUCTIVE_OPERATIONS`` is set. 

8""" 

9 

10from __future__ import annotations 

11 

12import asyncio 

13import json 

14from typing import Any 

15 

16from audit import audit_logged 

17from feature_flags import ( 

18 FLAG_DESTRUCTIVE_OPERATIONS, 

19 FLAG_IMAGE_PUBLISH, 

20 is_enabled, 

21) 

22from server import mcp 

23 

24from tools._long_task import _run_long_task 

25 

26# FastMCP's Progress / Context dependencies are optional from this 

27# module's perspective — when ``fastmcp[tasks]`` is reachable they 

28# inject real instances per call; otherwise the gated build/push tools 

29# still register but rely on caller-provided fakes (the test path). 

30try: 

31 from fastmcp.server.dependencies import CurrentContext, Progress 

32except ImportError: # pragma: no cover - degraded fastmcp install 

33 CurrentContext = None # type: ignore[assignment] 

34 Progress = None # type: ignore[misc,assignment] 

35 

36# TaskConfig is best-effort wired so MCP clients that opt into the task 

37# protocol can run build/push as background tasks. If the import path 

38# moves between fastmcp versions, the tools register without it. 

39try: 

40 from fastmcp.server.tasks.config import TaskConfig 

41 

42 _TASK_CONFIG_OPTIONAL: Any = TaskConfig(mode="optional") 

43except ImportError: # pragma: no cover - degraded fastmcp install 

44 _TASK_CONFIG_OPTIONAL = None 

45 

46 

47def _get_manager() -> Any: 

48 """Lazy-import ``cli.images.get_image_manager`` so MCP server 

49 import doesn't pull boto3 prematurely. 

50 """ 

51 from cli.images import get_image_manager 

52 

53 return get_image_manager() 

54 

55 

56# ============================================================================= 

57# Read-only tools — Risk_Tier "safe" 

58# ============================================================================= 

59 

60 

61@mcp.tool(tags={"safe", "images"}) 

62@audit_logged 

63async def images_list() -> str: 

64 """`gco images list` — list every gco/* repository in ECR.""" 

65 return await asyncio.to_thread(lambda: json.dumps(_get_manager().list_repos())) 

66 

67 

68@mcp.tool(tags={"safe", "images"}) 

69@audit_logged 

70async def images_tags(name: str) -> str: 

71 """`gco images tags` — list tags within a repository. 

72 

73 Args: 

74 name: Repository name (without the ``gco/`` prefix). 

75 """ 

76 return await asyncio.to_thread(lambda: json.dumps(_get_manager().list_tags(name))) 

77 

78 

79@mcp.tool(tags={"safe", "images"}) 

80@audit_logged 

81async def images_describe(name: str, tag: str) -> str: 

82 """`gco images describe` — full ECR details for a single image tag. 

83 

84 Args: 

85 name: Repository name (without the ``gco/`` prefix). 

86 tag: Image tag. 

87 """ 

88 return await asyncio.to_thread(lambda: json.dumps(_get_manager().describe(name, tag))) 

89 

90 

91@mcp.tool(tags={"safe", "images"}) 

92@audit_logged 

93async def images_uri(name: str, tag: str = "latest") -> str: 

94 """`gco images uri` — return the registry URI for an image. No AWS calls. 

95 

96 Args: 

97 name: Repository name (without the ``gco/`` prefix). 

98 tag: Image tag. Defaults to ``latest``. 

99 """ 

100 return await asyncio.to_thread( 

101 lambda: json.dumps({"uri": _get_manager().get_uri(name, tag=tag)}) 

102 ) 

103 

104 

105@mcp.tool(tags={"safe", "images"}) 

106@audit_logged 

107async def images_replication_get() -> str: 

108 """`gco images replication get` — current ECR replication configuration.""" 

109 return await asyncio.to_thread(lambda: json.dumps(_get_manager().replication_get())) 

110 

111 

112@mcp.tool(tags={"safe", "images"}) 

113@audit_logged 

114async def images_replication_status() -> str: 

115 """`gco images replication status` — per-image replication status across project repos.""" 

116 return await asyncio.to_thread(lambda: json.dumps(_get_manager().replication_status())) 

117 

118 

119@mcp.tool(tags={"safe", "images"}) 

120@audit_logged 

121async def images_orphans(threshold_days: int = 30) -> str: 

122 """`gco images orphans` — list gco/* tags older than ``threshold_days`` with no references. 

123 

124 Args: 

125 threshold_days: Age threshold in days. Defaults to 30. 

126 """ 

127 return await asyncio.to_thread( 

128 lambda: json.dumps(_get_manager().orphans(threshold_days=threshold_days)) 

129 ) 

130 

131 

132# ============================================================================= 

133# Administrative tools — Risk_Tier "low-risk" 

134# ============================================================================= 

135 

136 

137@mcp.tool(tags={"low-risk", "images"}) 

138@audit_logged 

139async def images_init(name: str, retain: bool = False) -> str: 

140 """`gco images init` — create the project ECR repo idempotently with default lifecycle. 

141 

142 Args: 

143 name: Repository name (without the ``gco/`` prefix). 

144 retain: When True, mark the repository with ``gco:retain=true`` so it 

145 survives stack destroys. 

146 """ 

147 return await asyncio.to_thread(lambda: json.dumps(_get_manager().init(name, retain=retain))) 

148 

149 

150@mcp.tool(tags={"low-risk", "images"}) 

151@audit_logged 

152async def images_lifecycle_get(name: str) -> str: 

153 """`gco images lifecycle get` — print the lifecycle policy on a repository. 

154 

155 Args: 

156 name: Repository name (without the ``gco/`` prefix). 

157 """ 

158 return await asyncio.to_thread(lambda: json.dumps(_get_manager().lifecycle_get(name))) 

159 

160 

161@mcp.tool(tags={"low-risk", "images"}) 

162@audit_logged 

163async def images_lifecycle_set(name: str, policy: dict[str, Any]) -> str: 

164 """`gco images lifecycle set` — replace the lifecycle policy on a repository. 

165 

166 Args: 

167 name: Repository name (without the ``gco/`` prefix). 

168 policy: ECR lifecycle policy document as a dict. 

169 """ 

170 return await asyncio.to_thread(lambda: json.dumps(_get_manager().lifecycle_set(name, policy))) 

171 

172 

173@mcp.tool(tags={"low-risk", "images"}) 

174@audit_logged 

175async def images_replication_sync() -> str: 

176 """`gco images replication sync` — apply the standard gco/* replication rule.""" 

177 return await asyncio.to_thread(lambda: json.dumps(_get_manager().replication_sync())) 

178 

179 

180# ============================================================================= 

181# Image publish — gated by GCO_ENABLE_IMAGE_PUBLISH 

182# ============================================================================= 

183# 

184# build/push are long-running data-upload operations. They run via 

185# ``_run_long_task`` so progress messages stream back through the 

186# FastMCP Progress dependency. 

187 

188if is_enabled(FLAG_IMAGE_PUBLISH): 

189 # Build the decorator kwargs dict so we only pass ``task=...`` when 

190 # TaskConfig was importable on this fastmcp version. 

191 _publish_decorator_kwargs: dict[str, Any] = {"tags": {"image", "images"}} 

192 if _TASK_CONFIG_OPTIONAL is not None: 192 ↛ 195line 192 didn't jump to line 195 because the condition on line 192 was always true

193 _publish_decorator_kwargs["task"] = _TASK_CONFIG_OPTIONAL 

194 

195 if Progress is not None and CurrentContext is not None: 195 ↛ 273line 195 didn't jump to line 273 because the condition on line 195 was always true

196 

197 @mcp.tool(**_publish_decorator_kwargs) # type: ignore[untyped-decorator] 

198 @audit_logged 

199 async def images_build( 

200 context: str, 

201 name: str, 

202 tag: str | None = None, 

203 dockerfile: str = "Dockerfile", 

204 platform: str = "linux/amd64", 

205 retain: bool = False, 

206 *, 

207 ctx: Any = CurrentContext(), 

208 progress: Any = Progress(), 

209 ) -> str: 

210 """[gated by GCO_ENABLE_IMAGE_PUBLISH] long-running, data-upload. 

211 

212 `gco images build` — build a container image and push to ECR. 

213 

214 Args: 

215 context: Build context directory. 

216 name: Image name (lowercase letters, digits, dashes; max 63 chars). 

217 tag: Image tag (defaults to git short SHA, else ``latest``). 

218 dockerfile: Path to the Dockerfile, relative to ``context``. 

219 platform: ``--platform`` argument for the build. 

220 retain: When True, mark the repository with ``gco:retain=true`` 

221 so it survives stack destroys. 

222 """ 

223 argv = ["gco", "images", "build", context, "--name", name] 

224 if tag: 

225 argv += ["--tag", tag] 

226 argv += ["--dockerfile", dockerfile, "--platform", platform] 

227 if retain: 

228 argv.append("--retain") 

229 return await _run_long_task(argv, ctx=ctx, progress=progress, is_stack_op=False) 

230 

231 @mcp.tool(**_publish_decorator_kwargs) # type: ignore[untyped-decorator] 

232 @audit_logged 

233 async def images_push( 

234 name: str, 

235 tag: str, 

236 local_image: str, 

237 retain: bool = False, 

238 *, 

239 ctx: Any = CurrentContext(), 

240 progress: Any = Progress(), 

241 ) -> str: 

242 """[gated by GCO_ENABLE_IMAGE_PUBLISH] long-running, data-upload. 

243 

244 `gco images push` — push an already-built local image to the project ECR repo. 

245 

246 Args: 

247 name: Image name (lowercase letters, digits, dashes; max 63 chars). 

248 tag: Image tag. 

249 local_image: Source image reference on the local container runtime. 

250 retain: When True, mark the repository with ``gco:retain=true`` 

251 so it survives stack destroys. 

252 """ 

253 argv = [ 

254 "gco", 

255 "images", 

256 "push", 

257 name, 

258 "--tag", 

259 tag, 

260 "--local-image", 

261 local_image, 

262 ] 

263 if retain: 

264 argv.append("--retain") 

265 return await _run_long_task(argv, ctx=ctx, progress=progress, is_stack_op=False) 

266 

267 

268# ============================================================================= 

269# Destructive image tools — gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS 

270# ============================================================================= 

271 

272 

273async def _ctx_warning(message: str) -> None: 

274 """Emit ``ctx.warning(...)`` from inside a tool body, no-op when no Context. 

275 

276 Tools wrapped here are short-lived enough that we don't need the full 

277 ``_run_long_task`` stack — we just want operators (and the audit log) 

278 to see a warning when destructive work runs. 

279 """ 

280 import contextlib as _contextlib 

281 

282 try: 

283 from fastmcp.server.dependencies import get_context 

284 

285 ctx = get_context() 

286 except Exception: 

287 return 

288 with _contextlib.suppress(Exception): 

289 await ctx.warning(message) 

290 

291 

292if is_enabled(FLAG_DESTRUCTIVE_OPERATIONS): 

293 

294 @mcp.tool(tags={"destructive", "images"}) 

295 @audit_logged 

296 async def images_delete_tag(name: str, tag: str) -> str: 

297 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive. 

298 

299 `gco images delete-tag` — delete a single tag from a repository. 

300 Cannot be undone — the image manifest is removed from ECR. 

301 

302 Args: 

303 name: Repository name (without the ``gco/`` prefix). 

304 tag: Image tag to delete. 

305 """ 

306 await _ctx_warning(f"Deleting tag {tag!r} from gco/{name} — this cannot be undone.") 

307 return await asyncio.to_thread(lambda: json.dumps(_get_manager().delete_tag(name, tag))) 

308 

309 @mcp.tool(tags={"destructive", "images"}) 

310 @audit_logged 

311 async def images_delete_repo(name: str, force: bool = False) -> str: 

312 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive. 

313 

314 `gco images delete-repo` — delete a whole repository. 

315 Cannot be undone — the repo and (when ``force=True``) every image 

316 inside it are permanently removed from ECR. 

317 

318 Args: 

319 name: Repository name (without the ``gco/`` prefix). 

320 force: When True, also delete every image inside the repo. 

321 """ 

322 await _ctx_warning( 

323 f"Deleting repository gco/{name} (force={force}) — this cannot be undone." 

324 ) 

325 return await asyncio.to_thread( 

326 lambda: json.dumps(_get_manager().delete_repo(name, force=force)) 

327 ) 

328 

329 @mcp.tool(tags={"destructive", "images"}) 

330 @audit_logged 

331 async def images_cleanup(name: str | None = None, all: bool = False) -> str: 

332 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive. 

333 

334 `gco images cleanup` — remove every untagged image across one or all project repos. 

335 Cannot be undone — untagged image manifests are permanently deleted. 

336 

337 Args: 

338 name: Repository name to clean (without the ``gco/`` prefix). Required 

339 unless ``all=True``. 

340 all: When True, clean every project repository. 

341 """ 

342 scope = "all repos" if all else f"gco/{name}" 

343 await _ctx_warning(f"Cleaning untagged images from {scope} — this cannot be undone.") 

344 return await asyncio.to_thread( 

345 lambda: json.dumps(_get_manager().cleanup(name=name, all=all)) 

346 ) 

347 

348 @mcp.tool(tags={"destructive", "images"}) 

349 @audit_logged 

350 async def images_prune(dry_run: bool = True) -> str: 

351 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive. 

352 

353 `gco images prune` — remove untagged images older than 30 days. 

354 Cannot be undone when ``dry_run=False``; the matching image manifests 

355 are permanently deleted. 

356 

357 Args: 

358 dry_run: When True (default), report what would be deleted without 

359 deleting anything. 

360 """ 

361 if not dry_run: 

362 await _ctx_warning( 

363 "Pruning untagged images older than 30 days — this cannot be undone." 

364 ) 

365 return await asyncio.to_thread(lambda: json.dumps(_get_manager().prune(dry_run=dry_run)))