Coverage for mcp/tools/dag.py: 100%

15 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""DAG (multi-step job pipeline) MCP tools (read-only validation and low-risk execution)."""

2 

3import asyncio 

4 

5import cli_runner 

6from audit import audit_logged 

7from server import mcp 

8 

9 

@mcp.tool(tags={"safe", "dag"})
@audit_logged
async def dag_validate(manifest_path: str) -> str:
    """Statically validate a DAG manifest via `gco dag validate`.

    Args:
        manifest_path: Path to the DAG manifest file.

    Returns:
        The value produced by ``cli_runner._run_cli`` for the command.
    """
    cli_args = ("dag", "validate", manifest_path)
    # Run the CLI helper in a worker thread so the event loop is not held
    # while it executes (presumably it blocks on a subprocess — confirm).
    return await asyncio.to_thread(cli_runner._run_cli, *cli_args)

19 

20 

21# ============================================================================= 

22# Mutating tools (low-risk) 

23# ============================================================================= 

24 

25 

@mcp.tool(tags={"low-risk", "dag"})
@audit_logged
async def dag_run(manifest_path: str, region: str, dry_run: bool = False) -> str:
    """Execute a DAG manifest end-to-end via `gco dag run`.

    Args:
        manifest_path: Path to the DAG manifest file.
        region: Target region for the DAG run (passed to the CLI as ``-r``).
        dry_run: When True, ``--dry-run`` is appended so the CLI validates
            and plans without submitting jobs.

    Returns:
        The value produced by ``cli_runner._run_cli`` for the command.
    """
    # The optional flag goes last, after the positional manifest and region.
    dry_run_flag = ["--dry-run"] if dry_run else []
    cli_args = ["dag", "run", manifest_path, "-r", region, *dry_run_flag]
    # Run the CLI helper in a worker thread so the event loop is not held
    # while it executes (presumably it blocks on a subprocess — confirm).
    return await asyncio.to_thread(cli_runner._run_cli, *cli_args)