Coverage for mcp/metric_readers/files.py: 68%
137 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Read a named field out of a metrics file and reduce it to one number.
A job can persist its metrics in many shapes: a hand-written JSON or YAML
document, a CSV table, a Hugging Face ``Trainer`` state file, a stream of
JSON-per-line step records, a columnar Parquet file, or a TensorBoard event
file. This module turns any of those into a single finite number a threshold
check can read.

The work is split into two layers:

* A small set of pure, per-format **handlers**. Each handler takes the raw file
  bytes, the caller's field name, and an aggregation mode, and returns one
  finite number — or raises :class:`~.shape.MetricReaderError` with a stable
  code describing why it could not. Handlers do no I/O of their own: the bytes
  are handed in already, so the same handler serves both the shared-storage
  reader and the local-filesystem reader.
* A :data:`_HANDLERS` dispatch map from a format name to its handler. The tool
  wrapper looks a format up here; a format with no entry is reported as
  unsupported rather than crashing.

Field resolution differs by format. The document formats (``json``, ``yaml``)
resolve the field as a dot-path walked segment-by-segment through nested
objects. The tabular and record formats (``csv``, ``jsonl``, ``parquet``, the
Hugging Face ``log_history``, TensorBoard ``tfevents``) treat the field as a
flat column, key, or tag name and gather one value per row, line, entry, or
event. A single resolved number is returned as-is; a gathered sequence is
collapsed with the chosen aggregation mode, which ignores any non-numeric
entries along the way.

Every parsing or decoding failure becomes a malformed-file error; a field that
cannot be located becomes a field-not-found error; a value that is present but
is not a real number becomes a non-numeric error. Nothing escapes as an
unhandled exception.
32"""
from __future__ import annotations

import csv
import io
import json
import math
import os
import tempfile
from collections.abc import Callable
from typing import Literal, cast

import yaml

from . import shape
from .aggregate import reduce_sequence
# Every file-format name the reader accepts. The document and record formats
# have handlers implemented directly in this module; ``parquet`` and
# ``tfevents`` are dispatched through the same map, and their handlers import
# the third-party parsers lazily.
ReaderFormat = Literal["json", "csv", "hf_trainer_state", "jsonl", "yaml", "parquet", "tfevents"]
63def _decode(content: bytes, fmt: str) -> str:
64 """Decode raw file bytes as UTF-8 text, or report a malformed file.
66 The text-oriented formats (``csv``, ``jsonl``) need to work over decoded
67 lines. Bytes that are not valid UTF-8 are surfaced as a malformed-file
68 error tagged with the format, rather than letting the decode error escape.
69 """
70 try:
71 return content.decode("utf-8")
72 except UnicodeDecodeError as exc:
73 raise shape.MetricReaderError(
74 shape.ErrorCode.MALFORMED_FILE,
75 {"format": fmt, "reason": "decode_error"},
76 ) from exc
79def _maybe_number(raw: object) -> object:
80 """Best-effort coerce a raw cell to a number, leaving non-numbers untouched.
82 Cells read from a CSV arrive as strings. A string that parses cleanly as an
83 integer or a float is returned as that number; anything else (an empty
84 cell, a label, ``None`` from a short row) is returned unchanged so the
85 downstream numeric filter can drop it. Non-string inputs pass straight
86 through.
87 """
88 if not isinstance(raw, str): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 return raw
90 text = raw.strip()
91 try:
92 return int(text)
93 except ValueError:
94 pass
95 try:
96 return float(text)
97 except ValueError:
98 return raw
101def _describe_value(value: object) -> dict[str, object]:
102 """Render a non-numeric value into JSON-safe error-detail fields.
104 Returns the offending value (kept verbatim when it is a simple scalar,
105 otherwise its ``repr``) alongside its type name, so an operator can see
106 both what was found and what kind of thing it was.
107 """
108 if value is None or isinstance(value, str | int | float | bool):
109 shown: object = value
110 else:
111 shown = repr(value)
112 return {"value": shown, "value_type": type(value).__name__}
115def _resolve_dot_path(obj: object, field: str) -> object:
116 """Walk a dot-separated path through nested objects and return the leaf.
118 Each segment of ``field`` indexes one level deeper into a mapping. A
119 segment that is missing, or a level that is not a mapping, means the field
120 is absent and raises a field-not-found error.
121 """
122 current: object = obj
123 for segment in field.split("."):
124 if isinstance(current, dict) and segment in current:
125 current = current[segment]
126 else:
127 raise shape.MetricReaderError(
128 shape.ErrorCode.FIELD_NOT_FOUND,
129 {"field": field},
130 )
131 return current
def _reduce_resolved(value: object, field: str, mode: str) -> float:
    """Collapse whatever a field resolved to into a single number.

    The numeric check runs first: a value ``shape.is_numeric_value`` accepts
    is returned unchanged. Otherwise a ``list`` is handed to
    ``reduce_sequence`` with the aggregation ``mode``. Any other value is
    present but unusable, and raises a non-numeric error whose details name
    the field and describe the offending value.
    """
    if shape.is_numeric_value(value):
        result = cast(float, value)
    elif isinstance(value, list):
        result = reduce_sequence(value, mode)
    else:
        raise shape.MetricReaderError(
            shape.ErrorCode.NON_NUMERIC_VALUE,
            {"field": field, **_describe_value(value)},
        )
    return result
def _handle_json(content: bytes, field: str, mode: str) -> float:
    """Read a field from a plain JSON document.

    The document is parsed, the field resolved by dot-path, and the leaf either
    returned directly (a single number) or reduced (a list).

    Args:
        content: Raw bytes of the JSON document.
        field: Dot-separated path to the value inside the document.
        mode: Aggregation mode applied when the leaf is a list.

    Returns:
        The resolved or reduced number.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes cannot be
            parsed as JSON; otherwise whatever field resolution or reduction
            raises.
    """
    try:
        parsed: object = json.loads(content)
    except (ValueError, RecursionError) as exc:
        # ValueError covers both JSONDecodeError and the UnicodeDecodeError
        # raised for undecodable bytes (each is a ValueError subclass).
        # RecursionError is what the decoder raises for pathologically deep
        # nesting; it too is a file the reader cannot parse, not a crash.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "json"},
        ) from exc
    return _reduce_resolved(_resolve_dot_path(parsed, field), field, mode)
def _handle_yaml(content: bytes, field: str, mode: str) -> float:
    """Read a field from a YAML document using the safe loader.

    Mirrors the JSON handler: dot-path resolution, then a single number
    returned directly or a list reduced.

    Args:
        content: Raw bytes of the YAML document.
        field: Dot-separated path to the value inside the document.
        mode: Aggregation mode applied when the leaf is a list.

    Returns:
        The resolved or reduced number.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the safe loader
            cannot load the document; otherwise whatever field resolution or
            reduction raises.
    """
    try:
        parsed: object = yaml.safe_load(content)
    except (yaml.YAMLError, ValueError, RecursionError) as exc:
        # Not every loader failure is a YAMLError: a scalar that looks like a
        # timestamp but is out of range (e.g. ``2001-13-45``) surfaces as a
        # plain ValueError from the datetime constructor, and a pathologically
        # nested document exhausts the recursion limit. Both are malformed
        # input rather than something to let escape.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "yaml"},
        ) from exc
    return _reduce_resolved(_resolve_dot_path(parsed, field), field, mode)
def _handle_csv(content: bytes, field: str, mode: str) -> float:
    """Read one column from a CSV table and reduce it.

    The first row is the header; ``field`` names one of its columns. Every data
    row's cell in that column is coerced toward a number and the resulting
    sequence is reduced with the aggregation mode.

    Args:
        content: Raw bytes of the CSV file.
        field: Exact name of the header column to read.
        mode: Aggregation mode used to reduce the column.

    Returns:
        The reduced value of the column.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes cannot be
            decoded or the ``csv`` parser rejects the table;
            ``FIELD_NOT_FOUND`` when the header is missing or lacks the
            column; otherwise whatever the reduction raises.
    """
    text = _decode(content, "csv")
    # newline="" hands line endings to the csv parser untouched, as the csv
    # module requires, so quoted embedded newlines and bare "\r" row
    # terminators are interpreted by the parser itself.
    reader = csv.DictReader(io.StringIO(text, newline=""))
    try:
        # ``fieldnames`` reads the header row lazily, so both the header
        # access and the row iteration can raise csv.Error.
        fieldnames = reader.fieldnames
        if not fieldnames or field not in fieldnames:
            raise shape.MetricReaderError(
                shape.ErrorCode.FIELD_NOT_FOUND,
                {"field": field},
            )
        # A row shorter than the header yields None for the missing cell;
        # _maybe_number passes it through for the reducer to handle.
        candidates: list[object] = [_maybe_number(row.get(field)) for row in reader]
    except csv.Error as exc:
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "csv"},
        ) from exc
    return reduce_sequence(candidates, mode)
def _handle_jsonl(content: bytes, field: str, mode: str) -> float:
    """Read a field across a stream of one-JSON-object-per-line records.

    Each non-blank line is parsed on its own; a line that is not valid JSON is
    skipped rather than failing the whole read. The named key is gathered from
    every object that carries it, and the gathered sequence is reduced.

    Args:
        content: Raw bytes of the JSON-lines stream.
        field: Flat key to gather from each record (not a dot-path).
        mode: Aggregation mode used to reduce the gathered values.

    Returns:
        The reduced value of the gathered sequence.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes cannot be
            decoded; ``NO_NUMERIC_VALUE`` when the reduction reports an empty
            sequence; any other reduction error is re-raised unchanged.
    """
    text = _decode(content, "jsonl")
    candidates: list[object] = []
    # Records are separated by "\n" only. ``str.splitlines`` would also break
    # on U+0085, U+2028 and U+2029, which may appear unescaped inside a JSON
    # string and would cut a valid record into two unparseable halves. A
    # trailing "\r" from CRLF input is removed by the strip below.
    for raw_line in text.split("\n"):
        stripped = raw_line.strip()
        if not stripped:
            continue
        try:
            obj: object = json.loads(stripped)
        except (ValueError, RecursionError):
            # Best-effort by design: an unparseable line (including one nested
            # too deeply for the decoder) is skipped, not fatal.
            continue
        if isinstance(obj, dict) and field in obj:
            candidates.append(obj[field])
    try:
        return reduce_sequence(candidates, mode)
    except shape.MetricReaderError as exc:
        # An empty-sequence failure from the reducer is reported under the
        # "no usable number" code for this format; other codes pass through.
        if exc.code == shape.ErrorCode.EMPTY_SEQUENCE:
            raise shape.MetricReaderError(
                shape.ErrorCode.NO_NUMERIC_VALUE,
                {"field": field},
            ) from exc
        raise
def _handle_hf(content: bytes, field: str, mode: str) -> float:
    """Read a scalar from a Hugging Face ``Trainer`` state file.

    When the document carries a ``log_history`` list, the named field is
    gathered from every per-step entry that includes it and the sequence is
    reduced — this is the path for per-step scalars such as ``loss`` or
    ``eval_loss``. When ``log_history`` is absent (or the field never appears
    in it), the field is looked up as a top-level key and returned directly, as
    in an ``all_results.json``.

    Args:
        content: Raw bytes of the JSON state file.
        field: Flat key to look for in ``log_history`` entries, then at the
            top level of the document.
        mode: Aggregation mode used to reduce the per-step values.

    Returns:
        The reduced per-step value, or the top-level number.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes cannot be
            parsed as JSON; ``NON_NUMERIC_VALUE`` when the top-level value is
            present but not numeric; ``FIELD_NOT_FOUND`` when the field is in
            neither place; otherwise whatever the reduction raises.
    """
    try:
        parsed: object = json.loads(content)
    except (ValueError, RecursionError) as exc:
        # ValueError covers JSONDecodeError and UnicodeDecodeError (both are
        # subclasses); RecursionError covers pathologically deep nesting.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "hf_trainer_state"},
        ) from exc

    log_history: object = parsed.get("log_history") if isinstance(parsed, dict) else None
    if isinstance(log_history, list):
        candidates: list[object] = [
            entry[field] for entry in log_history if isinstance(entry, dict) and field in entry
        ]
        # An empty gather is not an error yet: fall through to the top-level
        # lookup below.
        if candidates:
            return reduce_sequence(candidates, mode)

    if isinstance(parsed, dict) and field in parsed:
        value = parsed[field]
        if shape.is_numeric_value(value):
            return cast(float, value)
        raise shape.MetricReaderError(
            shape.ErrorCode.NON_NUMERIC_VALUE,
            {"field": field, **_describe_value(value)},
        )

    raise shape.MetricReaderError(
        shape.ErrorCode.FIELD_NOT_FOUND,
        {"field": field},
    )
def _handle_parquet(content: bytes, field: str, mode: str) -> float:
    """Collapse one column of a Parquet file into a single number.

    ``pandas`` and ``pyarrow`` are imported inside the handler rather than at
    module import time. If either import fails, the handler raises a
    :attr:`~.shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE` error instead of
    letting the ``ImportError`` propagate.

    With the libraries available, the bytes are read into a frame; any
    exception from that read is reported as a malformed-file error. A
    ``field`` that is not among the frame's columns is a field-not-found
    error. Otherwise the column is converted with ``Series.tolist()`` and the
    resulting list is reduced with the aggregation ``mode``.
    """
    try:
        import pandas as pd
        import pyarrow  # noqa: F401  # read_parquet's engine; imported so a missing wheel surfaces here
    except ImportError as exc:
        raise shape.MetricReaderError(
            shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE,
            {"format": "parquet", "dependency": "pandas+pyarrow"},
        ) from exc

    try:
        table = pd.read_parquet(io.BytesIO(content))
    except Exception as exc:  # noqa: BLE001 - any pandas/pyarrow read failure is a malformed file
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "parquet"},
        ) from exc

    if field in table.columns:
        cells: list[object] = table[field].tolist()
        return reduce_sequence(cells, mode)

    raise shape.MetricReaderError(
        shape.ErrorCode.FIELD_NOT_FOUND,
        {"field": field},
    )
def _handle_tfevents(content: bytes, field: str, mode: str) -> float:
    """Collapse the scalar series of one TensorBoard tag into a single number.

    ``tbparse`` is imported inside the handler rather than at module import
    time. If the import fails, the handler raises a
    :attr:`~.shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE` error instead of
    letting the ``ImportError`` propagate.

    The bytes are written to an event file inside a temporary directory, and
    ``SummaryReader`` is pointed at that path. Any exception while reading the
    scalars is reported as a malformed-file error. A scalar table that is
    missing, empty, lacks a ``tag`` column, or has no row whose tag equals
    ``field`` is a field-not-found error. Otherwise the matching rows'
    ``value`` column is reduced with the aggregation ``mode``.
    """
    try:
        from tbparse import SummaryReader
    except ImportError as exc:
        raise shape.MetricReaderError(
            shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE,
            {"format": "tfevents", "dependency": "tbparse/tensorboard"},
        ) from exc

    def _tag_missing() -> shape.MetricReaderError:
        # Both "no scalars at all" and "no rows for this tag" report the same
        # field-not-found error, built in one place.
        return shape.MetricReaderError(
            shape.ErrorCode.FIELD_NOT_FOUND,
            {"field": field},
        )

    with tempfile.TemporaryDirectory() as scratch_dir:
        staged_path = os.path.join(scratch_dir, "events.out.tfevents")
        with open(staged_path, "wb") as sink:
            sink.write(content)

        try:
            scalars = SummaryReader(staged_path, pivot=False).scalars
        except Exception as exc:  # noqa: BLE001 - any tbparse read failure is a malformed file
            raise shape.MetricReaderError(
                shape.ErrorCode.MALFORMED_FILE,
                {"format": "tfevents"},
            ) from exc

        # An object without an ``empty`` attribute is treated as empty.
        unusable = (
            scalars is None
            or getattr(scalars, "empty", True)
            or "tag" not in scalars.columns
        )
        if unusable:
            raise _tag_missing()

        tag_rows = scalars[scalars["tag"] == field]
        if tag_rows.empty:
            raise _tag_missing()

        series: list[object] = tag_rows["value"].tolist()
        return reduce_sequence(series, mode)
# Dispatch table: format name -> handler. Every handler follows the same
# ``(content_bytes, field, mode) -> float`` contract: it either returns one
# number or raises a MetricReaderError carrying a stable error code. A format
# name with no key here is treated as unsupported by the calling tool.
#
# The columnar (``parquet``) and TensorBoard (``tfevents``) handlers live in
# this same map. They import their third-party parsers lazily, inside the
# handler body, so registering them here pulls in no optional dependency.
_HANDLERS: dict[str, Callable[..., float]] = {
    "json": _handle_json,
    "csv": _handle_csv,
    "hf_trainer_state": _handle_hf,
    "jsonl": _handle_jsonl,
    "yaml": _handle_yaml,
    "parquet": _handle_parquet,
    "tfevents": _handle_tfevents,
}