Coverage for gco / services / metrics_publisher.py: 100%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2CloudWatch Metrics Publisher for GCO (Global Capacity Orchestrator on AWS).
4This module provides classes for publishing custom metrics to CloudWatch
5for monitoring and alerting. It supports both single metric puts and
6batch operations for efficiency.
8Metric Namespaces:
9- GCO/HealthMonitor: Cluster health and resource utilization metrics
10- GCO/ManifestProcessor: Manifest submission and processing metrics
12Common Dimensions:
13- ClusterName: EKS cluster identifier
14- Region: AWS region
16Usage:
17 # Health monitor metrics
18 metrics = create_health_monitor_metrics()
19 metrics.publish_resource_utilization(cpu_percent=45.2, memory_percent=62.1, gpu_percent=0.0, active_jobs=5)
21 # Manifest processor metrics
22 metrics = create_manifest_processor_metrics()
22 metrics.publish_submission_metrics(total_submissions=10, successful_submissions=9, failed_submissions=1, validation_failures=0)
24"""
26import logging
27import os
28from datetime import datetime
29from typing import Any
31import boto3
32from botocore.exceptions import ClientError
# Import-time logging setup: INFO level with timestamped, logger-named records.
# NOTE(review): basicConfig targets the root logger, so merely importing this
# module may change logging for the whole process; consider moving this call
# to the service entry points.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module logger shared by every publisher class below.
logger = logging.getLogger(__name__)
class MetricsPublisher:
    """
    Publish custom metrics to a single CloudWatch namespace.

    Every datum is tagged with ``ClusterName`` and ``Region`` dimensions taken
    from the constructor arguments, followed by any caller-supplied
    dimensions. Publishing is best-effort: the ``put_*`` methods log failures
    and return ``False`` rather than raising.
    """

    # Number of datums sent per PutMetricData request by put_metrics_batch.
    # 20 was the historical per-request limit.
    # NOTE(review): newer AWS documentation allows larger requests; confirm
    # the current quota before raising this default.
    MAX_METRICS_PER_REQUEST = 20

    def __init__(self, namespace: str, cluster_name: str, region: str):
        """
        Args:
            namespace: CloudWatch namespace every metric is published under.
            cluster_name: Value of the ``ClusterName`` dimension.
            region: Value of the ``Region`` dimension; also the region the
                CloudWatch client is created for.

        Raises:
            Exception: Whatever ``boto3.client`` raises, re-raised after
                being logged.
        """
        self.namespace = namespace
        self.cluster_name = cluster_name
        self.region = region

        # Initialize CloudWatch client
        try:
            self.cloudwatch = boto3.client("cloudwatch", region_name=region)
        except Exception as e:
            logger.error(f"Failed to initialize CloudWatch client: {e}")
            raise

    def _build_metric_datum(
        self,
        metric_name: str,
        value: float,
        unit: str = "None",
        dimensions: dict[str, str] | None = None,
        timestamp: datetime | None = None,
    ) -> dict[str, Any]:
        """
        Build one ``MetricData`` entry shared by put_metric and put_metrics_batch.

        The common ``ClusterName``/``Region`` dimensions always come first;
        caller-supplied dimensions are appended in their iteration order.
        Falsy ``dimensions`` or ``timestamp`` values are treated as absent.
        """
        metric_dimensions = [
            {"Name": "ClusterName", "Value": self.cluster_name},
            {"Name": "Region", "Value": self.region},
        ]
        if dimensions:
            metric_dimensions.extend(
                {"Name": key, "Value": dim_value} for key, dim_value in dimensions.items()
            )

        datum: dict[str, Any] = {
            "MetricName": metric_name,
            "Value": value,
            "Unit": unit,
            "Dimensions": metric_dimensions,
        }
        # Without an explicit timestamp the field is omitted, leaving the
        # service-side default ("now") in effect.
        if timestamp:
            datum["Timestamp"] = timestamp
        return datum

    def put_metric(
        self,
        metric_name: str,
        value: float,
        unit: str = "None",
        dimensions: dict[str, str] | None = None,
        timestamp: datetime | None = None,
    ) -> bool:
        """
        Put a single metric to CloudWatch.

        Args:
            metric_name: Name of the metric.
            value: Metric value.
            unit: CloudWatch unit string (Count, Percent, Seconds, ...);
                defaults to the literal string ``"None"``.
            dimensions: Extra dimensions appended after ClusterName/Region.
            timestamp: Timestamp for the metric (defaults to now).

        Returns:
            True if the PutMetricData call succeeded, False otherwise. Errors
            are logged, never raised.
        """
        try:
            datum = self._build_metric_datum(metric_name, value, unit, dimensions, timestamp)
            self.cloudwatch.put_metric_data(Namespace=self.namespace, MetricData=[datum])
        except ClientError as e:
            logger.error(f"Failed to put metric {metric_name}: {e}")
            return False
        except Exception as e:
            # Unexpected failures keep their traceback so they can be diagnosed.
            logger.exception(f"Unexpected error putting metric {metric_name}: {e}")
            return False

        logger.debug(f"Published metric {metric_name}={value} to {self.namespace}")
        return True

    def put_metrics_batch(
        self, metrics: list[dict[str, Any]], *, batch_size: int | None = None
    ) -> bool:
        """
        Put multiple metrics to CloudWatch, chunked into several requests.

        Args:
            metrics: Metric dicts with required keys ``name`` and ``value``
                and optional keys ``unit`` (default ``"None"``),
                ``dimensions`` and ``timestamp``.
            batch_size: Maximum datums per PutMetricData request. Defaults to
                ``MAX_METRICS_PER_REQUEST``.

        Returns:
            True if every request succeeded, False otherwise. An empty
            ``metrics`` list sends nothing and returns True.

            Requests are sent in order. If a later chunk fails, the earlier
            chunks have already been published.

        Raises:
            ValueError: If ``batch_size`` resolves to a value below 1.
        """
        chunk_size = self.MAX_METRICS_PER_REQUEST if batch_size is None else batch_size
        if chunk_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {chunk_size}")

        try:
            # Build every datum up front so a malformed entry (e.g. missing
            # "name") aborts the call before anything is sent.
            metric_data = [
                self._build_metric_datum(
                    metric["name"],
                    metric["value"],
                    metric.get("unit", "None"),
                    metric.get("dimensions"),
                    metric.get("timestamp"),
                )
                for metric in metrics
            ]

            for start in range(0, len(metric_data), chunk_size):
                self.cloudwatch.put_metric_data(
                    Namespace=self.namespace,
                    MetricData=metric_data[start : start + chunk_size],
                )
        except ClientError as e:
            logger.error(f"Failed to put metrics batch: {e}")
            return False
        except Exception as e:
            # Unexpected failures keep their traceback so they can be diagnosed.
            logger.exception(f"Unexpected error putting metrics batch: {e}")
            return False

        logger.debug(f"Published {len(metrics)} metrics to {self.namespace}")
        return True
class HealthMonitorMetrics(MetricsPublisher):
    """
    CloudWatch publisher for the Health Monitor service.

    All metrics go to the ``GCO/HealthMonitor`` namespace. Every method
    returns the boolean result of ``put_metrics_batch``.
    """

    def __init__(self, cluster_name: str, region: str):
        super().__init__("GCO/HealthMonitor", cluster_name, region)

    def publish_resource_utilization(
        self, cpu_percent: float, memory_percent: float, gpu_percent: float, active_jobs: int
    ) -> bool:
        """
        Publish cluster utilization in one batch.

        CPU, memory and GPU utilization are sent with unit ``Percent``; the
        active job count is sent with unit ``Count``.
        """
        readings = (
            ("ClusterCpuUtilization", cpu_percent, "Percent"),
            ("ClusterMemoryUtilization", memory_percent, "Percent"),
            ("ClusterGpuUtilization", gpu_percent, "Percent"),
            ("ActiveJobs", active_jobs, "Count"),
        )
        return self.put_metrics_batch(
            [{"name": name, "value": reading, "unit": unit} for name, reading, unit in readings]
        )

    def publish_health_status(self, is_healthy: bool, threshold_violations: list[str]) -> bool:
        """
        Publish the health flag and the number of threshold violations.

        The health flag is sent as 1.0 when healthy and 0.0 otherwise. The
        violation count is the length of ``threshold_violations``.
        """
        health_flag = 1.0 if is_healthy else 0.0
        violation_count = len(threshold_violations)
        return self.put_metrics_batch(
            [
                {"name": "ClusterHealthy", "value": health_flag, "unit": "None"},
                {"name": "ThresholdViolations", "value": violation_count, "unit": "Count"},
            ]
        )
class ManifestProcessorMetrics(MetricsPublisher):
    """
    CloudWatch publisher for the Manifest Processor service.

    All metrics go to the ``GCO/ManifestProcessor`` namespace. Every method
    returns the boolean result of ``put_metrics_batch``.
    """

    def __init__(self, cluster_name: str, region: str):
        super().__init__("GCO/ManifestProcessor", cluster_name, region)

    def publish_submission_metrics(
        self,
        total_submissions: int,
        successful_submissions: int,
        failed_submissions: int,
        validation_failures: int,
    ) -> bool:
        """
        Publish manifest submission counters and, if possible, a success rate.

        ``ManifestSuccessRate`` (percent) is only added when
        ``total_submissions`` is positive, which avoids dividing by zero.
        """
        counters = {
            "ManifestSubmissions": total_submissions,
            "ManifestSuccesses": successful_submissions,
            "ManifestFailures": failed_submissions,
            "ValidationFailures": validation_failures,
        }
        batch = [{"name": name, "value": count, "unit": "Count"} for name, count in counters.items()]

        if total_submissions > 0:
            batch.append(
                {
                    "name": "ManifestSuccessRate",
                    "value": successful_submissions / total_submissions * 100,
                    "unit": "Percent",
                }
            )

        return self.put_metrics_batch(batch)

    def publish_resource_metrics(
        self, resources_created: int, resources_updated: int, resources_deleted: int
    ) -> bool:
        """Publish created/updated/deleted resource counts (unit ``Count``)."""
        counters = {
            "ResourcesCreated": resources_created,
            "ResourcesUpdated": resources_updated,
            "ResourcesDeleted": resources_deleted,
        }
        return self.put_metrics_batch(
            [{"name": name, "value": count, "unit": "Count"} for name, count in counters.items()]
        )

    def publish_performance_metrics(self, avg_processing_time: float, queue_size: int) -> bool:
        """Publish average processing time (``Seconds``) and queue size (``Count``)."""
        timing = {"name": "AvgProcessingTime", "value": avg_processing_time, "unit": "Seconds"}
        backlog = {"name": "QueueSize", "value": queue_size, "unit": "Count"}
        return self.put_metrics_batch([timing, backlog])
def create_health_monitor_metrics() -> HealthMonitorMetrics:
    """
    Build a HealthMonitorMetrics publisher from the process environment.

    Reads ``CLUSTER_NAME`` (default ``"unknown-cluster"``) and ``REGION``
    (default ``"unknown-region"``).
    """
    env = os.environ
    return HealthMonitorMetrics(
        cluster_name=env.get("CLUSTER_NAME", "unknown-cluster"),
        region=env.get("REGION", "unknown-region"),
    )
def create_manifest_processor_metrics() -> ManifestProcessorMetrics:
    """
    Build a ManifestProcessorMetrics publisher from the process environment.

    Reads ``CLUSTER_NAME`` (default ``"unknown-cluster"``) and ``REGION``
    (default ``"unknown-region"``).
    """
    env = os.environ
    return ManifestProcessorMetrics(
        cluster_name=env.get("CLUSTER_NAME", "unknown-cluster"),
        region=env.get("REGION", "unknown-region"),
    )