Coverage for gco / services / metrics_publisher.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2CloudWatch Metrics Publisher for GCO (Global Capacity Orchestrator on AWS). 

3 

4This module provides classes for publishing custom metrics to CloudWatch 

5for monitoring and alerting. It supports both single metric puts and 

6batch operations for efficiency. 

7 

8Metric Namespaces: 

9- GCO/HealthMonitor: Cluster health and resource utilization metrics 

10- GCO/ManifestProcessor: Manifest submission and processing metrics 

11 

12Common Dimensions: 

13- ClusterName: EKS cluster identifier 

14- Region: AWS region 

15 

16Usage: 

17 # Health monitor metrics 

18 metrics = create_health_monitor_metrics() 

    metrics.publish_resource_utilization(cpu_percent=45.2, memory_percent=62.1, gpu_percent=0.0, active_jobs=5)

20 

21 # Manifest processor metrics 

22 metrics = create_manifest_processor_metrics() 

    metrics.publish_submission_metrics(
        total_submissions=10, successful_submissions=9, failed_submissions=1, validation_failures=0
    )

24""" 

25 

import logging
import math
import os
from datetime import datetime
from typing import Any

import boto3
from botocore.exceptions import ClientError

33 

_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Configure the root logger when this module is imported. logging.basicConfig
# is a no-op if the root logger already has handlers.
# NOTE(review): import-time root configuration affects every importer; kept
# unchanged because the services may rely on it.
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by the publisher classes in this file.
logger = logging.getLogger(__name__)

38 

39 

40class MetricsPublisher: 

41 """ 

42 Publishes custom metrics to CloudWatch 

43 """ 

44 

45 def __init__(self, namespace: str, cluster_name: str, region: str): 

46 self.namespace = namespace 

47 self.cluster_name = cluster_name 

48 self.region = region 

49 

50 # Initialize CloudWatch client 

51 try: 

52 self.cloudwatch = boto3.client("cloudwatch", region_name=region) 

53 except Exception as e: 

54 logger.error(f"Failed to initialize CloudWatch client: {e}") 

55 raise 

56 

57 def put_metric( 

58 self, 

59 metric_name: str, 

60 value: float, 

61 unit: str = "None", 

62 dimensions: dict[str, str] | None = None, 

63 timestamp: datetime | None = None, 

64 ) -> bool: 

65 """ 

66 Put a single metric to CloudWatch 

67 

68 Args: 

69 metric_name: Name of the metric 

70 value: Metric value 

71 unit: Metric unit (Count, Percent, Seconds, etc.) 

72 dimensions: Additional dimensions for the metric 

73 timestamp: Timestamp for the metric (defaults to now) 

74 

75 Returns: 

76 True if successful, False otherwise 

77 """ 

78 try: 

79 # Prepare dimensions 

80 metric_dimensions = [ 

81 {"Name": "ClusterName", "Value": self.cluster_name}, 

82 {"Name": "Region", "Value": self.region}, 

83 ] 

84 

85 if dimensions: 

86 for key, dim_value in dimensions.items(): 

87 metric_dimensions.append({"Name": key, "Value": dim_value}) 

88 

89 # Prepare metric data 

90 metric_data = { 

91 "MetricName": metric_name, 

92 "Value": value, 

93 "Unit": unit, 

94 "Dimensions": metric_dimensions, 

95 } 

96 

97 if timestamp: 

98 metric_data["Timestamp"] = timestamp 

99 

100 # Put metric to CloudWatch 

101 self.cloudwatch.put_metric_data(Namespace=self.namespace, MetricData=[metric_data]) 

102 

103 logger.debug(f"Published metric {metric_name}={value} to {self.namespace}") 

104 return True 

105 

106 except ClientError as e: 

107 logger.error(f"Failed to put metric {metric_name}: {e}") 

108 return False 

109 except Exception as e: 

110 logger.error(f"Unexpected error putting metric {metric_name}: {e}") 

111 return False 

112 

113 def put_metrics_batch(self, metrics: list[dict[str, Any]]) -> bool: 

114 """ 

115 Put multiple metrics to CloudWatch in a batch 

116 

117 Args: 

118 metrics: List of metric dictionaries with keys: name, value, unit, dimensions, timestamp 

119 

120 Returns: 

121 True if successful, False otherwise 

122 """ 

123 try: 

124 metric_data = [] 

125 

126 for metric in metrics: 

127 # Prepare dimensions 

128 metric_dimensions = [ 

129 {"Name": "ClusterName", "Value": self.cluster_name}, 

130 {"Name": "Region", "Value": self.region}, 

131 ] 

132 

133 if metric.get("dimensions"): 

134 for key, value in metric["dimensions"].items(): 

135 metric_dimensions.append({"Name": key, "Value": value}) 

136 

137 # Prepare metric data 

138 metric_item = { 

139 "MetricName": metric["name"], 

140 "Value": metric["value"], 

141 "Unit": metric.get("unit", "None"), 

142 "Dimensions": metric_dimensions, 

143 } 

144 

145 if metric.get("timestamp"): 

146 metric_item["Timestamp"] = metric["timestamp"] 

147 

148 metric_data.append(metric_item) 

149 

150 # CloudWatch allows max 20 metrics per batch 

151 batch_size = 20 

152 for i in range(0, len(metric_data), batch_size): 

153 batch = metric_data[i : i + batch_size] 

154 self.cloudwatch.put_metric_data(Namespace=self.namespace, MetricData=batch) 

155 

156 logger.debug(f"Published {len(metrics)} metrics to {self.namespace}") 

157 return True 

158 

159 except ClientError as e: 

160 logger.error(f"Failed to put metrics batch: {e}") 

161 return False 

162 except Exception as e: 

163 logger.error(f"Unexpected error putting metrics batch: {e}") 

164 return False 

165 

166 

167class HealthMonitorMetrics(MetricsPublisher): 

168 """ 

169 Metrics publisher for Health Monitor service 

170 """ 

171 

172 def __init__(self, cluster_name: str, region: str): 

173 super().__init__("GCO/HealthMonitor", cluster_name, region) 

174 

175 def publish_resource_utilization( 

176 self, cpu_percent: float, memory_percent: float, gpu_percent: float, active_jobs: int 

177 ) -> bool: 

178 """ 

179 Publish resource utilization metrics 

180 """ 

181 metrics = [ 

182 {"name": "ClusterCpuUtilization", "value": cpu_percent, "unit": "Percent"}, 

183 {"name": "ClusterMemoryUtilization", "value": memory_percent, "unit": "Percent"}, 

184 {"name": "ClusterGpuUtilization", "value": gpu_percent, "unit": "Percent"}, 

185 {"name": "ActiveJobs", "value": active_jobs, "unit": "Count"}, 

186 ] 

187 

188 return self.put_metrics_batch(metrics) 

189 

190 def publish_health_status(self, is_healthy: bool, threshold_violations: list[str]) -> bool: 

191 """ 

192 Publish health status metrics 

193 """ 

194 metrics = [ 

195 {"name": "ClusterHealthy", "value": 1.0 if is_healthy else 0.0, "unit": "None"}, 

196 {"name": "ThresholdViolations", "value": len(threshold_violations), "unit": "Count"}, 

197 ] 

198 

199 return self.put_metrics_batch(metrics) 

200 

201 

202class ManifestProcessorMetrics(MetricsPublisher): 

203 """ 

204 Metrics publisher for Manifest Processor service 

205 """ 

206 

207 def __init__(self, cluster_name: str, region: str): 

208 super().__init__("GCO/ManifestProcessor", cluster_name, region) 

209 

210 def publish_submission_metrics( 

211 self, 

212 total_submissions: int, 

213 successful_submissions: int, 

214 failed_submissions: int, 

215 validation_failures: int, 

216 ) -> bool: 

217 """ 

218 Publish manifest submission metrics 

219 """ 

220 metrics = [ 

221 {"name": "ManifestSubmissions", "value": total_submissions, "unit": "Count"}, 

222 {"name": "ManifestSuccesses", "value": successful_submissions, "unit": "Count"}, 

223 {"name": "ManifestFailures", "value": failed_submissions, "unit": "Count"}, 

224 {"name": "ValidationFailures", "value": validation_failures, "unit": "Count"}, 

225 ] 

226 

227 if total_submissions > 0: 

228 success_rate = (successful_submissions / total_submissions) * 100 

229 metrics.append( 

230 {"name": "ManifestSuccessRate", "value": success_rate, "unit": "Percent"} 

231 ) 

232 

233 return self.put_metrics_batch(metrics) 

234 

235 def publish_resource_metrics( 

236 self, resources_created: int, resources_updated: int, resources_deleted: int 

237 ) -> bool: 

238 """ 

239 Publish resource management metrics 

240 """ 

241 metrics = [ 

242 {"name": "ResourcesCreated", "value": resources_created, "unit": "Count"}, 

243 {"name": "ResourcesUpdated", "value": resources_updated, "unit": "Count"}, 

244 {"name": "ResourcesDeleted", "value": resources_deleted, "unit": "Count"}, 

245 ] 

246 

247 return self.put_metrics_batch(metrics) 

248 

249 def publish_performance_metrics(self, avg_processing_time: float, queue_size: int) -> bool: 

250 """ 

251 Publish performance metrics 

252 """ 

253 metrics = [ 

254 {"name": "AvgProcessingTime", "value": avg_processing_time, "unit": "Seconds"}, 

255 {"name": "QueueSize", "value": queue_size, "unit": "Count"}, 

256 ] 

257 

258 return self.put_metrics_batch(metrics) 

259 

260 

261def create_health_monitor_metrics() -> HealthMonitorMetrics: 

262 """ 

263 Create HealthMonitorMetrics instance from environment variables 

264 """ 

265 cluster_name = os.getenv("CLUSTER_NAME", "unknown-cluster") 

266 region = os.getenv("REGION", "unknown-region") 

267 

268 return HealthMonitorMetrics(cluster_name, region) 

269 

270 

271def create_manifest_processor_metrics() -> ManifestProcessorMetrics: 

272 """ 

273 Create ManifestProcessorMetrics instance from environment variables 

274 """ 

275 cluster_name = os.getenv("CLUSTER_NAME", "unknown-cluster") 

276 region = os.getenv("REGION", "unknown-region") 

277 

278 return ManifestProcessorMetrics(cluster_name, region)