Coverage for cli / commands / dag_cmd.py: 78%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""DAG pipeline commands."""
3import sys
4from typing import Any
6import click
8from ..config import GCOConfig
9from ..output import get_output_formatter
11pass_config = click.make_pass_decorator(GCOConfig, ensure=True)
14@click.group()
15@pass_config
16def dag(config: Any) -> None:
17 """Run multi-step job pipelines with dependencies."""
18 pass
21@dag.command("run")
22@click.argument("dag_file", type=click.Path(exists=True))
23@click.option("--region", "-r", help="Region to run in (default: from DAG file or first deployed)")
24@click.option(
25 "--timeout", "-t", default=3600, type=int, help="Timeout per step in seconds (default: 3600)"
26)
27@click.option("--dry-run", is_flag=True, help="Validate the DAG without running it")
28@pass_config
29def dag_run(config: Any, dag_file: Any, region: Any, timeout: Any, dry_run: Any) -> None:
30 """Run a DAG pipeline from a YAML definition.
32 The DAG file defines steps with dependencies. Steps run in order,
33 and downstream steps are skipped if a dependency fails.
35 Examples:
36 gco dag run pipeline.yaml
37 gco dag run pipeline.yaml -r us-east-1
38 gco dag run pipeline.yaml --dry-run
39 """
40 from ..dag import get_dag_runner, load_dag
42 formatter = get_output_formatter(config)
44 try:
45 dag_def = load_dag(dag_file)
46 errors = dag_def.validate()
48 if errors:
49 for err in errors:
50 formatter.print_error(err)
51 sys.exit(1)
53 if dry_run:
54 formatter.print_success(f"DAG '{dag_def.name}' is valid ({len(dag_def.steps)} steps)")
55 print("\n Execution order:")
56 # Show topological order
57 completed: set[str] = set()
58 order = 1
59 remaining = list(dag_def.steps)
60 while remaining:
61 batch = [s for s in remaining if all(d in completed for d in s.depends_on)]
62 for step in batch:
63 deps = f" (after: {', '.join(step.depends_on)})" if step.depends_on else ""
64 print(f" {order}. {step.name} → {step.manifest}{deps}")
65 completed.add(step.name)
66 remaining.remove(step)
67 order += 1
68 print()
69 return
71 runner = get_dag_runner(config)
73 def on_progress(step_name: str, status: str, msg: str) -> None:
74 if status == "started":
75 formatter.print_info(msg)
76 elif status == "running":
77 formatter.print_info(f" [{step_name}] {msg}")
78 elif status == "succeeded":
79 formatter.print_success(f" [{step_name}] ✓ Completed")
80 elif status == "failed":
81 formatter.print_error(f" [{step_name}] ✗ {msg}")
82 elif status == "skipped":
83 formatter.print_info(f" [{step_name}] ⊘ {msg}")
84 elif "completed" in status:
85 print()
86 formatter.print_info(msg)
88 result = runner.run(
89 dag_def,
90 region=region,
91 timeout_per_step=timeout,
92 progress_callback=on_progress,
93 )
95 if result.has_failures(): 95 ↛ exitline 95 didn't return from function 'dag_run' because the condition on line 95 was always true
96 sys.exit(1)
98 except Exception as e:
99 formatter.print_error(f"DAG execution failed: {e}")
100 sys.exit(1)
103@dag.command("validate")
104@click.argument("dag_file", type=click.Path(exists=True))
105@pass_config
106def dag_validate(config: Any, dag_file: Any) -> None:
107 """Validate a DAG definition without running it.
109 Examples:
110 gco dag validate pipeline.yaml
111 """
112 from ..dag import load_dag
114 formatter = get_output_formatter(config)
116 try:
117 dag_def = load_dag(dag_file)
118 errors = dag_def.validate()
120 if errors:
121 formatter.print_error(f"DAG '{dag_def.name}' has {len(errors)} error(s):")
122 for err in errors:
123 formatter.print_error(f" - {err}")
124 sys.exit(1)
126 formatter.print_success(f"DAG '{dag_def.name}' is valid")
127 print(f" Steps: {len(dag_def.steps)}")
128 print(f" Region: {dag_def.region or '(auto-detect)'}")
129 print(f" Namespace: {dag_def.namespace}")
130 for step in dag_def.steps:
131 deps = f" → depends on: {', '.join(step.depends_on)}" if step.depends_on else ""
132 print(f" - {step.name}: {step.manifest}{deps}")
133 print()
135 except Exception as e:
136 formatter.print_error(f"Failed to load DAG: {e}")
137 sys.exit(1)