Coverage for gco/services/api_shared.py: 98%

145 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Shared state, models, and helpers for the Manifest API routers. 

3 

4This module holds the global state (manifest processor, DynamoDB stores), 

5Pydantic request/response models, and helper functions used across all API 

6route modules. Centralizing them here avoids circular imports between 

7manifest_api.py and the routers. 

8""" 

9 

10from __future__ import annotations 

11 

12import logging 

13from enum import StrEnum 

14from typing import Any 

15 

16from fastapi import HTTPException 

17from kubernetes.client.models import CoreV1Event, V1Job, V1Pod 

18from pydantic import BaseModel, Field 

19 

20from gco.services.manifest_processor import ManifestProcessor 

21from gco.services.metrics_publisher import ManifestProcessorMetrics 

22from gco.services.template_store import ( 

23 JobStore, 

24 TemplateStore, 

25 WebhookStore, 

26) 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31# --------------------------------------------------------------------------- 

32# Shared enums and Pydantic models 

33# --------------------------------------------------------------------------- 

34 

35 

class SortOrder(StrEnum):
    # Sort direction. As a StrEnum, each member compares equal to its string
    # value ("asc" / "desc").
    ASC = "asc"
    DESC = "desc"

39 

40 

class JobStatus(StrEnum):
    # Job status values accepted as a filter (e.g. BulkDeleteRequest.status).
    # NOTE(review): both COMPLETED and SUCCEEDED exist, but _parse_job_to_dict
    # only ever computes "pending", "running", "succeeded" or "failed" — never
    # "completed". Confirm how routers map COMPLETED when filtering.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

47 

48 

class WebhookEvent(StrEnum):
    # Event names a webhook can subscribe to (WebhookRequest.events). Values use
    # dotted "job.<event>" form on the wire.
    JOB_COMPLETED = "job.completed"
    JOB_FAILED = "job.failed"
    JOB_STARTED = "job.started"

53 

54 

class ManifestSubmissionAPIRequest(BaseModel):
    """API model for manifest submission requests."""

    # Required. Items are arbitrary JSON objects (no per-item schema here), and
    # an empty list passes model validation since no min_length is declared.
    manifests: list[dict[str, Any]] = Field(
        description="List of Kubernetes manifests to apply",
    )
    # Fallback namespace for manifests that do not carry their own.
    namespace: str | None = Field(
        default=None,
        description="Default namespace for resources without namespace specified",
    )
    dry_run: bool = Field(
        default=False,
        description="If true, validate manifests without applying them",
    )
    # Exposed on the wire under the alias "validate"; Python code reads it as
    # validate_manifests. model_config does not enable populate_by_name, so with
    # pydantic's defaults the alias is the accepted input key.
    validate_manifests: bool = Field(
        default=True,
        alias="validate",
        description="If true, perform validation checks on manifests",
    )

    # Example payload merged into the model's JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "manifests": [
                    {"apiVersion": "batch/v1", "kind": "Job", "metadata": {"name": "example"}},
                ],
                "namespace": "gco-jobs",
                "dry_run": False,
            },
        },
    }

80 

81 

class ResourceIdentifier(BaseModel):
    # Reference to a single Kubernetes object. All four fields are required,
    # including namespace.
    api_version: str = Field(description="Kubernetes API version (e.g., 'apps/v1')")
    kind: str = Field(description="Kubernetes resource kind (e.g., 'Deployment')")
    name: str = Field(description="Resource name")
    namespace: str = Field(description="Resource namespace")

87 

88 

class BulkDeleteRequest(BaseModel):
    # Every filter is optional and defaults to None.
    # NOTE(review): how unset filters combine is decided by the consuming
    # router, which is not visible here.
    namespace: str | None = Field(default=None, description="Filter by namespace")
    status: JobStatus | None = Field(default=None, description="Filter by status")
    # When given, must lie within 1..365 inclusive.
    older_than_days: int | None = Field(
        default=None,
        description="Delete jobs older than N days",
        ge=1,
        le=365,
    )
    label_selector: str | None = Field(default=None, description="Kubernetes label selector")
    dry_run: bool = Field(
        default=False,
        description="If true, only return what would be deleted",
    )

    # Example payload merged into the model's JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "namespace": "gco-jobs",
                "status": "completed",
                "older_than_days": 7,
                "dry_run": False,
            },
        },
    }

108 

109 

class JobTemplateRequest(BaseModel):
    # Payload for storing a reusable job template. The manifest may contain
    # "{{name}}"-style placeholders (see the example); presumably these are
    # filled by _apply_template_parameters when a job is created — verify.
    # 63 is the Kubernetes DNS-1123 label length limit; no character pattern
    # is enforced here.
    name: str = Field(description="Template name", min_length=1, max_length=63)
    description: str | None = Field(default=None, description="Template description")
    manifest: dict[str, Any] = Field(description="Job manifest template")
    parameters: dict[str, Any] | None = Field(
        default=None,
        description="Default parameter values",
    )

    # Example payload merged into the model's JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "gpu-training-template",
                "description": "Template for GPU training jobs",
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "{{name}}"},
                },
                "parameters": {"image": "pytorch/pytorch:latest"},
            },
        },
    }

130 

131 

class JobFromTemplateRequest(BaseModel):
    # Payload for instantiating a stored template as a job. "parameters"
    # overrides the template's defaults, per its description.
    name: str = Field(description="Job name", min_length=1, max_length=63)
    namespace: str = Field(default="gco-jobs", description="Target namespace")
    parameters: dict[str, Any] | None = Field(
        default=None,
        description="Parameter overrides",
    )

    # Example payload merged into the model's JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "my-training-job",
                "namespace": "gco-jobs",
                "parameters": {"image": "my-custom-image:v1"},
            },
        },
    }

146 

147 

class WebhookRequest(BaseModel):
    # NOTE(review): url is a plain str — no URL/scheme validation at this layer.
    url: str = Field(description="Webhook URL to call")
    # Each entry must be a WebhookEvent value such as "job.completed".
    events: list[WebhookEvent] = Field(description="Events to subscribe to")
    namespace: str | None = Field(default=None, description="Filter by namespace (optional)")
    secret: str | None = Field(
        default=None,
        description="Secret for HMAC signature (optional)",
    )

    # Example payload merged into the model's JSON schema (secret omitted).
    model_config = {
        "json_schema_extra": {
            "example": {
                "url": "https://example.com/webhook",
                "events": ["job.completed", "job.failed"],
                "namespace": "gco-jobs",
            },
        },
    }

163 

164 

class QueuedJobRequest(BaseModel):
    # Payload for enqueueing a job for execution in a specific region.
    manifest: dict[str, Any] = Field(description="Kubernetes job manifest")
    target_region: str = Field(description="Target region for job execution")
    namespace: str = Field(default="gco-jobs", description="Kubernetes namespace")
    # Bounded to 0..100 inclusive; larger means more important.
    priority: int = Field(
        default=0,
        description="Job priority (higher = more important)",
        ge=0,
        le=100,
    )
    labels: dict[str, str] | None = Field(
        default=None,
        description="Optional labels for filtering",
    )

    # Example payload merged into the model's JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "my-training-job"},
                },
                "target_region": "us-east-1",
                "namespace": "gco-jobs",
                "priority": 10,
            },
        },
    }

186 

187 

class PaginatedResponse(BaseModel):
    # Pagination envelope fields only; no item list is declared here.
    # Presumably routers or subclasses add the items — verify.
    total: int = Field(description="Total number of items")
    limit: int = Field(description="Items per page")
    offset: int = Field(description="Current offset")
    has_more: bool = Field(description="Whether more items exist")

193 

194 

class ErrorResponse(BaseModel):
    # Error body shape. timestamp is a free-form string; no format is enforced.
    error: str = Field(description="Error type")
    detail: str = Field(description="Error details")
    timestamp: str = Field(description="Error timestamp")

199 

200 

201# --------------------------------------------------------------------------- 

202# Global state — populated by the lifespan handler in manifest_api.py 

203# --------------------------------------------------------------------------- 

# Shared service handles. Each starts as None and is meant to be assigned at
# application startup (per the section header, by the lifespan handler in
# manifest_api.py).
# NOTE(review): _check_processor() reads manifest_api.manifest_processor, not
# this module's manifest_processor — confirm whether these module-level names
# are also assigned, and which copy routers actually read.
manifest_processor: ManifestProcessor | None = None
manifest_metrics: ManifestProcessorMetrics | None = None
template_store: TemplateStore | None = None
webhook_store: WebhookStore | None = None
job_store: JobStore | None = None

209 

210 

211# --------------------------------------------------------------------------- 

212# Helper functions 

213# --------------------------------------------------------------------------- 

214 

215 

def _check_processor() -> ManifestProcessor:
    """Return the initialized manifest processor, or fail the request with 503.

    The processor is looked up on the ``manifest_api`` module at call time
    (not on this module), because that is where the lifespan handler stores it
    and where tests patch it.

    Returns:
        The live ``ManifestProcessor``.

    Raises:
        HTTPException: 503 if the processor has not been initialized.
    """
    from gco.services import manifest_api

    processor = manifest_api.manifest_processor
    if processor is not None:
        return processor
    raise HTTPException(status_code=503, detail="Manifest processor not initialized")

225 

226 

def _check_namespace(namespace: str, processor: ManifestProcessor) -> None:
    """Reject a namespace that is not on the processor's allow-list.

    Args:
        namespace: Namespace the caller wants to act on.
        processor: Processor whose ``allowed_namespaces`` is the allow-list.

    Raises:
        HTTPException: 403 when ``namespace`` is not allowed; the detail
            message lists the allowed namespaces.
    """
    allowed = processor.allowed_namespaces
    if namespace in allowed:
        return
    raise HTTPException(
        status_code=403,
        detail=f"Namespace '{namespace}' not allowed. Allowed: {list(allowed)}",
    )

234 

235 

def _parse_job_to_dict(job: V1Job) -> dict[str, Any]:
    """Convert a Kubernetes ``V1Job`` into a plain, JSON-friendly dict.

    Output keys mirror the Kubernetes camelCase field names. A
    ``computed_status`` summary is added:

    * ``"succeeded"`` / ``"failed"``: the first condition of type ``Complete``
      or ``Failed`` whose status is ``"True"``;
    * ``"running"``: no such condition, but ``status.active`` > 0;
    * ``"pending"``: otherwise.

    Container and init-container image references from the pod template are
    included, so callers (e.g. the orphan-image cross-reference) can see which
    images a job uses without another request per job.

    Args:
        job: Job object from the Kubernetes client. ``metadata``, ``status``
            and ``spec`` are dereferenced directly; the pod template and its
            spec may be absent.

    Returns:
        Dict with ``metadata``, ``spec``, ``status`` and ``computed_status``
        keys. Timestamps are ISO-8601 strings, or ``None`` when unset.
    """

    def _iso(moment: Any) -> str | None:
        # Optional datetime -> ISO-8601 string; falsy values map to None.
        return moment.isoformat() if moment else None

    def _image_refs(items: Any) -> list[dict[str, Any]]:
        # Tolerate partially-populated container objects via getattr defaults.
        return [
            {"name": getattr(item, "name", ""), "image": getattr(item, "image", "")}
            for item in items
        ]

    meta, job_status, job_spec = job.metadata, job.status, job.spec
    conditions = job_status.conditions or []

    # Scan for the first terminal condition that is actually in effect.
    computed_status = "pending"
    for cond in conditions:
        if cond.status != "True":
            continue
        if cond.type == "Complete":
            computed_status = "succeeded"
            break
        if cond.type == "Failed":
            computed_status = "failed"
            break

    active = job_status.active or 0
    if computed_status == "pending" and active > 0:
        computed_status = "running"

    pod_template = getattr(job_spec, "template", None)
    pod_spec = None if pod_template is None else getattr(pod_template, "spec", None)
    container_specs = _image_refs(getattr(pod_spec, "containers", None) or [])
    init_container_specs = _image_refs(getattr(pod_spec, "init_containers", None) or [])

    return {
        "metadata": {
            "name": meta.name,
            "namespace": meta.namespace,
            "creationTimestamp": _iso(meta.creation_timestamp),
            "labels": meta.labels or {},
            "annotations": meta.annotations or {},
            "uid": meta.uid,
        },
        "spec": {
            "parallelism": job_spec.parallelism,
            "completions": job_spec.completions,
            "backoffLimit": job_spec.backoff_limit,
            "template": {
                "spec": {
                    "containers": container_specs,
                    "initContainers": init_container_specs,
                },
            },
        },
        "status": {
            "active": active,
            "succeeded": job_status.succeeded or 0,
            "failed": job_status.failed or 0,
            "startTime": _iso(job_status.start_time),
            "completionTime": _iso(job_status.completion_time),
            "conditions": [
                {
                    "type": cond.type,
                    "status": cond.status,
                    "reason": cond.reason,
                    "message": cond.message,
                    "lastTransitionTime": _iso(cond.last_transition_time),
                }
                for cond in conditions
            ],
        },
        "computed_status": computed_status,
    }

314 

315 

def _parse_pod_to_dict(pod: V1Pod) -> dict[str, Any]:
    """Convert a Kubernetes ``V1Pod`` into a plain, JSON-friendly dict.

    Each container status is flattened. When the container has a state, a
    ``state`` string is added (``running``, ``waiting`` or ``terminated``,
    checked in that order) along with the matching detail: ``startedAt``,
    ``reason``, or ``exitCode``/``reason``. Init-container statuses carry
    only name, readiness and restart count.

    Args:
        pod: Pod object from the Kubernetes client. ``metadata``, ``status``
            and ``spec`` (including ``spec.containers``) are dereferenced
            directly.

    Returns:
        Dict with ``metadata``, ``spec`` and ``status`` keys. Timestamps are
        ISO-8601 strings, or ``None`` when unset.
    """

    def _iso(moment: Any) -> str | None:
        # Optional datetime -> ISO-8601 string; falsy values map to None.
        return moment.isoformat() if moment else None

    def _container_entry(cs: Any) -> dict[str, Any]:
        # Flatten one container status; add state details only if present.
        entry: dict[str, Any] = {
            "name": cs.name,
            "ready": cs.ready,
            "restartCount": cs.restart_count,
            "image": cs.image,
        }
        state = cs.state
        if not state:
            return entry
        if state.running:
            entry["state"] = "running"
            entry["startedAt"] = _iso(state.running.started_at)
        elif state.waiting:
            entry["state"] = "waiting"
            entry["reason"] = state.waiting.reason
        elif state.terminated:
            entry["state"] = "terminated"
            entry["exitCode"] = state.terminated.exit_code
            entry["reason"] = state.terminated.reason
        return entry

    meta, pod_status, pod_spec = pod.metadata, pod.status, pod.spec

    container_statuses = [
        _container_entry(cs) for cs in pod_status.container_statuses or []
    ]
    init_container_statuses = [
        {"name": cs.name, "ready": cs.ready, "restartCount": cs.restart_count}
        for cs in pod_status.init_container_statuses or []
    ]

    return {
        "metadata": {
            "name": meta.name,
            "namespace": meta.namespace,
            "creationTimestamp": _iso(meta.creation_timestamp),
            "labels": meta.labels or {},
            "uid": meta.uid,
        },
        "spec": {
            "nodeName": pod_spec.node_name,
            "containers": [{"name": c.name, "image": c.image} for c in pod_spec.containers],
            "initContainers": [
                {"name": c.name, "image": c.image} for c in pod_spec.init_containers or []
            ],
        },
        "status": {
            "phase": pod_status.phase,
            "hostIP": pod_status.host_ip,
            "podIP": pod_status.pod_ip,
            "startTime": _iso(pod_status.start_time),
            "containerStatuses": container_statuses,
            "initContainerStatuses": init_container_statuses,
        },
    }

380 

381 

def _parse_event_to_dict(event: CoreV1Event) -> dict[str, Any]:
    """Convert a Kubernetes ``CoreV1Event`` into a plain, JSON-friendly dict.

    Missing ``source`` / ``involved_object`` sub-objects yield dicts whose
    values are all ``None``. A missing or zero ``count`` is reported as 1.

    Args:
        event: Event object from the Kubernetes client.

    Returns:
        Dict with type, reason, message, count, ISO-8601 first/last
        timestamps (or ``None``), ``source`` and ``involvedObject``.
    """
    source = event.source
    involved = event.involved_object
    first_seen, last_seen = event.first_timestamp, event.last_timestamp

    return {
        "type": event.type,
        "reason": event.reason,
        "message": event.message,
        "count": event.count or 1,
        "firstTimestamp": first_seen.isoformat() if first_seen else None,
        "lastTimestamp": last_seen.isoformat() if last_seen else None,
        "source": {
            attr: (getattr(source, attr) if source else None)
            for attr in ("component", "host")
        },
        "involvedObject": {
            attr: (getattr(involved, attr) if involved else None)
            for attr in ("kind", "name", "namespace")
        },
    }

401 

402 

def _apply_template_parameters(
    manifest: dict[str, Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """Substitute ``{{ key }}`` placeholders in a manifest template.

    The manifest is serialized to JSON. Every occurrence of ``{{key}}``
    (whitespace inside the braces allowed) is replaced with ``str(value)``,
    and the result is parsed back. In ``json.dumps`` output, ``{{`` can only
    occur inside string literals, so each substituted value is JSON-escaped
    before insertion. A value containing quotes, backslashes or control
    characters therefore stays a single string: it cannot corrupt the JSON
    or inject extra keys into the manifest. Placeholders whose key is not in
    ``parameters`` are left untouched.

    Args:
        manifest: Template manifest (JSON-serializable). It is not mutated.
        parameters: Mapping of placeholder name -> value; values are
            stringified with ``str()``.

    Returns:
        A new manifest dict with placeholders replaced.
    """
    import json
    import re

    manifest_str = json.dumps(manifest)
    for key, value in parameters.items():
        # Match the key as it appears inside serialized JSON (e.g. non-ASCII
        # or backslashes become escape sequences), not its raw Python form.
        json_key = json.dumps(str(key))[1:-1]
        pattern = r"\{\{\s*" + re.escape(json_key) + r"\s*\}\}"
        # json.dumps of a str yields a quoted JSON string literal; drop the
        # quotes to get text that is safe to splice into an existing string.
        escaped = json.dumps(str(value))[1:-1]
        # A callable replacement stops re.sub from interpreting backslashes
        # and group references ("\1", "\g<0>") in the value.
        manifest_str = re.sub(pattern, lambda _match, _text=escaped: _text, manifest_str)
    result: dict[str, Any] = json.loads(manifest_str)
    return result