Coverage for gco / services / api_shared.py: 98%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Shared state, models, and helpers for the Manifest API routers. 

3 

4This module holds the global state (manifest processor, DynamoDB stores), 

5Pydantic request/response models, and helper functions used across all API 

6route modules. Centralizing them here avoids circular imports between 

7manifest_api.py and the routers. 

8""" 

9 

10from __future__ import annotations 

11 

12import logging 

13from enum import StrEnum 

14from typing import Any 

15 

16from fastapi import HTTPException 

17from kubernetes.client.models import CoreV1Event, V1Job, V1Pod 

18from pydantic import BaseModel, Field 

19 

20from gco.services.manifest_processor import ManifestProcessor 

21from gco.services.metrics_publisher import ManifestProcessorMetrics 

22from gco.services.template_store import ( 

23 JobStore, 

24 TemplateStore, 

25 WebhookStore, 

26) 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31# --------------------------------------------------------------------------- 

32# Shared enums and Pydantic models 

33# --------------------------------------------------------------------------- 

34 

35 

# Sort direction for list-style results. Because this is a StrEnum, each
# member is itself a string equal to its value ("asc" / "desc").
class SortOrder(StrEnum):
    ASC = "asc"
    DESC = "desc"

39 

40 

# Job lifecycle states, spelled as lowercase strings.
# NOTE(review): both COMPLETED and SUCCEEDED are defined, while
# _parse_job_to_dict in this module reports a finished job as "succeeded" —
# confirm whether "completed" is still produced or accepted anywhere.
class JobStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

47 

48 

# Event names a webhook can subscribe to; values use a dotted "job.<what>"
# form and are the strings accepted in WebhookRequest.events.
class WebhookEvent(StrEnum):
    JOB_COMPLETED = "job.completed"
    JOB_FAILED = "job.failed"
    JOB_STARTED = "job.started"

53 

54 

class ManifestSubmissionAPIRequest(BaseModel):
    """API model for manifest submission requests."""

    # Required: the manifest documents to submit, each as a plain mapping.
    manifests: list[dict[str, Any]] = Field(
        default=...,
        description="List of Kubernetes manifests to apply",
    )
    # Optional fallback namespace.
    namespace: str | None = Field(
        default=None,
        description="Default namespace for resources without namespace specified",
    )
    dry_run: bool = Field(
        default=False,
        description="If true, validate manifests without applying them",
    )
    # Declared with the alias "validate"; the Python attribute is named
    # validate_manifests.
    validate_manifests: bool = Field(
        default=True,
        alias="validate",
        description="If true, perform validation checks on manifests",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "manifests": [
                    {
                        "apiVersion": "batch/v1",
                        "kind": "Job",
                        "metadata": {"name": "example"},
                    }
                ],
                "namespace": "gco-jobs",
                "dry_run": False,
            }
        }
    }

80 

81 

class ResourceIdentifier(BaseModel):
    # Four required strings that together address one Kubernetes resource.
    api_version: str = Field(
        default=...,
        description="Kubernetes API version (e.g., 'apps/v1')",
    )
    kind: str = Field(
        default=...,
        description="Kubernetes resource kind (e.g., 'Deployment')",
    )
    name: str = Field(default=..., description="Resource name")
    namespace: str = Field(default=..., description="Resource namespace")

87 

88 

class BulkDeleteRequest(BaseModel):
    # Every filter is optional; an omitted filter is left as None.
    namespace: str | None = Field(default=None, description="Filter by namespace")
    status: JobStatus | None = Field(default=None, description="Filter by status")
    # Age threshold in days, constrained to the range 1..365 inclusive.
    older_than_days: int | None = Field(
        default=None,
        ge=1,
        le=365,
        description="Delete jobs older than N days",
    )
    label_selector: str | None = Field(
        default=None,
        description="Kubernetes label selector",
    )
    dry_run: bool = Field(
        default=False,
        description="If true, only return what would be deleted",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "namespace": "gco-jobs",
                "status": "completed",
                "older_than_days": 7,
                "dry_run": False,
            }
        }
    }

108 

109 

class JobTemplateRequest(BaseModel):
    # Template name: required, 1 to 63 characters.
    name: str = Field(
        default=...,
        min_length=1,
        max_length=63,
        description="Template name",
    )
    description: str | None = Field(default=None, description="Template description")
    # The manifest may carry {{placeholder}} markers, as in the example below.
    manifest: dict[str, Any] = Field(default=..., description="Job manifest template")
    parameters: dict[str, Any] | None = Field(
        default=None,
        description="Default parameter values",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "gpu-training-template",
                "description": "Template for GPU training jobs",
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "{{name}}"},
                },
                "parameters": {"image": "pytorch/pytorch:latest"},
            }
        }
    }

130 

131 

class JobFromTemplateRequest(BaseModel):
    # Job name: required, 1 to 63 characters.
    name: str = Field(
        default=...,
        min_length=1,
        max_length=63,
        description="Job name",
    )
    # Falls back to "gco-jobs" when the caller does not supply a namespace.
    namespace: str = Field(default="gco-jobs", description="Target namespace")
    parameters: dict[str, Any] | None = Field(
        default=None,
        description="Parameter overrides",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "my-training-job",
                "namespace": "gco-jobs",
                "parameters": {"image": "my-custom-image:v1"},
            }
        }
    }

146 

147 

class WebhookRequest(BaseModel):
    # The URL is typed as a plain string; this model applies no URL-specific
    # validation to it.
    url: str = Field(default=..., description="Webhook URL to call")
    events: list[WebhookEvent] = Field(
        default=...,
        description="Events to subscribe to",
    )
    namespace: str | None = Field(
        default=None,
        description="Filter by namespace (optional)",
    )
    secret: str | None = Field(
        default=None,
        description="Secret for HMAC signature (optional)",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "url": "https://example.com/webhook",
                "events": ["job.completed", "job.failed"],
                "namespace": "gco-jobs",
            }
        }
    }

163 

164 

class QueuedJobRequest(BaseModel):
    manifest: dict[str, Any] = Field(default=..., description="Kubernetes job manifest")
    target_region: str = Field(
        default=...,
        description="Target region for job execution",
    )
    # Falls back to "gco-jobs" when the caller does not supply a namespace.
    namespace: str = Field(default="gco-jobs", description="Kubernetes namespace")
    # Priority is constrained to 0..100 inclusive and defaults to 0.
    priority: int = Field(
        default=0,
        ge=0,
        le=100,
        description="Job priority (higher = more important)",
    )
    labels: dict[str, str] | None = Field(
        default=None,
        description="Optional labels for filtering",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "my-training-job"},
                },
                "target_region": "us-east-1",
                "namespace": "gco-jobs",
                "priority": 10,
            }
        }
    }

186 

187 

class PaginatedResponse(BaseModel):
    # Paging metadata; all four fields are required.
    total: int = Field(default=..., description="Total number of items")
    limit: int = Field(default=..., description="Items per page")
    offset: int = Field(default=..., description="Current offset")
    has_more: bool = Field(default=..., description="Whether more items exist")

193 

194 

class ErrorResponse(BaseModel):
    # Error payload; all three fields are required strings.
    error: str = Field(default=..., description="Error type")
    detail: str = Field(default=..., description="Error details")
    timestamp: str = Field(default=..., description="Error timestamp")

199 

200 

201# --------------------------------------------------------------------------- 

202# Global state — populated by the lifespan handler in manifest_api.py 

203# --------------------------------------------------------------------------- 

# Every handle starts out as None, i.e. "not initialized yet".
# NOTE(review): _check_processor in this module reads manifest_processor from
# gco.services.manifest_api rather than from the variable below — confirm
# which copy the lifespan handler actually assigns.
manifest_processor: ManifestProcessor | None = None
manifest_metrics: ManifestProcessorMetrics | None = None
# Store handles imported from gco.services.template_store.
template_store: TemplateStore | None = None
webhook_store: WebhookStore | None = None
job_store: JobStore | None = None

209 

210 

211# --------------------------------------------------------------------------- 

212# Helper functions 

213# --------------------------------------------------------------------------- 

214 

215 

def _check_processor() -> ManifestProcessor:
    """Return the initialized manifest processor.

    Raises:
        HTTPException: with status 503 when the processor is still ``None``.
    """
    # Deferred import: the lifespan handler populates the global on the
    # manifest_api module (and tests patch it there), so it has to be looked
    # up at call time instead of being captured when this module is imported.
    from gco.services import manifest_api as _api

    processor = _api.manifest_processor
    if processor is not None:
        return processor
    raise HTTPException(status_code=503, detail="Manifest processor not initialized")

225 

226 

def _check_namespace(namespace: str, processor: ManifestProcessor) -> None:
    """Reject namespaces that the processor does not allow.

    Returns ``None`` when ``namespace`` is in ``processor.allowed_namespaces``.

    Raises:
        HTTPException: with status 403 otherwise; the detail message lists
            the allowed namespaces.
    """
    permitted = processor.allowed_namespaces
    if namespace in permitted:
        return
    raise HTTPException(
        status_code=403,
        detail=f"Namespace '{namespace}' not allowed. Allowed: {list(permitted)}",
    )

234 

235 

def _parse_job_to_dict(job: V1Job) -> dict[str, Any]:
    """Flatten a Kubernetes Job into a plain dictionary.

    The result has ``metadata``, ``spec`` and ``status`` sections plus a
    ``computed_status`` string, derived as follows:

    * the first condition that is ``Complete`` or ``Failed`` with status
      ``"True"`` yields ``"succeeded"`` or ``"failed"`` respectively;
    * otherwise a positive ``status.active`` count yields ``"running"``;
    * otherwise the job is ``"pending"``.

    Timestamps are rendered with ``isoformat()``; unset ones become ``None``.
    """

    def _iso(moment: Any) -> Any:
        # Unset timestamps are passed through as None.
        return moment.isoformat() if moment else None

    meta, job_spec, job_status = job.metadata, job.spec, job.status
    job_conditions = job_status.conditions or []
    active_count = job_status.active or 0

    # Condition type -> reported status, for conditions that end the job.
    terminal_states = {"Complete": "succeeded", "Failed": "failed"}
    outcome = next(
        (
            terminal_states[cond.type]
            for cond in job_conditions
            if cond.status == "True" and cond.type in terminal_states
        ),
        None,
    )
    if outcome is not None:
        computed_status = outcome
    elif active_count > 0:
        computed_status = "running"
    else:
        computed_status = "pending"

    metadata_section = {
        "name": meta.name,
        "namespace": meta.namespace,
        "creationTimestamp": _iso(meta.creation_timestamp),
        "labels": meta.labels or {},
        "annotations": meta.annotations or {},
        "uid": meta.uid,
    }
    spec_section = {
        "parallelism": job_spec.parallelism,
        "completions": job_spec.completions,
        "backoffLimit": job_spec.backoff_limit,
    }
    condition_rows = []
    for cond in job_conditions:
        condition_rows.append(
            {
                "type": cond.type,
                "status": cond.status,
                "reason": cond.reason,
                "message": cond.message,
                "lastTransitionTime": _iso(cond.last_transition_time),
            }
        )
    status_section = {
        "active": active_count,
        "succeeded": job_status.succeeded or 0,
        "failed": job_status.failed or 0,
        "startTime": _iso(job_status.start_time),
        "completionTime": _iso(job_status.completion_time),
        "conditions": condition_rows,
    }

    return {
        "metadata": metadata_section,
        "spec": spec_section,
        "status": status_section,
        "computed_status": computed_status,
    }

294 

295 

def _parse_pod_to_dict(pod: V1Pod) -> dict[str, Any]:
    """Flatten a Kubernetes Pod into a plain dictionary.

    The result has ``metadata``, ``spec`` and ``status`` sections. Each
    regular container status carries ``name``, ``ready``, ``restartCount``
    and ``image``, plus state details when a state is set:

    * running    -> ``state`` and ``startedAt``
    * waiting    -> ``state`` and ``reason``
    * terminated -> ``state``, ``exitCode`` and ``reason``

    Init container statuses carry only ``name``, ``ready`` and
    ``restartCount``. Timestamps are rendered with ``isoformat()``; unset
    ones become ``None``.
    """
    meta, pod_spec, pod_status = pod.metadata, pod.spec, pod.status

    containers_report = []
    for entry in pod_status.container_statuses or []:
        report: dict[str, Any] = {
            "name": entry.name,
            "ready": entry.ready,
            "restartCount": entry.restart_count,
            "image": entry.image,
        }
        state = entry.state
        # The first populated state wins, checked as running, waiting, terminated.
        if state and state.running:
            started = state.running.started_at
            report["state"] = "running"
            report["startedAt"] = started.isoformat() if started else None
        elif state and state.waiting:
            report["state"] = "waiting"
            report["reason"] = state.waiting.reason
        elif state and state.terminated:
            report["state"] = "terminated"
            report["exitCode"] = state.terminated.exit_code
            report["reason"] = state.terminated.reason
        containers_report.append(report)

    init_report = [
        {"name": entry.name, "ready": entry.ready, "restartCount": entry.restart_count}
        for entry in pod_status.init_container_statuses or []
    ]

    created = meta.creation_timestamp
    began = pod_status.start_time
    return {
        "metadata": {
            "name": meta.name,
            "namespace": meta.namespace,
            "creationTimestamp": created.isoformat() if created else None,
            "labels": meta.labels or {},
            "uid": meta.uid,
        },
        "spec": {
            "nodeName": pod_spec.node_name,
            "containers": [
                {"name": item.name, "image": item.image} for item in pod_spec.containers
            ],
            "initContainers": [
                {"name": item.name, "image": item.image}
                for item in (pod_spec.init_containers or [])
            ],
        },
        "status": {
            "phase": pod_status.phase,
            "hostIP": pod_status.host_ip,
            "podIP": pod_status.pod_ip,
            "startTime": began.isoformat() if began else None,
            "containerStatuses": containers_report,
            "initContainerStatuses": init_report,
        },
    }

360 

361 

def _parse_event_to_dict(event: CoreV1Event) -> dict[str, Any]:
    """Flatten a Kubernetes Event into a plain dictionary.

    A missing or zero ``count`` is reported as 1. A missing ``source`` or
    ``involved_object`` leaves all fields of that sub-dictionary as ``None``.
    Timestamps are rendered with ``isoformat()``; unset ones become ``None``.
    """
    origin = event.source
    target = event.involved_object
    first_seen = event.first_timestamp
    last_seen = event.last_timestamp

    source_section = {
        "component": origin.component if origin else None,
        "host": origin.host if origin else None,
    }
    target_section = {
        "kind": target.kind if target else None,
        "name": target.name if target else None,
        "namespace": target.namespace if target else None,
    }

    return {
        "type": event.type,
        "reason": event.reason,
        "message": event.message,
        "count": event.count or 1,
        "firstTimestamp": first_seen.isoformat() if first_seen else None,
        "lastTimestamp": last_seen.isoformat() if last_seen else None,
        "source": source_section,
        "involvedObject": target_section,
    }

381 

382 

def _apply_template_parameters(
    manifest: dict[str, Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """Apply parameter substitutions to a manifest template.

    Every ``{{ key }}`` placeholder (optional whitespace inside the braces)
    is replaced with ``str(value)`` for each ``key``/``value`` pair in
    ``parameters``. The manifest is serialized to JSON, substituted as text
    and parsed back, so placeholders are replaced in dictionary keys as well
    as in string values, and the input ``manifest`` is not mutated.

    Parameters are applied one after another in iteration order, so a
    placeholder introduced by one value can be filled by a later parameter.

    Values are JSON-escaped before insertion. A value containing quotes,
    backslashes or control characters therefore stays inside the string it
    was substituted into, and cannot break the document or add new keys.

    Args:
        manifest: Template manifest, possibly containing placeholders.
        parameters: Placeholder names mapped to replacement values.

    Returns:
        A new manifest dictionary with the placeholders substituted.
    """
    import json
    import re

    manifest_str = json.dumps(manifest)
    for key, value in parameters.items():
        # Placeholders sit inside JSON string literals, so the key has to be
        # matched in its JSON-encoded form (non-ASCII characters and quotes
        # are escaped by json.dumps). The slice drops the surrounding quotes.
        encoded_key = json.dumps(str(key))[1:-1]
        pattern = r"\{\{\s*" + re.escape(encoded_key) + r"\s*\}\}"

        # Escape the value the same way so it remains a valid fragment of the
        # enclosing JSON string.
        encoded_value = json.dumps(str(value))[1:-1]

        # A callable replacement is inserted literally; a plain string would
        # have its backslash sequences (\1, \n, \g<...>) interpreted by re.sub.
        manifest_str = re.sub(
            pattern, lambda _match, _text=encoded_value: _text, manifest_str
        )
    result: dict[str, Any] = json.loads(manifest_str)
    return result