Coverage for mcp/tools/queue.py: 98%
61 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Queue management MCP tools (read-only)."""
3import asyncio
5import cli_runner
6from audit import audit_logged
7from server import mcp
10@mcp.tool(tags={"safe", "queue"})
11@audit_logged
12async def queue_list(
13 region: str | None = None,
14 status: str | None = None,
15 namespace: str | None = None,
16 limit: int = 50,
17) -> str:
18 """`gco queue list` — list jobs in the global queue.
20 Args:
21 region: Filter by target region.
22 status: Filter by status (queued, claimed, running, succeeded, failed, cancelled).
23 namespace: Filter by Kubernetes namespace.
24 limit: Maximum results (default 50).
25 """
26 args = ["queue", "list"]
27 if region:
28 args += ["-r", region]
29 if status:
30 args += ["--status", status]
31 if namespace:
32 args += ["-n", namespace]
33 args += ["--limit", str(limit)]
34 return await asyncio.to_thread(cli_runner._run_cli, *args)
37@mcp.tool(tags={"safe", "queue"})
38@audit_logged
39async def queue_get(job_id: str, region: str | None = None) -> str:
40 """`gco queue get` — fetch a single job from the global queue.
42 Args:
43 job_id: Job identifier.
44 region: Region to query (any region works).
45 """
46 args = ["queue", "get", job_id]
47 if region:
48 args += ["-r", region]
49 return await asyncio.to_thread(cli_runner._run_cli, *args)
52@mcp.tool(tags={"safe", "queue"})
53@audit_logged
54async def queue_stats(region: str | None = None) -> str:
55 """`gco queue stats` — show aggregate stats for the global queue.
57 Args:
58 region: Region to query (any region works).
59 """
60 args = ["queue", "stats"]
61 if region:
62 args += ["-r", region]
63 return await asyncio.to_thread(cli_runner._run_cli, *args)
66# =============================================================================
67# Mutating tools (low-risk)
68# =============================================================================
71@mcp.tool(tags={"low-risk", "queue"})
72@audit_logged
73async def queue_submit(
74 manifest_path: str,
75 region: str,
76 namespace: str | None = None,
77 priority: int | None = None,
78 labels: dict[str, str] | None = None,
79) -> str:
80 """`gco queue submit` — submit a job manifest to the global queue.
82 Args:
83 manifest_path: Path to the job manifest YAML.
84 region: Target region for job execution.
85 namespace: Kubernetes namespace (defaults to ``gco-jobs`` server-side).
86 priority: Job priority (0-100, higher = more important).
87 labels: Optional ``key=value`` labels to attach to the queued job.
88 """
89 args = ["queue", "submit", manifest_path, "-r", region]
90 if namespace:
91 args += ["-n", namespace]
92 if priority is not None:
93 args += ["--priority", str(priority)]
94 if labels:
95 for key, value in labels.items():
96 args += ["--label", f"{key}={value}"]
97 return await asyncio.to_thread(cli_runner._run_cli, *args)
100# =============================================================================
101# Destructive tools — gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS
102# =============================================================================
105import contextlib # noqa: E402
107from feature_flags import FLAG_DESTRUCTIVE_OPERATIONS, is_enabled # noqa: E402
110async def _ctx_warning(message: str) -> None:
111 """Emit ``ctx.warning(...)`` from inside a tool body, no-op when no Context."""
112 try:
113 from fastmcp.server.dependencies import get_context
115 ctx = get_context()
116 except Exception:
117 return
118 with contextlib.suppress(Exception):
119 await ctx.warning(message)
122if is_enabled(FLAG_DESTRUCTIVE_OPERATIONS):
124 @mcp.tool(tags={"destructive", "queue"})
125 @audit_logged
126 async def cancel_queue_job(job_id: str, region: str | None = None) -> str:
127 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive.
129 `gco queue cancel` — cancel a queued job (only works for jobs not
130 yet running). Cannot be undone — the job's queue record is
131 transitioned to ``cancelled`` and any pending claim attempts stop.
133 Args:
134 job_id: Job identifier.
135 region: Region to query (any region works).
136 """
137 await _ctx_warning(f"Cancelling queue job {job_id!r} — this cannot be undone.")
138 args = ["queue", "cancel", job_id, "-y"]
139 if region:
140 args += ["-r", region]
141 return await asyncio.to_thread(cli_runner._run_cli, *args)