Coverage for mcp/tools/queue.py: 98%

61 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Queue management MCP tools (read-only).""" 

2 

3import asyncio 

4 

5import cli_runner 

6from audit import audit_logged 

7from server import mcp 

8 

9 

10@mcp.tool(tags={"safe", "queue"}) 

11@audit_logged 

12async def queue_list( 

13 region: str | None = None, 

14 status: str | None = None, 

15 namespace: str | None = None, 

16 limit: int = 50, 

17) -> str: 

18 """`gco queue list` — list jobs in the global queue. 

19 

20 Args: 

21 region: Filter by target region. 

22 status: Filter by status (queued, claimed, running, succeeded, failed, cancelled). 

23 namespace: Filter by Kubernetes namespace. 

24 limit: Maximum results (default 50). 

25 """ 

26 args = ["queue", "list"] 

27 if region: 

28 args += ["-r", region] 

29 if status: 

30 args += ["--status", status] 

31 if namespace: 

32 args += ["-n", namespace] 

33 args += ["--limit", str(limit)] 

34 return await asyncio.to_thread(cli_runner._run_cli, *args) 

35 

36 

37@mcp.tool(tags={"safe", "queue"}) 

38@audit_logged 

39async def queue_get(job_id: str, region: str | None = None) -> str: 

40 """`gco queue get` — fetch a single job from the global queue. 

41 

42 Args: 

43 job_id: Job identifier. 

44 region: Region to query (any region works). 

45 """ 

46 args = ["queue", "get", job_id] 

47 if region: 

48 args += ["-r", region] 

49 return await asyncio.to_thread(cli_runner._run_cli, *args) 

50 

51 

52@mcp.tool(tags={"safe", "queue"}) 

53@audit_logged 

54async def queue_stats(region: str | None = None) -> str: 

55 """`gco queue stats` — show aggregate stats for the global queue. 

56 

57 Args: 

58 region: Region to query (any region works). 

59 """ 

60 args = ["queue", "stats"] 

61 if region: 

62 args += ["-r", region] 

63 return await asyncio.to_thread(cli_runner._run_cli, *args) 

64 

65 

66# ============================================================================= 

67# Mutating tools (low-risk) 

68# ============================================================================= 

69 

70 

71@mcp.tool(tags={"low-risk", "queue"}) 

72@audit_logged 

73async def queue_submit( 

74 manifest_path: str, 

75 region: str, 

76 namespace: str | None = None, 

77 priority: int | None = None, 

78 labels: dict[str, str] | None = None, 

79) -> str: 

80 """`gco queue submit` — submit a job manifest to the global queue. 

81 

82 Args: 

83 manifest_path: Path to the job manifest YAML. 

84 region: Target region for job execution. 

85 namespace: Kubernetes namespace (defaults to ``gco-jobs`` server-side). 

86 priority: Job priority (0-100, higher = more important). 

87 labels: Optional ``key=value`` labels to attach to the queued job. 

88 """ 

89 args = ["queue", "submit", manifest_path, "-r", region] 

90 if namespace: 

91 args += ["-n", namespace] 

92 if priority is not None: 

93 args += ["--priority", str(priority)] 

94 if labels: 

95 for key, value in labels.items(): 

96 args += ["--label", f"{key}={value}"] 

97 return await asyncio.to_thread(cli_runner._run_cli, *args) 

98 

99 

100# ============================================================================= 

101# Destructive tools — gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS 

102# ============================================================================= 

103 

104 

105import contextlib # noqa: E402 

106 

107from feature_flags import FLAG_DESTRUCTIVE_OPERATIONS, is_enabled # noqa: E402 

108 

109 

110async def _ctx_warning(message: str) -> None: 

111 """Emit ``ctx.warning(...)`` from inside a tool body, no-op when no Context.""" 

112 try: 

113 from fastmcp.server.dependencies import get_context 

114 

115 ctx = get_context() 

116 except Exception: 

117 return 

118 with contextlib.suppress(Exception): 

119 await ctx.warning(message) 

120 

121 

122if is_enabled(FLAG_DESTRUCTIVE_OPERATIONS): 

123 

124 @mcp.tool(tags={"destructive", "queue"}) 

125 @audit_logged 

126 async def cancel_queue_job(job_id: str, region: str | None = None) -> str: 

127 """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive. 

128 

129 `gco queue cancel` — cancel a queued job (only works for jobs not 

130 yet running). Cannot be undone — the job's queue record is 

131 transitioned to ``cancelled`` and any pending claim attempts stop. 

132 

133 Args: 

134 job_id: Job identifier. 

135 region: Region to query (any region works). 

136 """ 

137 await _ctx_warning(f"Cancelling queue job {job_id!r} — this cannot be undone.") 

138 args = ["queue", "cancel", job_id, "-y"] 

139 if region: 

140 args += ["-r", region] 

141 return await asyncio.to_thread(cli_runner._run_cli, *args)