Coverage for mcp/tools/metrics.py: 68%

146 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Read-only metric-reader MCP tools. 

2 

3These tools surface a single training-style scalar (loss, accuracy, throughput, 

4GPU utilisation, …) in the canonical ``{"metrics": {"<key>": <number>}}`` shape 

5the Mission Observe_Phase already merges, so a ``metric_threshold`` criterion can 

6observe progress with zero scripting. 

7 

8Three readers are registered **default-on** here: 

9 

10* :func:`metrics_cloudwatch_get` — one CloudWatch ``GetMetricStatistics`` 

11 datapoint, region-scoped. 

12* :func:`metrics_from_job_logs` — a scalar pulled from the tail of a job's logs 

13 by JSON key or regex, reusing the existing read-only job-log retrieval path. 

14* :func:`metrics_from_shared_storage_file` — a named field read from a small 

15 metrics file on shared storage, reusing the existing read-only ``gco files`` 

16 path and dispatching on a ``format`` parameter. 

17 

18Every tool returns a plain ``dict`` (so FastMCP passes the top-level ``metrics`` 

19key through verbatim as ``structured_content``) and **never raises**: each wraps 

20its body in a ``try/except MetricReaderError`` that renders a structured error 

21envelope, plus a final ``except Exception`` that maps any unexpected 

22library/API failure to the reader's stable catch-all code. A failed read 

23therefore merges as ``inconclusive`` and the Mission loop keeps running. 

24 

25All three are strictly read-only against *remote* AWS / storage / job-log 

26surfaces. A fourth reader, :func:`metrics_from_local_file`, surfaces a field 

27from a *local-filesystem* metrics file confined to an allowlisted root; because 

28reading the local filesystem is a real security concern even for a read-only 

29tool, it is **flag-gated, default-off** behind ``GCO_ENABLE_LOCAL_METRICS`` and 

30its decorator only fires when that flag (or the umbrella ``GCO_ENABLE_ALL_TOOLS``) 

31is enabled. 

32""" 

33 

34import asyncio 

35import json 

36import os 

37import re 

38import sys 

39import tempfile 

40from datetime import UTC, datetime, timedelta 

41from pathlib import Path 

42from typing import Any 

43 

44import cli_runner 

45from audit import audit_logged 

46from feature_flags import is_enabled 

47from server import mcp 

48 

49# The pure ``metric_readers`` package lives under ``mcp/`` alongside this 

50# ``tools`` package. ``mcp/run_mcp.py`` puts ``mcp/`` on ``sys.path`` at 

51# runtime; mirror the mission.py path-injection convention so ``import 

52# metric_readers`` resolves the same way regardless of entrypoint. 

53sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) 

54 

55from metric_readers import aggregate, cloudwatch, files, localfs, logs # noqa: E402 

56from metric_readers.shape import ( # noqa: E402 

57 ErrorCode, 

58 MetricReaderError, 

59 default_metric_key, 

60 error_envelope, 

61 metrics_result, 

62 validate_metric_name, 

63) 

64 

# Inclusive bounds for the job-log tail size. Whatever the caller passes (or the
# default) is clamped into [_TAIL_MIN, _TAIL_MAX] before any logs are fetched.
_TAIL_MIN = 1
_TAIL_MAX = 10_000
_TAIL_DEFAULT = 1_000

# Default size cap for the file readers: 10 MiB. The artifact size is compared
# against the cap before the content is loaded into memory.
_MAX_BYTES_DEFAULT = 10 * 1024 * 1024

74 

75 

def _resolve_key(output_name: str | None, source_hint: str) -> str:
    """Pick the metric key for a reader result.

    A non-empty ``output_name`` takes precedence and is passed through
    ``validate_metric_name``. When it is ``None`` or empty, the key is
    whatever ``default_metric_key`` derives from ``source_hint``.
    """
    if not output_name:
        return default_metric_key(source_hint)
    return validate_metric_name(output_name)

86 

87 

88# ============================================================================= 

89# CloudWatch reader (default-on) 

90# ============================================================================= 

91 

92 

@mcp.tool(tags={"safe", "metrics"})
@audit_logged
async def metrics_cloudwatch_get(
    metric_name: str,
    namespace: str,
    region: str,
    dimensions: dict[str, str] | None = None,
    statistic: str = "Average",
    period: int = 300,
    minutes_back: int = 60,
    start_time: str | None = None,
    end_time: str | None = None,
    output_name: str | None = None,
) -> dict[str, Any]:
    """[read-only] Read one CloudWatch datapoint as a canonical metric.

    Issues a single region-scoped, read-only ``GetMetricStatistics`` request and
    returns the most-recent datapoint's statistic value in the canonical
    ``{"metrics": {...}}`` shape consumable by a ``metric_threshold`` criterion.
    The metric key defaults to the CloudWatch metric name when ``output_name``
    is omitted.

    Args:
        metric_name: The CloudWatch metric name.
        namespace: The CloudWatch namespace the metric lives in.
        region: AWS region to scope the read to (supports Multi_Region).
        dimensions: Name/value dimension pairs passed through unchanged.
        statistic: The statistic to request (Average, Sum, Maximum, Minimum,
            SampleCount).
        period: Aggregation period in seconds (default 300).
        minutes_back: Lookback window in minutes when start/end are not given.
        start_time: ISO-8601 window start (used with ``end_time``).
        end_time: ISO-8601 window end (used with ``start_time``).
        output_name: Explicit metric key; defaults to ``metric_name``.

    Returns the Canonical_Metrics_Shape on success, or a Tool_Error_Envelope
    (``metric_name_invalid``, ``no_datapoints``, ``aws_unreachable``) on failure.
    """
    try:
        key = _resolve_key(output_name, metric_name)

        # An explicit window is honoured only when BOTH bounds are supplied;
        # with either one missing, the window is the trailing ``minutes_back``
        # minutes ending now (UTC).
        if start_time is None or end_time is None:
            window_end = datetime.now(UTC)
            window_start = window_end - timedelta(minutes=minutes_back)
        else:
            window_start = datetime.fromisoformat(start_time)
            window_end = datetime.fromisoformat(end_time)

        query = {
            "metric_name": metric_name,
            "namespace": namespace,
            "dimensions": dimensions,
            "region": region,
            "period": period,
            "statistic": statistic,
            "start_time": window_start,
            "end_time": window_end,
        }

        # ``get_datapoint`` blocks, so it runs in a worker thread rather than
        # on the event loop.
        value, iso_timestamp = await asyncio.to_thread(cloudwatch.get_datapoint, **query)

        provenance = {
            "source": f"cloudwatch:{namespace}/{metric_name}",
            "region": region,
            "statistic": statistic,
            "datapoint_timestamp": iso_timestamp,
        }
        return metrics_result(key, value, **provenance)
    except MetricReaderError as err:
        details = err.details or {}
        return error_envelope(err.code, **details)
    except Exception as exc:  # noqa: BLE001 - no exception may escape the tool boundary
        # Anything unexpected is reported under the reader's catch-all code
        # instead of propagating out of the tool.
        return error_envelope(
            ErrorCode.AWS_UNREACHABLE,
            kind="client_error",
            region=region,
            message=str(exc),
        )

173 

174 

175# ============================================================================= 

176# Job-log reader (default-on) 

177# ============================================================================= 

178 

179 

@mcp.tool(tags={"safe", "metrics"})
@audit_logged
async def metrics_from_job_logs(
    job_name: str,
    region: str,
    namespace: str = "gco-jobs",
    json_key: str | None = None,
    regex: str | None = None,
    aggregation: str = "last",
    tail: int = _TAIL_DEFAULT,
    output_name: str | None = None,
) -> dict[str, Any]:
    """[read-only] Extract a scalar from the tail of a job's logs.

    Tails a job's logs through the existing read-only retrieval path and pulls a
    candidate scalar from each line by **either** a JSON key (resolved as a
    dot-path) **or** a regex (first capture group) — exactly one must be set.
    The matched values are coerced to numbers and reduced to one Numeric_Value
    via the aggregation mode (default ``last`` = most recent match). Returns the
    canonical ``{"metrics": {...}}`` shape. The metric key defaults to the last
    segment of ``json_key`` (or ``value`` in regex mode) when ``output_name`` is
    omitted.

    Args:
        job_name: Name of the job whose logs to read.
        region: AWS region where the job ran.
        namespace: Kubernetes namespace (default ``gco-jobs``).
        json_key: JSON dot-path key to resolve per line (mutually exclusive
            with ``regex``).
        regex: Regex whose first capture group is the scalar (mutually
            exclusive with ``json_key``).
        aggregation: One of last, first, min, max, mean (default ``last``).
        tail: Number of log lines to read; clamped to [1, 10000] (default 1000).
        output_name: Explicit metric key.

    Returns the Canonical_Metrics_Shape on success, or a Tool_Error_Envelope
    (``invalid_extraction_mode``, ``invalid_regex``, ``invalid_aggregation_mode``,
    ``log_retrieval_failed``, ``no_match``, ``non_numeric_value``) on failure.
    """
    try:
        use_json_key = bool(json_key)
        use_regex = bool(regex)

        # Exactly one extraction mode must be selected: both set or both
        # unset (including empty strings) is rejected.
        if not (use_json_key ^ use_regex):
            raise MetricReaderError(
                ErrorCode.INVALID_EXTRACTION_MODE,
                {"json_key": json_key, "regex": regex},
            )

        # Reject an unknown aggregation mode before any logs are fetched.
        if aggregation not in aggregate.VALID_MODES:
            raise MetricReaderError(
                ErrorCode.INVALID_AGGREGATION_MODE,
                {"mode": aggregation, "valid_modes": sorted(aggregate.VALID_MODES)},
            )

        # In regex mode the pattern must compile and expose at least one
        # capture group.
        compiled: re.Pattern[str] | None = None
        if use_regex:
            try:
                compiled = re.compile(regex)  # type: ignore[arg-type]
            except re.error as exc:
                raise MetricReaderError(
                    ErrorCode.INVALID_REGEX,
                    {"regex": regex, "reason": str(exc)},
                ) from exc
            if not compiled.groups:
                raise MetricReaderError(
                    ErrorCode.INVALID_REGEX,
                    {"regex": regex, "reason": "no capture group"},
                )

        # Default key hint: the text after the last "." of ``json_key``, or
        # the literal "value" in regex mode.
        if use_json_key:
            hint = json_key.rpartition(".")[2]  # type: ignore[union-attr]
        else:
            hint = "value"
        key = _resolve_key(output_name, hint)

        # Clamp the requested tail size into [_TAIL_MIN, _TAIL_MAX].
        line_budget = min(max(int(tail), _TAIL_MIN), _TAIL_MAX)

        cli_args = (
            "jobs",
            "logs",
            job_name,
            "-r",
            region,
            "-n",
            namespace,
            "--tail",
            str(line_budget),
        )
        # The CLI call blocks, so it runs in a worker thread.
        raw = await asyncio.to_thread(cli_runner._run_cli, *cli_args)

        # If the whole CLI output parses as a JSON object carrying an "error"
        # key, treat it as a retrieval failure. Output that is not JSON is
        # used as log text as-is.
        # NOTE(review): a tail that is itself a single JSON object containing
        # an "error" key would also take this branch — confirm the CLI error
        # payload cannot collide with job output.
        try:
            payload = json.loads(raw)
        except ValueError:
            payload = None
        if isinstance(payload, dict) and "error" in payload:
            message = str(payload["error"])
            if "not found" in message.lower():
                failure_kind = "unknown_job"
            else:
                failure_kind = "unreachable"
            raise MetricReaderError(
                ErrorCode.LOG_RETRIEVAL_FAILED,
                {"kind": failure_kind, "job_name": job_name, "message": message},
            )

        lines = raw.splitlines()

        candidates: list[object]
        if compiled is None:
            candidates = logs.extract_by_json_key(lines, json_key)  # type: ignore[arg-type]
        else:
            candidates = list(logs.extract_by_regex(lines, compiled))

        # Nothing matched the key/pattern.
        if not candidates:
            raise MetricReaderError(
                ErrorCode.NO_MATCH,
                {"job_name": job_name, "mode": "json_key" if use_json_key else "regex"},
            )

        # Coerce every match to a number, then reduce the sequence to a single
        # value with the requested aggregation mode.
        numeric = list(map(logs.coerce_scalar, candidates))
        value = aggregate.reduce_sequence(numeric, aggregation)

        return metrics_result(
            key,
            value,
            source=f"job_logs:{job_name}",
            region=region,
            aggregation=aggregation,
            match_count=len(candidates),
        )
    except MetricReaderError as err:
        details = err.details or {}
        return error_envelope(err.code, **details)
    except Exception as exc:  # noqa: BLE001 - no exception may escape the tool boundary
        # Anything unexpected is reported under the reader's catch-all code.
        return error_envelope(
            ErrorCode.LOG_RETRIEVAL_FAILED,
            kind="unreachable",
            job_name=job_name,
            message=str(exc),
        )

326 

327 

328# ============================================================================= 

329# Shared-storage file reader (default-on) 

330# ============================================================================= 

331 

332 

def _read_shared_storage(path: str, region: str, max_bytes: int) -> bytes:
    """Download a shared-storage artifact and return its bytes, size-capped.

    Runs ``files download`` through ``cli_runner._run_cli`` into a temporary
    directory that is removed when this function returns, then checks the
    downloaded file's size before loading it.

    Args:
        path: Shared-storage path of the artifact; also used as error provenance.
        region: Region passed to the CLI via ``-r``.
        max_bytes: Largest size (inclusive) that will be read into memory.

    Returns:
        The full content of the downloaded artifact.

    Raises:
        MetricReaderError: ``FILE_NOT_FOUND`` when the CLI output is a JSON
            object with an ``"error"`` key or no file was produced;
            ``FILE_TOO_LARGE`` when the file exceeds ``max_bytes`` (nothing is
            read in that case).
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        download_target = os.path.join(scratch_dir, "artifact")
        cli_output = cli_runner._run_cli(
            "files", "download", path, download_target, "-r", region
        )

        # A JSON object with an "error" key is the CLI reporting failure; any
        # non-JSON output is ignored here.
        try:
            parsed = json.loads(cli_output)
        except ValueError:
            parsed = None
        if isinstance(parsed, dict) and "error" in parsed:
            raise MetricReaderError(
                ErrorCode.FILE_NOT_FOUND,
                {"path": path, "message": str(parsed["error"])},
            )

        # The download must have produced a regular file.
        if not os.path.isfile(download_target):
            raise MetricReaderError(ErrorCode.FILE_NOT_FOUND, {"path": path})

        # Enforce the cap from the stat result, before reading any content.
        artifact_size = os.stat(download_target).st_size
        if artifact_size > max_bytes:
            raise MetricReaderError(
                ErrorCode.FILE_TOO_LARGE,
                {"path": path, "size": artifact_size, "max_bytes": max_bytes},
            )

        with open(download_target, "rb") as stream:
            return stream.read()

371 

372 

@mcp.tool(tags={"safe", "metrics"})
@audit_logged
async def metrics_from_shared_storage_file(
    path: str,
    region: str,
    field: str,
    format: str,
    aggregation: str = "last",
    max_bytes: int = _MAX_BYTES_DEFAULT,
    output_name: str | None = None,
) -> dict[str, Any]:
    """[read-only] Read a named field from a shared-storage metrics file.

    Reads a small metrics file from shared storage (EFS / the cluster shared
    bucket) through the existing read-only ``gco files`` path and surfaces a
    named field as a metric in the canonical ``{"metrics": {...}}`` shape,
    dispatching on ``format``. Sequence-bearing formats (a list, a CSV column, a
    Hugging Face ``log_history``, a JSONL stream, a Parquet column) are reduced
    to one Numeric_Value via the aggregation mode. The metric key defaults to
    ``field`` when ``output_name`` is omitted.

    Args:
        path: Shared-storage path to the metrics file.
        region: AWS region of the storage.
        field: Field/column/key name to read (dot-path for document formats).
        format: One of json, csv, hf_trainer_state, jsonl, yaml, parquet,
            tfevents.
        aggregation: One of last, first, min, max, mean (default ``last``).
        max_bytes: Maximum artifact size in bytes (default 10 MiB); the size is
            checked before the content is read.
        output_name: Explicit metric key; defaults to ``field``.

    Returns the Canonical_Metrics_Shape on success, or a Tool_Error_Envelope
    (``unsupported_format``, ``metric_name_invalid``, ``file_not_found``,
    ``file_too_large``, ``malformed_file``, ``field_not_found``,
    ``non_numeric_value``, ``no_numeric_value``, ``empty_sequence``,
    ``format_dependency_unavailable``) on failure.
    """
    try:
        # Validate the cheap inputs first so a bad request never touches
        # storage: format, then aggregation mode, then the metric key.
        if format not in files._HANDLERS:
            supported_formats = sorted(files._HANDLERS)
            raise MetricReaderError(
                ErrorCode.UNSUPPORTED_FORMAT,
                {"format": format, "supported": supported_formats},
            )

        if aggregation not in aggregate.VALID_MODES:
            valid_modes = sorted(aggregate.VALID_MODES)
            raise MetricReaderError(
                ErrorCode.INVALID_AGGREGATION_MODE,
                {"mode": aggregation, "valid_modes": valid_modes},
            )

        key = _resolve_key(output_name, field)

        # The download + size check + read is blocking, so it runs in a worker
        # thread. FILE_NOT_FOUND / FILE_TOO_LARGE surface from the helper.
        content = await asyncio.to_thread(_read_shared_storage, path, region, max_bytes)

        # Hand the bytes to the handler registered for this format.
        parse = files._HANDLERS[format]
        value = parse(content, field, aggregation)

        provenance = {
            "source": f"file:{path}",
            "region": region,
            "format": format,
            "aggregation": aggregation,
        }
        return metrics_result(key, value, **provenance)
    except MetricReaderError as err:
        details = err.details or {}
        return error_envelope(err.code, **details)
    except Exception as exc:  # noqa: BLE001 - no exception may escape the tool boundary
        # Anything unexpected is reported under the file reader's catch-all code.
        return error_envelope(
            ErrorCode.FILE_NOT_FOUND,
            path=path,
            message=str(exc),
        )

454 

455 

456# ============================================================================= 

457# Local-filesystem file reader (flag-gated, default-off) 

458# ============================================================================= 

459# 

460# This reader is the deliberate exception to the default-on rule: it reads the 

461# MCP host's *local* filesystem, a real security concern even for a read-only 

462# tool, so its decorator is wrapped in ``if is_enabled("GCO_ENABLE_LOCAL_METRICS")`` 

463# (mirroring the module-body gate in ``mcp/tools/mission.py``). With the flag 

464# unset the decorator never fires and FastMCP never sees the tool. The 

465# gate is evaluated **only** through ``feature_flags.is_enabled`` — never by 

466# reading ``os.environ`` for the flag decision — and inherits the 

467# umbrella ``GCO_ENABLE_ALL_TOOLS`` override. 

468 

469 

def _read_local_file(resolved_path: Path, path: str, max_bytes: int) -> bytes:
    """Read a confined local artifact, enforcing the size cap before loading it.

    The size is taken from ``stat`` before any content is read, so an artifact
    larger than ``max_bytes`` is rejected without loading it. The function only
    reads: it never writes to, moves, or deletes the artifact.

    Args:
        resolved_path: The already-confined path to read (the caller obtains it
            from ``localfs.resolve_within_root``).
        path: The caller-supplied path, used only as provenance in error details.
        max_bytes: Largest size (inclusive) that will be read into memory.

    Returns:
        The full content of the file.

    Raises:
        MetricReaderError: ``FILE_NOT_FOUND`` when the path is not a regular
            file or cannot be stat'ed/opened/read (any ``OSError``, e.g. a
            permission error or the file vanishing between checks);
            ``FILE_TOO_LARGE`` when it exceeds ``max_bytes``.
    """

    def _unreadable(exc: OSError) -> Exception:
        # Same detail keys the tool-level catch-all emits (path + message), so
        # the resulting error envelope is unchanged for callers of the tool.
        return MetricReaderError(
            ErrorCode.FILE_NOT_FOUND,
            {"path": path, "message": str(exc)},
        )

    # ``Path.is_file`` swallows only "does not exist"-style errors; others
    # (such as permission errors) propagate, so they are translated here.
    try:
        is_regular_file = resolved_path.is_file()
    except OSError as exc:
        raise _unreadable(exc) from exc
    if not is_regular_file:
        raise MetricReaderError(ErrorCode.FILE_NOT_FOUND, {"path": path})

    # Size check before the full content is read into memory.
    try:
        size = resolved_path.stat().st_size
    except OSError as exc:
        raise _unreadable(exc) from exc
    if size > max_bytes:
        raise MetricReaderError(
            ErrorCode.FILE_TOO_LARGE,
            {"path": path, "size": size, "max_bytes": max_bytes},
        )

    try:
        with open(resolved_path, "rb") as handle:
            return handle.read()
    except OSError as exc:
        raise _unreadable(exc) from exc

495 

496 

if is_enabled("GCO_ENABLE_LOCAL_METRICS"):

    @mcp.tool(tags={"safe", "metrics"})
    @audit_logged
    async def metrics_from_local_file(
        path: str,
        field: str,
        format: str,
        aggregation: str = "last",
        max_bytes: int = _MAX_BYTES_DEFAULT,
        output_name: str | None = None,
    ) -> dict[str, Any]:
        """[gated by GCO_ENABLE_LOCAL_METRICS] [read-only] Read a named field from a LOCAL metrics file.

        Reads a small metrics file from a LOCAL filesystem path confined to the
        allowlisted root ``GCO_METRICS_LOCAL_ROOT`` and surfaces a named field
        as a metric in the canonical ``{"metrics": {...}}`` shape, dispatching
        on ``format``. Reuses the same format handlers, aggregation modes, size
        cap, and error model as ``metrics_from_shared_storage_file`` — the only
        difference is that the path is resolved and confined to the allowlisted
        root via realpath containment instead of going through the shared-storage
        path. There is **no** ``region`` parameter: local reads are not
        region-scoped. Sequence-bearing formats (a list, a CSV column, a Hugging
        Face ``log_history``, a JSONL stream, a Parquet column) are reduced to one
        Numeric_Value via the aggregation mode. The metric key defaults to
        ``field`` when ``output_name`` is omitted.

        Args:
            path: Local filesystem path to the metrics file (confined to
                ``GCO_METRICS_LOCAL_ROOT``).
            field: Field/column/key name to read (dot-path for document formats).
            format: One of json, csv, hf_trainer_state, jsonl, yaml, parquet,
                tfevents.
            aggregation: One of last, first, min, max, mean (default ``last``).
            max_bytes: Maximum artifact size in bytes (default 10 MiB); the size
                is checked before the content is read.
            output_name: Explicit metric key; defaults to ``field``.

        Returns the Canonical_Metrics_Shape on success, or a Tool_Error_Envelope
        (``local_root_not_configured``, ``path_traversal_escape``,
        ``symlink_escape``, ``unsupported_format``, ``metric_name_invalid``,
        ``file_not_found``, ``file_too_large``, ``malformed_file``,
        ``field_not_found``, ``non_numeric_value``, ``no_numeric_value``,
        ``empty_sequence``, ``format_dependency_unavailable``) on failure.
        """
        try:
            # Validate the cheap inputs first so a bad request never touches
            # the filesystem: format, then aggregation mode, then the key.
            if format not in files._HANDLERS:
                supported_formats = sorted(files._HANDLERS)
                raise MetricReaderError(
                    ErrorCode.UNSUPPORTED_FORMAT,
                    {"format": format, "supported": supported_formats},
                )

            if aggregation not in aggregate.VALID_MODES:
                valid_modes = sorted(aggregate.VALID_MODES)
                raise MetricReaderError(
                    ErrorCode.INVALID_AGGREGATION_MODE,
                    {"mode": aggregation, "valid_modes": valid_modes},
                )

            key = _resolve_key(output_name, field)

            # The allowlisted root is read from the environment here, at the
            # tool boundary, and passed explicitly to the confinement helper;
            # an unset variable is passed as the empty string. Confinement
            # failures surface as MetricReaderError from the helper, before
            # any file is read.
            local_root = os.environ.get("GCO_METRICS_LOCAL_ROOT", "")
            confined_path = localfs.resolve_within_root(path, local_root)

            # The stat + read is blocking, so it runs in a worker thread.
            # FILE_NOT_FOUND / FILE_TOO_LARGE surface from the helper.
            content = await asyncio.to_thread(
                _read_local_file, confined_path, path, max_bytes
            )

            # Same handler registry the shared-storage reader dispatches on.
            parse = files._HANDLERS[format]
            value = parse(content, field, aggregation)

            provenance = {
                "source": f"local_file:{path}",
                "format": format,
                "aggregation": aggregation,
            }
            return metrics_result(key, value, **provenance)
        except MetricReaderError as err:
            details = err.details or {}
            return error_envelope(err.code, **details)
        except Exception as exc:  # noqa: BLE001 - no exception may escape the tool boundary
            # Anything unexpected is reported under the local-file reader's
            # catch-all code instead of propagating out of the tool.
            return error_envelope(
                ErrorCode.FILE_NOT_FOUND,
                path=path,
                message=str(exc),
            )