Coverage for mcp/metric_readers/files.py: 68%

137 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Read a named field out of a metrics file and reduce it to one number. 

2 

3A job can persist its metrics in many shapes: a hand-written JSON or YAML 

4document, a CSV table, a Hugging Face ``Trainer`` state file, a stream of 

5JSON-per-line step records, or a columnar Parquet file. This module turns any 

6of those into a single finite number a threshold check can read. 

7 

8The work is split into two layers: 

9 

10* A small set of pure, per-format **handlers**. Each handler takes the raw file 

11 bytes, the caller's field name, and an aggregation mode, and returns one 

12 finite number — or raises :class:`~.shape.MetricReaderError` with a stable 

13 code describing why it could not. Handlers do no I/O of their own: the bytes 

14 are handed in already, so the same handler serves both the shared-storage 

15 reader and the local-filesystem reader. 

16* A :data:`_HANDLERS` dispatch map from a format name to its handler. The tool 

17 wrapper looks a format up here; a format with no entry is reported as 

18 unsupported rather than crashing. 

19 

20Field resolution differs by format. The document formats (``json``, ``yaml``) 

21resolve the field as a dot-path walked segment-by-segment through nested 

22objects. The tabular and record formats (``csv``, ``jsonl``, the Hugging Face 

23``log_history``) treat the field as a flat column or key name and gather one 

24value per row, line, or entry. A single resolved number is returned as-is; a 

25gathered sequence is collapsed with the chosen aggregation mode, which ignores 

26any non-numeric entries along the way. 

27 

28Every parsing or decoding failure becomes a malformed-file error; a field that 

29cannot be located becomes a field-not-found error; a value that is present but 

30is not a real number becomes a non-numeric error. Nothing escapes as an 

31unhandled exception. 

32""" 

33 

34from __future__ import annotations 

35 

36import csv 

37import io 

38import json 

39import os 

40import tempfile 

41from collections.abc import Callable 

42from typing import Literal, cast 

43 

44import yaml 

45 

46from . import shape 

47from .aggregate import reduce_sequence 

48 

# Names of every file format the reader understands. The first group is
# handled directly in this module; ``parquet`` and ``tfevents`` are dispatched
# through the same map, but their handlers import their parsers lazily.
ReaderFormat = Literal[
    "json", "csv", "hf_trainer_state", "jsonl", "yaml",  # document / record formats
    "parquet", "tfevents",  # columnar and TensorBoard formats
]

61 

62 

63def _decode(content: bytes, fmt: str) -> str: 

64 """Decode raw file bytes as UTF-8 text, or report a malformed file. 

65 

66 The text-oriented formats (``csv``, ``jsonl``) need to work over decoded 

67 lines. Bytes that are not valid UTF-8 are surfaced as a malformed-file 

68 error tagged with the format, rather than letting the decode error escape. 

69 """ 

70 try: 

71 return content.decode("utf-8") 

72 except UnicodeDecodeError as exc: 

73 raise shape.MetricReaderError( 

74 shape.ErrorCode.MALFORMED_FILE, 

75 {"format": fmt, "reason": "decode_error"}, 

76 ) from exc 

77 

78 

79def _maybe_number(raw: object) -> object: 

80 """Best-effort coerce a raw cell to a number, leaving non-numbers untouched. 

81 

82 Cells read from a CSV arrive as strings. A string that parses cleanly as an 

83 integer or a float is returned as that number; anything else (an empty 

84 cell, a label, ``None`` from a short row) is returned unchanged so the 

85 downstream numeric filter can drop it. Non-string inputs pass straight 

86 through. 

87 """ 

88 if not isinstance(raw, str): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 return raw 

90 text = raw.strip() 

91 try: 

92 return int(text) 

93 except ValueError: 

94 pass 

95 try: 

96 return float(text) 

97 except ValueError: 

98 return raw 

99 

100 

101def _describe_value(value: object) -> dict[str, object]: 

102 """Render a non-numeric value into JSON-safe error-detail fields. 

103 

104 Returns the offending value (kept verbatim when it is a simple scalar, 

105 otherwise its ``repr``) alongside its type name, so an operator can see 

106 both what was found and what kind of thing it was. 

107 """ 

108 if value is None or isinstance(value, str | int | float | bool): 

109 shown: object = value 

110 else: 

111 shown = repr(value) 

112 return {"value": shown, "value_type": type(value).__name__} 

113 

114 

115def _resolve_dot_path(obj: object, field: str) -> object: 

116 """Walk a dot-separated path through nested objects and return the leaf. 

117 

118 Each segment of ``field`` indexes one level deeper into a mapping. A 

119 segment that is missing, or a level that is not a mapping, means the field 

120 is absent and raises a field-not-found error. 

121 """ 

122 current: object = obj 

123 for segment in field.split("."): 

124 if isinstance(current, dict) and segment in current: 

125 current = current[segment] 

126 else: 

127 raise shape.MetricReaderError( 

128 shape.ErrorCode.FIELD_NOT_FOUND, 

129 {"field": field}, 

130 ) 

131 return current 

132 

133 

def _reduce_resolved(value: object, field: str, mode: str) -> float:
    """Collapse an already-resolved field value into a single number.

    Three outcomes, checked in order: a value the shared numeric guard accepts
    is returned untouched; a list is handed to the sequence reducer with the
    requested aggregation ``mode``; anything else was found but is not a
    number, and is reported as a non-numeric error that names the field and
    describes the offending value.
    """
    if shape.is_numeric_value(value):
        return cast(float, value)
    if not isinstance(value, list):
        details = {"field": field, **_describe_value(value)}
        raise shape.MetricReaderError(shape.ErrorCode.NON_NUMERIC_VALUE, details)
    return reduce_sequence(value, mode)

149 

150 

def _handle_json(content: bytes, field: str, mode: str) -> float:
    """Read a field from a plain JSON document.

    The document is parsed, the field resolved by dot-path, and the leaf either
    returned directly (a single number) or reduced (a list).

    Args:
        content: Raw bytes of the JSON document.
        field: Dot-separated path to the wanted value.
        mode: Aggregation mode applied when the leaf is a list.

    Returns:
        The resolved (or reduced) number.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes do not parse
            as JSON (including documents nested too deeply to parse); other
            codes propagate from field resolution and reduction.
    """
    try:
        parsed: object = json.loads(content)
    except (ValueError, RecursionError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError (each
        # is a subclass). RecursionError is what the decoder raises for
        # pathologically nested input; it is treated as a malformed file so it
        # cannot escape as an unhandled exception.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "json"},
        ) from exc
    return _reduce_resolved(_resolve_dot_path(parsed, field), field, mode)

166 

167 

def _handle_yaml(content: bytes, field: str, mode: str) -> float:
    """Read a field from a YAML document using the safe loader.

    Mirrors the JSON handler: dot-path resolution, then a single number
    returned directly or a list reduced.

    Args:
        content: Raw bytes of the YAML document.
        field: Dot-separated path to the wanted value.
        mode: Aggregation mode applied when the leaf is a list.

    Returns:
        The resolved (or reduced) number.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the safe loader cannot
            load the document; other codes propagate from field resolution and
            reduction.
    """
    try:
        parsed: object = yaml.safe_load(content)
    except (yaml.YAMLError, ValueError, RecursionError) as exc:
        # Not every load failure is a YAMLError: the safe loader's scalar
        # constructors can raise a plain ValueError (for example a
        # timestamp-shaped scalar with an impossible date), and very deep
        # nesting raises RecursionError. Both are malformed input here.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "yaml"},
        ) from exc
    return _reduce_resolved(_resolve_dot_path(parsed, field), field, mode)

183 

184 

def _handle_csv(content: bytes, field: str, mode: str) -> float:
    """Read one column from a CSV table and reduce it.

    The first row is the header; ``field`` names one of its columns. Every data
    row's cell in that column is coerced toward a number and the resulting
    sequence is reduced with the aggregation mode (non-numeric cells are
    ignored).

    Args:
        content: Raw bytes of the CSV file.
        field: Header name of the column to read.
        mode: Aggregation mode used to reduce the column.

    Returns:
        The reduced number for the column.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes are not
            valid UTF-8 or the CSV parser rejects the table;
            ``FIELD_NOT_FOUND`` when the header is missing or does not contain
            ``field``; other codes propagate from the reducer.
    """
    text = _decode(content, "csv")
    # newline="" hands line endings to the csv module untouched, as its
    # documentation requires, so quoted fields with embedded newlines and
    # "\r"-terminated rows are parsed correctly.
    reader = csv.DictReader(io.StringIO(text, newline=""))

    # Reading ``fieldnames`` consumes the header row, so it can already fail.
    try:
        fieldnames = reader.fieldnames
    except csv.Error as exc:
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "csv"},
        ) from exc
    if not fieldnames or field not in fieldnames:
        raise shape.MetricReaderError(
            shape.ErrorCode.FIELD_NOT_FOUND,
            {"field": field},
        )

    # ``row.get`` yields None for rows shorter than the header; _maybe_number
    # passes that through for the reducer to ignore.
    try:
        candidates: list[object] = [_maybe_number(row.get(field)) for row in reader]
    except csv.Error as exc:
        # e.g. a field larger than the parser's field-size limit.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "csv"},
        ) from exc
    return reduce_sequence(candidates, mode)

203 

204 

def _handle_jsonl(content: bytes, field: str, mode: str) -> float:
    """Read a field across a stream of one-JSON-object-per-line records.

    Each non-blank line is parsed on its own; a line that is not valid JSON is
    skipped rather than failing the whole read. The named key is gathered from
    every object that carries it, and the gathered sequence is reduced. When no
    line yields a usable number — whether the key was never present or never
    numeric — the result is a no-numeric-value error.

    Args:
        content: Raw bytes of the JSON-lines stream.
        field: Flat key to gather from each record.
        mode: Aggregation mode used to reduce the gathered values.

    Returns:
        The reduced number for the field.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes are not
            valid UTF-8; ``NO_NUMERIC_VALUE`` when the reducer reports an
            empty sequence; any other reducer error propagates unchanged.
    """
    text = _decode(content, "jsonl")
    candidates: list[object] = []
    # Records are separated by "\n" (or "\r\n" / a bare "\r"). str.splitlines()
    # is deliberately avoided: it also breaks on U+0085, U+2028 and U+2029,
    # which may appear unescaped inside a JSON string, so a valid record
    # containing one would be cut in half and silently dropped. Turning "\r"
    # into "\n" only adds blank lines, which are skipped below.
    for raw_line in text.replace("\r", "\n").split("\n"):
        stripped = raw_line.strip()
        if not stripped:
            continue
        try:
            obj: object = json.loads(stripped)
        except (ValueError, RecursionError):
            # Invalid JSON, or nesting too deep to parse: skip just this line.
            continue
        if isinstance(obj, dict) and field in obj:
            candidates.append(obj[field])
    try:
        return reduce_sequence(candidates, mode)
    except shape.MetricReaderError as exc:
        # A stream that carried the field nowhere collapses to the same
        # "no usable number" outcome as one where every value was non-numeric.
        if exc.code == shape.ErrorCode.EMPTY_SEQUENCE:
            raise shape.MetricReaderError(
                shape.ErrorCode.NO_NUMERIC_VALUE,
                {"field": field},
            ) from exc
        raise

237 

238 

def _handle_hf(content: bytes, field: str, mode: str) -> float:
    """Read a scalar from a Hugging Face ``Trainer`` state file.

    When the document carries a ``log_history`` list, the named field is
    gathered from every per-step entry that includes it and the sequence is
    reduced — this is the path for per-step scalars such as ``loss`` or
    ``eval_loss``. When ``log_history`` is absent (or the field never appears
    in it), the field is looked up as a top-level key and returned directly, as
    in an ``all_results.json``.

    Args:
        content: Raw bytes of the JSON state file.
        field: Flat key to gather from ``log_history`` entries, or to read
            from the top level of the document.
        mode: Aggregation mode used to reduce the per-step values.

    Returns:
        The reduced per-step value, or the top-level number.

    Raises:
        shape.MetricReaderError: ``MALFORMED_FILE`` when the bytes do not parse
            as JSON (including documents nested too deeply to parse);
            ``NON_NUMERIC_VALUE`` when the top-level value is present but not
            numeric; ``FIELD_NOT_FOUND`` when the field is in neither place;
            other codes propagate from the reducer.
    """
    try:
        parsed: object = json.loads(content)
    except (ValueError, RecursionError) as exc:
        # ValueError covers JSONDecodeError and UnicodeDecodeError (both are
        # subclasses). RecursionError is raised for pathologically nested
        # input and is reported the same way instead of escaping.
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "hf_trainer_state"},
        ) from exc

    log_history: object = parsed.get("log_history") if isinstance(parsed, dict) else None
    if isinstance(log_history, list):
        candidates: list[object] = [
            entry[field] for entry in log_history if isinstance(entry, dict) and field in entry
        ]
        # An empty gather falls through to the top-level lookup below.
        if candidates:
            return reduce_sequence(candidates, mode)

    if isinstance(parsed, dict) and field in parsed:
        value = parsed[field]
        if shape.is_numeric_value(value):
            return cast(float, value)
        raise shape.MetricReaderError(
            shape.ErrorCode.NON_NUMERIC_VALUE,
            {"field": field, **_describe_value(value)},
        )

    raise shape.MetricReaderError(
        shape.ErrorCode.FIELD_NOT_FOUND,
        {"field": field},
    )

279 

280 

def _handle_parquet(content: bytes, field: str, mode: str) -> float:
    """Collapse one column of a Parquet file into a single number.

    ``pandas`` and ``pyarrow`` are imported inside the handler rather than at
    module import time; if either import fails, the failure is reported as a
    :attr:`~.shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE` error instead of
    an ``ImportError``.

    The bytes are then read into a frame from an in-memory buffer. Any
    exception raised by that read is reported as a malformed-file error. If
    ``field`` is one of the frame's columns, its values are pulled out with
    ``Series.tolist()`` and reduced with the aggregation ``mode``; otherwise
    the result is a field-not-found error.
    """
    try:
        import pandas as pd
        import pyarrow  # noqa: F401  # read_parquet's engine; imported so a missing wheel surfaces here
    except ImportError as exc:
        raise shape.MetricReaderError(
            shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE,
            {"format": "parquet", "dependency": "pandas+pyarrow"},
        ) from exc

    try:
        table = pd.read_parquet(io.BytesIO(content))
    except Exception as exc:  # noqa: BLE001 - any pandas/pyarrow read failure is a malformed file
        raise shape.MetricReaderError(
            shape.ErrorCode.MALFORMED_FILE,
            {"format": "parquet"},
        ) from exc

    if field in table.columns:
        values: list[object] = table[field].tolist()
        return reduce_sequence(values, mode)

    raise shape.MetricReaderError(
        shape.ErrorCode.FIELD_NOT_FOUND,
        {"field": field},
    )

323 

324 

def _handle_tfevents(content: bytes, field: str, mode: str) -> float:
    """Collapse the scalar series of one TensorBoard tag into a single number.

    The ``tbparse`` reader is imported inside the handler; if that import
    fails, the failure is reported as a
    :attr:`~.shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE` error instead of
    an ``ImportError``.

    The handed-in bytes are written to an event file inside a temporary
    directory, because the reader is given a path rather than a buffer. Any
    exception raised while reading the scalars is reported as a malformed-file
    error. The scalar rows whose ``tag`` equals ``field`` are then selected
    and their ``value`` column is reduced with the aggregation ``mode``. A
    missing or empty scalar table, a table without a ``tag`` column, or a tag
    with no rows is a field-not-found error.
    """
    try:
        from tbparse import SummaryReader
    except ImportError as exc:
        raise shape.MetricReaderError(
            shape.ErrorCode.FORMAT_DEPENDENCY_UNAVAILABLE,
            {"format": "tfevents", "dependency": "tbparse/tensorboard"},
        ) from exc

    # The scalar table is fully read before the temporary directory is removed.
    with tempfile.TemporaryDirectory() as staging_dir:
        staged_path = os.path.join(staging_dir, "events.out.tfevents")
        with open(staged_path, "wb") as sink:
            sink.write(content)
        try:
            scalars = SummaryReader(staged_path, pivot=False).scalars
        except Exception as exc:  # noqa: BLE001 - any tbparse read failure is a malformed file
            raise shape.MetricReaderError(
                shape.ErrorCode.MALFORMED_FILE,
                {"format": "tfevents"},
            ) from exc

    usable = (
        scalars is not None
        and not getattr(scalars, "empty", True)
        and "tag" in scalars.columns
    )
    rows = scalars[scalars["tag"] == field] if usable else None
    if rows is None or rows.empty:
        raise shape.MetricReaderError(
            shape.ErrorCode.FIELD_NOT_FOUND,
            {"field": field},
        )
    series: list[object] = rows["value"].tolist()
    return reduce_sequence(series, mode)

377 

378 

# Dispatch table: format name -> handler. Every handler has the shape
# ``(content_bytes, field, mode) -> float`` and either returns one number or
# raises a MetricReaderError with a stable code. The calling tool treats a
# format name that is not a key here as unsupported.
#
# ``parquet`` and ``tfevents`` sit in the same table as the baseline formats;
# their handlers import their third-party parsers lazily, so registering them
# here imports nothing extra.
_HANDLERS: dict[str, Callable[..., float]] = {
    # Baseline document and record formats.
    "json": _handle_json,
    "csv": _handle_csv,
    "hf_trainer_state": _handle_hf,
    "jsonl": _handle_jsonl,
    "yaml": _handle_yaml,
    # Formats backed by optional dependencies.
    "parquet": _handle_parquet,
    "tfevents": _handle_tfevents,
}