Coverage for mcp/tools/dag.py: 100%
15 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""DAG (multi-step job pipeline) MCP tools (validation and execution)."""
3import asyncio
5import cli_runner
6from audit import audit_logged
7from server import mcp
@mcp.tool(tags={"safe", "dag"})
@audit_logged
async def dag_validate(manifest_path: str) -> str:
    """`gco dag validate` — statically validate a DAG manifest.

    Args:
        manifest_path: Path to the DAG manifest file.
    """
    # NOTE(review): the docstring above is kept verbatim because the
    # @mcp.tool decorator presumably surfaces it as the tool description —
    # confirm before rewording.

    # Positional arguments handed to the CLI runner, in order.
    command = ("dag", "validate", manifest_path)

    # Run the CLI call in a worker thread and await its result, so the
    # event loop is not held up while it executes.
    return await asyncio.to_thread(cli_runner._run_cli, *command)
21# =============================================================================
22# Mutating tools (low-risk)
23# =============================================================================
@mcp.tool(tags={"low-risk", "dag"})
@audit_logged
async def dag_run(manifest_path: str, region: str, dry_run: bool = False) -> str:
    """`gco dag run` — execute a DAG manifest end-to-end.

    Args:
        manifest_path: Path to the DAG manifest file.
        region: Target region for the DAG run.
        dry_run: When True, validate and plan without submitting jobs.
    """
    # NOTE(review): the docstring above is kept verbatim because the
    # @mcp.tool decorator presumably surfaces it as the tool description —
    # confirm before rewording.

    # The flag is spliced in only when dry_run is truthy; otherwise the
    # argument vector ends after the region value.
    dry_run_flag = ["--dry-run"] if dry_run else []
    cli_args = ["dag", "run", manifest_path, "-r", region, *dry_run_flag]

    # Run the CLI call in a worker thread and await its result, so the
    # event loop is not held up while it executes.
    return await asyncio.to_thread(cli_runner._run_cli, *cli_args)