Coverage for cli / commands / dag_cmd.py: 78%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1"""DAG pipeline commands.""" 

2 

3import sys 

4from typing import Any 

5 

6import click 

7 

8from ..config import GCOConfig 

9from ..output import get_output_formatter 

10 

11pass_config = click.make_pass_decorator(GCOConfig, ensure=True) 

12 

13 

# Root of the `dag` sub-command tree; the sub-commands below attach to it
# via `@dag.command(...)`. The docstring doubles as the click help text.
@click.group()
@pass_config
def dag(config: Any) -> None:
    """Run multi-step job pipelines with dependencies."""

19 

20 

def _print_execution_order(steps: Any) -> list[str]:
    """Print steps in dependency order and return the names that could not be placed.

    Steps are emitted in batches: a step becomes printable once every name in
    its ``depends_on`` has already been printed. An empty return value means
    every step was ordered; a non-empty one lists the steps whose dependencies
    can never be satisfied (a cycle, or a dependency that is not a step).
    """
    completed: set[str] = set()
    remaining = list(steps)
    order = 1
    while remaining:
        batch = [s for s in remaining if all(d in completed for d in s.depends_on)]
        if not batch:
            # No step is ready, so another pass would find the same thing.
            # Stop instead of spinning forever.
            return [s.name for s in remaining]
        for step in batch:
            deps = f" (after: {', '.join(step.depends_on)})" if step.depends_on else ""
            print(f" {order}. {step.name}: {step.manifest}{deps}")
            completed.add(step.name)
            remaining.remove(step)
            order += 1
    return []


@dag.command("run")
@click.argument("dag_file", type=click.Path(exists=True))
@click.option("--region", "-r", help="Region to run in (default: from DAG file or first deployed)")
@click.option(
    "--timeout", "-t", default=3600, type=int, help="Timeout per step in seconds (default: 3600)"
)
@click.option("--dry-run", is_flag=True, help="Validate the DAG without running it")
@pass_config
def dag_run(config: Any, dag_file: Any, region: Any, timeout: Any, dry_run: Any) -> None:
    """Run a DAG pipeline from a YAML definition.

    The DAG file defines steps with dependencies. Steps run in order,
    and downstream steps are skipped if a dependency fails.

    Examples:
        gco dag run pipeline.yaml
        gco dag run pipeline.yaml -r us-east-1
        gco dag run pipeline.yaml --dry-run
    """
    # Imported lazily, as in the original, so the DAG machinery is only loaded
    # when this command actually runs.
    from ..dag import get_dag_runner, load_dag

    formatter = get_output_formatter(config)

    # Exit paths: validation errors, a dry-run ordering failure, a run with
    # failures, or any exception all exit with status 1. SystemExit is not an
    # Exception subclass, so the sys.exit() calls inside the try block are not
    # swallowed by the handler at the bottom.
    try:
        dag_def = load_dag(dag_file)
        errors = dag_def.validate()

        if errors:
            for err in errors:
                formatter.print_error(err)
            sys.exit(1)

        if dry_run:
            formatter.print_success(f"DAG '{dag_def.name}' is valid ({len(dag_def.steps)} steps)")
            print("\n Execution order:")
            stuck = _print_execution_order(dag_def.steps)
            if stuck:
                formatter.print_error(
                    "Cannot order steps with unresolved or circular dependencies: "
                    f"{', '.join(stuck)}"
                )
                sys.exit(1)
            print()
            return

        runner = get_dag_runner(config)

        def on_progress(step_name: str, status: str, msg: str) -> None:
            # Maps each status reported by the runner to a formatter call.
            if status == "started":
                formatter.print_info(msg)
            elif status == "running":
                formatter.print_info(f" [{step_name}] {msg}")
            elif status == "succeeded":
                formatter.print_success(f" [{step_name}] ✓ Completed")
            elif status == "failed":
                formatter.print_error(f" [{step_name}] ✗ {msg}")
            elif status == "skipped":
                formatter.print_info(f" [{step_name}] ⊘ {msg}")
            elif "completed" in status:
                # Substring match: any status containing "completed" gets a
                # blank line followed by the message.
                print()
                formatter.print_info(msg)

        result = runner.run(
            dag_def,
            region=region,
            timeout_per_step=timeout,
            progress_callback=on_progress,
        )

        if result.has_failures():
            sys.exit(1)

    except Exception as e:
        formatter.print_error(f"DAG execution failed: {e}")
        sys.exit(1)

101 

102 

@dag.command("validate")
@click.argument("dag_file", type=click.Path(exists=True))
@pass_config
def dag_validate(config: Any, dag_file: Any) -> None:
    """Validate a DAG definition without running it.

    Examples:
        gco dag validate pipeline.yaml
    """
    # Imported lazily, as in the original.
    from ..dag import load_dag

    out = get_output_formatter(config)

    try:
        definition = load_dag(dag_file)
        problems = definition.validate()

        # Report every validation problem, then exit non-zero.
        if errors_found := bool(problems):
            out.print_error(f"DAG '{definition.name}' has {len(problems)} error(s):")
            for problem in problems:
                out.print_error(f" - {problem}")
        if errors_found:
            sys.exit(1)

        # Valid DAG: print a summary followed by one line per step.
        out.print_success(f"DAG '{definition.name}' is valid")
        print(f" Steps: {len(definition.steps)}")
        print(f" Region: {definition.region or '(auto-detect)'}")
        print(f" Namespace: {definition.namespace}")
        for item in definition.steps:
            if item.depends_on:
                suffix = f" → depends on: {', '.join(item.depends_on)}"
            else:
                suffix = ""
            print(f" - {item.name}: {item.manifest}{suffix}")
        print()

    except Exception as exc:
        # Load or validation raised; SystemExit from above is not caught here.
        out.print_error(f"Failed to load DAG: {exc}")
        sys.exit(1)