Coverage for gco/services/structured_logging.py: 91%
35 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Structured JSON logging for GCO services.
4Provides a JSON log formatter that outputs structured logs compatible with
5CloudWatch Logs Insights queries. Use ``configure_structured_logging()``
6at service startup to enable JSON output for all loggers.
8Example CloudWatch Insights query:
9 fields @timestamp, level, message, cluster_id, region
10 | filter level = "ERROR"
11 | sort @timestamp desc
13Environment Variables:
14 LOG_FORMAT: "json" for structured logging, "text" for human-readable (default: json)
15 LOG_LEVEL: Logging level (default: INFO)
16"""
18import json
19import logging
20import os
21import traceback
22from datetime import UTC, datetime
23from typing import Any
26class StructuredJsonFormatter(logging.Formatter):
27 """
28 Formats log records as single-line JSON objects.
30 Each log line includes:
31 - timestamp (ISO 8601)
32 - level
33 - logger (logger name)
34 - message
35 - Any extra fields passed via the ``extra`` dict
37 Exceptions are serialized into an ``exception`` field with type,
38 message, and traceback.
39 """
41 def __init__(self, service_name: str = "gco", **default_fields: Any):
42 super().__init__()
43 self.service_name = service_name
44 self.default_fields = default_fields
46 def format(self, record: logging.LogRecord) -> str:
47 log_entry: dict[str, Any] = {
48 "timestamp": datetime.now(UTC).isoformat(),
49 "level": record.levelname,
50 "logger": record.name,
51 "message": record.getMessage(),
52 "service": self.service_name,
53 }
55 # Include default fields (e.g., cluster_id, region)
56 log_entry.update(self.default_fields)
58 # Include any extra fields passed via logger.info("msg", extra={...})
59 # Filter out standard LogRecord attributes
60 standard_attrs = {
61 "name",
62 "msg",
63 "args",
64 "created",
65 "relativeCreated",
66 "exc_info",
67 "exc_text",
68 "stack_info",
69 "lineno",
70 "funcName",
71 "pathname",
72 "filename",
73 "module",
74 "thread",
75 "threadName",
76 "process",
77 "processName",
78 "levelname",
79 "levelno",
80 "msecs",
81 "message",
82 "taskName",
83 }
84 for key, value in record.__dict__.items():
85 if key not in standard_attrs and not key.startswith("_"): 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true
86 log_entry[key] = value
88 # Serialize exception info
89 if record.exc_info and record.exc_info[1]:
90 exc_type, exc_value, exc_tb = record.exc_info
91 log_entry["exception"] = {
92 "type": exc_type.__name__ if exc_type else "Unknown",
93 "message": str(exc_value),
94 "traceback": traceback.format_exception(exc_type, exc_value, exc_tb),
95 }
97 return json.dumps(log_entry, default=str)
100def configure_structured_logging(
101 service_name: str = "gco",
102 level: str | None = None,
103 **default_fields: Any,
104) -> None:
105 """
106 Configure the root logger with structured JSON output.
108 Call this once at service startup. All existing loggers will inherit
109 the JSON formatter.
111 Args:
112 service_name: Name included in every log line (e.g., "health-monitor").
113 level: Log level override. Defaults to LOG_LEVEL env var or INFO.
114 **default_fields: Extra fields included in every log line
115 (e.g., cluster_id="...", region="us-east-1").
116 """
117 log_format = os.environ.get("LOG_FORMAT", "json").lower()
118 log_level = level or os.environ.get("LOG_LEVEL") or "INFO"
120 root_logger = logging.getLogger()
121 root_logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
123 # Remove existing handlers to avoid duplicate output
124 root_logger.handlers.clear()
126 handler = logging.StreamHandler()
128 if log_format == "json": 128 ↛ 132line 128 didn't jump to line 132 because the condition on line 128 was always true
129 handler.setFormatter(StructuredJsonFormatter(service_name=service_name, **default_fields))
130 else:
131 # Human-readable fallback for local development
132 handler.setFormatter(
133 logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
134 )
136 root_logger.addHandler(handler)
def sanitize_log_value(value: Any) -> str:
    """
    Make a value safe to embed in a log message.

    The value is converted to text and passed through Python's
    ``unicode_escape`` codec, so carriage returns, line feeds and other
    control characters come out as visible backslash escapes instead of
    raw characters. This blocks log injection (log forging), where an
    attacker plants newlines in user input to fake additional log entries.
    Backslashes and non-ASCII characters are escaped by the codec as well,
    so the result is always plain ASCII.

    Apply this to every untrusted input (HTTP path/query params, job IDs,
    namespaces, etc.) before handing it to ``logger.info/warning/error``.

    Args:
        value: Any value; it is coerced with ``str()`` first.

    Returns:
        The escaped text (e.g. a newline becomes the literal two-character
        sequence backslash-n).
    """
    text = str(value)
    escaped_bytes = text.encode("unicode_escape")
    # unicode_escape emits ASCII bytes only, so this decode cannot fail.
    return str(escaped_bytes, "ascii")