Coverage for mcp/mission/predicate.py: 92%

203 statements  

coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Restricted AST evaluator for ``Criterion(kind="predicate")`` expressions. 

2 

3A Mission criterion of kind ``predicate`` carries a small Python expression 

4that runs against an ``Observation`` dict. Operator-supplied source must be 

5treated as untrusted: the same JSON that carries it travels across MCP, the 

6CLI, and disk. We parse the expression once at session start (so the 

7operator sees errors immediately, not on iteration N), validate it against 

8a tight allowlist, and cache the AST on the criterion so every later 

9evaluation reuses it without reparsing. 

10 

11The sandbox has two layers: 

12 

131. **Parse-time validation.** :func:`parse_predicate` parses the source in 

14 ``eval`` mode and walks the tree with :class:`_PredicateValidator`. The 

15 first disallowed construct raises :class:`PredicateRejected` and the 

16 evaluator is never reached. 

172. **Eval-time isolation.** :func:`evaluate_predicate` compiles the 

18 already-validated AST and calls :func:`eval` with an empty 

19 ``__builtins__`` plus an explicit safe-callable namespace. With 

20 ``__builtins__`` cleared, even a tree that slipped past the validator 

21 could not look up ``__import__``, ``open``, ``compile``, etc. 

22 

23Allowed surface 

24--------------- 

25Names: ``obs`` (the dict argument), and the read-only callables ``len``, 

26``min``, ``max``, ``sum``, ``abs``, ``any``, ``all``, ``sorted``, plus 

27the four type coercions ``str``, ``int``, ``float``, ``bool``. 

28 

29Operators: arithmetic (``+ - * / // % ** @``), unary (``+ - not ~``), 

30comparisons (``<``, ``<=``, ``>``, ``>=``, ``==``, ``!=``, ``is``, ``is not``, ``in``, ``not in``), boolean 

31(``and or``), and the ternary ``a if b else c``. 

32 

33Containers and collections: ``List``, ``Tuple``, ``Dict``, ``Set``, plus 

34``ListComp``, ``SetComp``, ``DictComp``, ``GeneratorExp`` (their iteration 

35targets must not shadow a name from the allowlist). 

36 

37Calls: bare-name calls to one of the twelve stdlib callables above, OR 

38read-only method calls — ``.get(key[, default])``, ``.keys()``, 

39``.values()``, ``.items()``, ``.lower()``, ``.upper()``, ``.strip()``. 

40Method calls accept any receiver the predicate could otherwise 

41produce: ``Name`` (``obs``), ``Subscript`` 

42(``obs['tool_results']``), or comprehension-bound names 

43(``r.get('_status')`` inside ``for r in obs['tool_results']``). Method 

44calls outside the allowlist (``.update``, ``.pop``, ``.count``, 

45``.append``, ``.startswith``, ...) are rejected with 

46``call_target_method_not_allowed``. 

47 

48Attribute access: only ``obs.<attr>`` (one level), and the attribute 

49name itself must not start with ``__``. Anything more elaborate 

50(chained attribute walks, attributes on calls or subscripts) is 

51rejected — predicates that need nested data should use subscripting. 

52 

53Subscripts: any ``value[...]`` chain whose ultimate base is a visible name 

54(allowlisted or comprehension-bound). No dedicated check is needed: every 

55nested ``Name`` is validated as the chain is walked. 

56 

57f-strings: ``JoinedStr`` and ``FormattedValue`` recurse normally so any 

58embedded name lookup re-enters this same allowlist check. 

59 

60Rejected outright 

61----------------- 

62``Import`` / ``ImportFrom`` (also unreachable in ``eval`` mode), ``Lambda`` 

63(it would let a predicate ship hidden code), the walrus ``NamedExpr``, 

64``Yield`` / ``YieldFrom`` / ``Await`` and other async constructs, any 

65identifier or string constant that starts with ``__``, and every 

66``Name``/``Attribute``/``Call`` whose target is not on the allowlist. 

67""" 

68 

69from __future__ import annotations 

70 

71import ast 

72from typing import Any, Final, NoReturn 

73 

74# --------------------------------------------------------------------------- 

75# Allowlists 

76# --------------------------------------------------------------------------- 

77 

78_ALLOWED_CALLABLES: Final[frozenset[str]] = frozenset( 

79 {"len", "min", "max", "sum", "abs", "any", "all", "sorted", "str", "int", "float", "bool"} 

80) 

81"""Builtin callables a predicate may invoke. Pure, side-effect-free. 

82 

83The eight stdlib aggregate / comparison helpers (``len``, ``min``, 

84``max``, ``sum``, ``abs``, ``any``, ``all``, ``sorted``) plus four 

85type coercions (``str``, ``int``, ``float``, ``bool``). The 

86coercions are useful for normalising values before comparison — 

87``str(r.get('count')) == '0'`` and ``bool(obs['errors'])`` are 

88common idioms — and none of them can escape the eval-time sandbox 

89(empty ``__builtins__``, no ``__import__`` / ``open`` / ``getattr`` 

90in scope) regardless of input. 

91""" 

92 

93_ALLOWED_METHOD_CALLS: Final[frozenset[str]] = frozenset( 

94 {"get", "keys", "values", "items", "lower", "upper", "strip"} 

95) 

96"""Read-only methods a predicate may invoke on any value. 

97 

98Models trained on Python idioms gravitate toward ``r.get('_status')``, 

99``r.items()``, and ``str(...).lower()`` for case-insensitive substring 

100search. The seven methods listed here are all pure read-only 

101accessors / transformations: 

102 

103* ``dict.get(key[, default])`` returns the value at ``key`` (or 

104 ``default``); identical to subscript except it tolerates missing 

105 keys without raising. 

106* ``dict.keys()`` / ``dict.values()`` / ``dict.items()`` return views 

107 that the comprehension protocol then iterates. 

108* ``str.lower()`` / ``str.upper()`` return a new string with 

109 case-folded contents; common in case-insensitive substring 

110 search like ``'foo' in str(x).lower()``. 

111* ``str.strip()`` returns a new string with leading and trailing 

112 whitespace removed; common in normalising values before 

113 comparison. 

114 

115None of the seven can mutate state, escape ``__builtins__``, or reach a 

116callable that we did not already opt into through the eval-time 

117sandbox (``__builtins__`` is empty; ``eval`` / ``compile`` / 

118``__import__`` / ``getattr`` / ``setattr`` / ``open`` are all 

119unreachable). Allowing them lets the model write the natural 

120expression ``any('inference' in str(r).lower() for r in obs['tool_results'])`` 

121instead of being forced into the more verbose subscript-only equivalent 

122that the model rarely produces unprompted. 

123 

124Method-call gating still applies in two places: 

125 

1261. The attribute *name* must be in this set. ``r.update(...)``, 

127 ``r.pop(...)``, ``r.setdefault(...)``, etc. raise 

128 ``call_target_method_not_allowed`` even though they would otherwise 

129 parse as ``Attribute -> Call``. 

1302. The receiver is validated through the normal visit chain, so it 

131 must itself be built from the allowed surface (``Name``, ``Subscript``, 

132 comprehension targets, literals). ``[1, 2].count(1)`` parses as 

133 ``Attribute -> Call`` but is rejected in ``visit_Call`` because 

134 ``count`` is not in this set, not because of its receiver. See 

135 ``visit_Call`` for how receivers are validated. 

136""" 

137 

138_ALLOWED_DATA_NAMES: Final[frozenset[str]] = frozenset({"obs"}) 

139"""Top-level data names the predicate may read.""" 

140 

141_ALLOWED_NAMES: Final[frozenset[str]] = _ALLOWED_DATA_NAMES | _ALLOWED_CALLABLES 

142"""Every globally-allowed identifier the predicate may reference.""" 

143 

144_ALLOWED_BIN_OPS: Final[tuple[type[ast.operator], ...]] = ( 

145 ast.Add, 

146 ast.Sub, 

147 ast.Mult, 

148 ast.Div, 

149 ast.FloorDiv, 

150 ast.Mod, 

151 ast.Pow, 

152 ast.MatMult, 

153) 

154 

155_ALLOWED_UNARY_OPS: Final[tuple[type[ast.unaryop], ...]] = ( 

156 ast.UAdd, 

157 ast.USub, 

158 ast.Not, 

159 ast.Invert, 

160) 

161 

162_ALLOWED_COMPARE_OPS: Final[tuple[type[ast.cmpop], ...]] = ( 

163 ast.Eq, 

164 ast.NotEq, 

165 ast.Lt, 

166 ast.LtE, 

167 ast.Gt, 

168 ast.GtE, 

169 ast.Is, 

170 ast.IsNot, 

171 ast.In, 

172 ast.NotIn, 

173) 

174 

175_ALLOWED_BOOL_OPS: Final[tuple[type[ast.boolop], ...]] = (ast.And, ast.Or) 

176 

177 

178# --------------------------------------------------------------------------- 

179# Exception 

180# --------------------------------------------------------------------------- 

181 

182 

183class PredicateRejected(Exception): 

184 """Raised when a predicate source contains a disallowed construct. 

185 

186 The :attr:`reason` field is a short stable token (e.g. 

187 ``"forbidden_node"``) so callers can render structured errors. The 

188 :attr:`failing_node` field is the ``ast`` node that triggered the 

189 rejection; it is ``None`` when the source is not a ``str`` or fails to parse. 

190 """ 

191 

192 def __init__( 

193 self, 

194 reason: str, 

195 *, 

196 failing_node: ast.AST | None = None, 

197 message: str | None = None, 

198 ) -> None: 

199 self.reason: str = reason 

200 self.failing_node: ast.AST | None = failing_node 

201 self.lineno: int | None = ( 

202 getattr(failing_node, "lineno", None) if failing_node is not None else None 

203 ) 

204 self.col_offset: int | None = ( 

205 getattr(failing_node, "col_offset", None) if failing_node is not None else None 

206 ) 

207 rendered = message if message is not None else reason 

208 if self.lineno is not None: 

209 rendered = f"{rendered} (line {self.lineno}, col {self.col_offset})" 

210 super().__init__(rendered) 

211 

212 

213# --------------------------------------------------------------------------- 

214# Validator 

215# --------------------------------------------------------------------------- 

216 

217 

218class _PredicateValidator(ast.NodeVisitor): 

219 """Walk a predicate AST and reject any construct outside the allowlist. 

220 

221 The validator tracks per-scope local names introduced by comprehensions 

222 so a tight expression like ``all(x > 0 for x in obs["xs"])`` works 

223 while the comprehension target ``x`` cannot shadow ``obs`` or any of 

224 the allowed callables. 

225 """ 

226 

227 def __init__(self) -> None: 

228 # Stack of frozensets of locally-bound names. The base scope is 

229 # empty; comprehensions push a frame containing their targets. 

230 self._scopes: list[frozenset[str]] = [frozenset()] 

231 

232 # ---- helpers ------------------------------------------------------- 

233 

234 def _current_locals(self) -> frozenset[str]: 

235 return self._scopes[-1] 

236 

237 def _name_is_visible(self, name: str) -> bool: 

238 return name in _ALLOWED_NAMES or name in self._current_locals() 

239 

240 @staticmethod 

241 def _is_dunder(name: str) -> bool: 

242 return name.startswith("__") 

243 

244 @staticmethod 

245 def _reject(reason: str, node: ast.AST, message: str | None = None) -> NoReturn: 

246 raise PredicateRejected(reason, failing_node=node, message=message) 

247 

248 def _push_scope(self, locals_: frozenset[str]) -> None: 

249 self._scopes.append(self._current_locals() | locals_) 

250 

251 def _pop_scope(self) -> None: 

252 self._scopes.pop() 

253 

254 def _collect_target_names(self, target: ast.AST) -> list[ast.Name]: 

255 """Flatten a comprehension/assignment target into Name nodes. 

256 

257 Tuples and lists nest (``for (a, b) in pairs``); Starred wraps 

258 (``for *xs, last in rows``). Anything else under a target is 

259 rejected here with ``invalid_comprehension_target``. 

260 """ 

261 if isinstance(target, ast.Name): 

262 return [target] 

263 if isinstance(target, (ast.Tuple, ast.List)): 

264 collected: list[ast.Name] = [] 

265 for elt in target.elts: 

266 collected.extend(self._collect_target_names(elt)) 

267 return collected 

268 if isinstance(target, ast.Starred):  # 268 ↛ 271: didn't jump to line 271 because the condition on line 268 was always true

269 return self._collect_target_names(target.value) 

270 # Anything else (Subscript, Attribute, ...) as a target is invalid. 

271 self._reject( 

272 "invalid_comprehension_target", 

273 target, 

274 "comprehension target must be a plain identifier", 

275 ) 

276 return [] # unreachable; _reject raises 

277 

278 # ---- top-level entry ---------------------------------------------- 

279 

280 def visit_Expression(self, node: ast.Expression) -> None: 

281 # ast.parse(..., mode="eval") guarantees a single Expression root; 

282 # walk its body. 

283 self.visit(node.body) 

284 

285 # ---- catch-all ----------------------------------------------------- 

286 

287 def generic_visit(self, node: ast.AST) -> None: 

288 # Default rejection: every node type we accept has a dedicated 

289 # ``visit_*`` method below. If we reach generic_visit it means the 

290 # source contained something we did not explicitly opt into 

291 # (Lambda, NamedExpr, Yield, async constructs, FunctionDef, etc.). 

292 self._reject( 

293 "forbidden_node", 

294 node, 

295 f"{type(node).__name__} is not allowed in a predicate", 

296 ) 

297 

298 # ---- leaves -------------------------------------------------------- 

299 

300 def visit_Constant(self, node: ast.Constant) -> None: 

301 # Reject dunder strings even when used as plain data. We never 

302 # need them in a numeric/boolean/string literal, and forbidding 

303 # them closes off the most common escape patterns 

304 # (``getattr(x, "__class__")``, ``obs["__import__"]``, etc.) even 

305 # if a future change accidentally widens the allowlist. 

306 if isinstance(node.value, str) and self._is_dunder(node.value): 

307 self._reject( 

308 "dunder_string", 

309 node, 

310 "string constants starting with '__' are not allowed", 

311 ) 

312 # Other constants (int, float, bool, None, bytes, complex, str) 

313 # are inert. 

314 

315 def visit_Name(self, node: ast.Name) -> None: 

316 if self._is_dunder(node.id): 

317 self._reject( 

318 "dunder_name", 

319 node, 

320 f"identifier '{node.id}' starts with '__'", 

321 ) 

322 if not self._name_is_visible(node.id): 

323 self._reject( 

324 "name_not_allowed", 

325 node, 

326 f"name '{node.id}' is not in the predicate allowlist", 

327 ) 

328 

329 # ---- containers ---------------------------------------------------- 

330 

331 def visit_List(self, node: ast.List) -> None: 

332 for elt in node.elts: 

333 self.visit(elt) 

334 

335 def visit_Tuple(self, node: ast.Tuple) -> None: 

336 for elt in node.elts: 

337 self.visit(elt) 

338 

339 def visit_Set(self, node: ast.Set) -> None: 

340 for elt in node.elts: 

341 self.visit(elt) 

342 

343 def visit_Dict(self, node: ast.Dict) -> None: 

344 for key in node.keys: 

345 if key is not None: 

346 self.visit(key) 

347 else: 

348 # ``{**other}`` unpacking would let an attacker splat 

349 # arbitrary mappings; reject to keep the surface tight. 

350 self._reject( 

351 "dict_unpacking", 

352 node, 

353 "dict unpacking is not allowed in a predicate", 

354 ) 

355 for value in node.values: 

356 self.visit(value) 

357 

358 def visit_Starred(self, node: ast.Starred) -> None: 

359 # ``[*xs]`` / ``f(*xs)`` — recurse into the inner expression so 

360 # the nested Name still hits the allowlist check. 

361 self.visit(node.value) 

362 

363 # ---- operators ----------------------------------------------------- 

364 

365 def visit_BinOp(self, node: ast.BinOp) -> None: 

366 if not isinstance(node.op, _ALLOWED_BIN_OPS):  # 366 ↛ 367: didn't jump to line 367 because the condition on line 366 was never true

367 self._reject( 

368 "binop_not_allowed", 

369 node, 

370 f"binary operator {type(node.op).__name__} is not allowed", 

371 ) 

372 self.visit(node.left) 

373 self.visit(node.right) 

374 

375 def visit_UnaryOp(self, node: ast.UnaryOp) -> None: 

376 if not isinstance(node.op, _ALLOWED_UNARY_OPS):  # 376 ↛ 377: didn't jump to line 377 because the condition on line 376 was never true

377 self._reject( 

378 "unaryop_not_allowed", 

379 node, 

380 f"unary operator {type(node.op).__name__} is not allowed", 

381 ) 

382 self.visit(node.operand) 

383 

384 def visit_BoolOp(self, node: ast.BoolOp) -> None: 

385 if not isinstance(node.op, _ALLOWED_BOOL_OPS):  # 385 ↛ 386: didn't jump to line 386 because the condition on line 385 was never true

386 self._reject( 

387 "boolop_not_allowed", 

388 node, 

389 f"bool operator {type(node.op).__name__} is not allowed", 

390 ) 

391 for value in node.values: 

392 self.visit(value) 

393 

394 def visit_Compare(self, node: ast.Compare) -> None: 

395 for op in node.ops: 

396 if not isinstance(op, _ALLOWED_COMPARE_OPS):  # 396 ↛ 397: didn't jump to line 397 because the condition on line 396 was never true

397 self._reject( 

398 "compareop_not_allowed", 

399 node, 

400 f"comparison operator {type(op).__name__} is not allowed", 

401 ) 

402 self.visit(node.left) 

403 for comparator in node.comparators: 

404 self.visit(comparator) 

405 

406 def visit_IfExp(self, node: ast.IfExp) -> None: 

407 self.visit(node.test) 

408 self.visit(node.body) 

409 self.visit(node.orelse) 

410 

411 # ---- attribute and subscript -------------------------------------- 

412 

413 def visit_Attribute(self, node: ast.Attribute) -> None: 

414 # Three shapes are allowed: 

415 # 

416 # 1. ``obs.<attr>`` — single-level read off the data dict. 

417 # 2. ``<inner>.<method>`` *only when* visited from 

418 # ``visit_Call`` and ``<method>`` is in 

419 # ``_ALLOWED_METHOD_CALLS``. ``visit_Call`` handles that 

420 # case by validating the inner expression itself rather 

421 # than recursing into ``visit_Attribute``, so by the time a 

422 # bare ``Attribute`` lands here we know it is *not* the 

423 # receiver of an allowed method call. 

424 # 3. Nothing else: chained walks (``obs.a.b``), attributes on 

425 # calls, and attributes on subscripts are all rejected. 

426 if self._is_dunder(node.attr): 

427 self._reject( 

428 "dunder_attribute", 

429 node, 

430 f"attribute '{node.attr}' starts with '__'", 

431 ) 

432 if not (isinstance(node.value, ast.Name) and node.value.id in _ALLOWED_DATA_NAMES):  # 432 ↛ 441: didn't jump to line 441 because the condition on line 432 was always true

433 self._reject( 

434 "attribute_target_not_allowed", 

435 node, 

436 "attribute access is only allowed on 'obs' " 

437 "(or as a read-only method call on a dict/list)", 

438 ) 

439 # The base Name is in _ALLOWED_DATA_NAMES, so we know it passes 

440 # the visit_Name check; visit it anyway to stay regular. 

441 self.visit(node.value) 

442 

443 def visit_Subscript(self, node: ast.Subscript) -> None: 

444 # No special restriction beyond "the base Name must be on the 

445 # allowlist", which falls out of recursing into ``node.value``. 

446 # ``node.slice`` may itself contain Names and Calls; recurse so 

447 # they hit the same allowlist gate. 

448 self.visit(node.value) 

449 self.visit(node.slice) 

450 

451 def visit_Slice(self, node: ast.Slice) -> None: 

452 if node.lower is not None:  # 452 ↛ 454: didn't jump to line 454 because the condition on line 452 was always true

453 self.visit(node.lower) 

454 if node.upper is not None:  # 454 ↛ 456: didn't jump to line 456 because the condition on line 454 was always true

455 self.visit(node.upper) 

456 if node.step is not None:  # 456 ↛ exit: didn't return from function 'visit_Slice' because the condition on line 456 was always true

457 self.visit(node.step) 

458 

459 # ---- calls --------------------------------------------------------- 

460 

461 def visit_Call(self, node: ast.Call) -> None: 

462 # Two callable shapes are allowed: 

463 # 

464 # 1. Bare-name calls to one of ``_ALLOWED_CALLABLES`` — 

465 # ``len(x)``, ``any(...)``, ``sorted(xs)``. The validator 

466 # enforces the name appears on the allowlist. 

467 # 2. Method calls of the form ``<expr>.<method>(...)`` where 

468 ``<method>`` is in ``_ALLOWED_METHOD_CALLS`` (the seven 

469 pure read-only accessors). The receiver expression 

470 # is validated through the normal visit chain so a method 

471 # call on something the predicate cannot otherwise see 

472 # (e.g. ``getattr(x, 'y').get(...)``) is rejected at the 

473 # receiver-validation step before the method allowlist is 

474 # even consulted. 

475 # 

476 # Anything else — subscript-then-call (``builtins["eval"]()``), 

477 # call-then-call (``factory()()``), method calls to non- 

478 # allowlisted attribute names — is rejected. 

479 if isinstance(node.func, ast.Attribute): 

480 if self._is_dunder(node.func.attr): 

481 self._reject( 

482 "dunder_attribute", 

483 node.func, 

484 f"attribute '{node.func.attr}' starts with '__'", 

485 ) 

486 if node.func.attr not in _ALLOWED_METHOD_CALLS: 

487 self._reject( 

488 "call_target_method_not_allowed", 

489 node, 

490 f"method '.{node.func.attr}()' is not allowed; " 

491 f"the read-only method allowlist is " 

492 f"{sorted(_ALLOWED_METHOD_CALLS)}", 

493 ) 

494 # Validate the receiver itself. Recursing here (rather 

495 # than into ``visit_Attribute``) bypasses the 

496 # ``visit_Attribute`` rule that only ``obs.<attr>`` 

497 # is allowed — but only because the *method name* is on 

498 # the explicit pure-accessor allowlist above. Any other 

499 # attribute name still falls through ``visit_Attribute``'s 

500 # tighter rules. 

501 self.visit(node.func.value) 

502 elif isinstance(node.func, ast.Name): 

503 if node.func.id not in _ALLOWED_CALLABLES: 

504 self._reject( 

505 "call_target_not_allowed", 

506 node, 

507 f"call to '{node.func.id}' is not allowed", 

508 ) 

509 else: 

510 # Subscript-then-call, call-then-call, etc. — reject. 

511 self._reject( 

512 "call_target_not_name", 

513 node, 

514 "predicate calls must target a bare callable name or a read-only dict/list method", 

515 ) 

516 for arg in node.args: 

517 self.visit(arg) 

518 for kw in node.keywords:  # 518 ↛ 521: didn't jump to line 521 because the loop on line 518 never started

519 # ``**kwargs`` shows up as a keyword with arg=None; allow the 

520 # value but recurse so its content is still validated. 

521 self.visit(kw.value) 

522 

523 # ---- f-strings ----------------------------------------------------- 

524 

525 def visit_JoinedStr(self, node: ast.JoinedStr) -> None: 

526 for value in node.values: 

527 self.visit(value) 

528 

529 def visit_FormattedValue(self, node: ast.FormattedValue) -> None: 

530 self.visit(node.value) 

531 if node.format_spec is not None:  # 531 ↛ 532: didn't jump to line 532 because the condition on line 531 was never true

532 self.visit(node.format_spec) 

533 

534 # ---- comprehensions ----------------------------------------------- 

535 

536 def _validate_comprehensions(self, generators: list[ast.comprehension]) -> frozenset[str]: 

537 """Walk comprehension generators and return their target names. 

538 

539 Each generator's ``iter`` is validated before that generator's 

540 targets are bound (it cannot reference its own targets). The targets 

541 are then accumulated and pushed as a scope while that generator's 

542 ``ifs`` are validated; the caller binds the returned set around the element. 

543 """ 

544 accumulated: set[str] = set() 

545 for gen in generators: 

546 if gen.is_async: 

547 self._reject( 

548 "async_comprehension", 

549 gen.iter, 

550 "async comprehensions are not allowed", 

551 ) 

552 # Validate the iterable in the scope visible *before* this 

553 # generator's targets are bound. 

554 self.visit(gen.iter) 

555 target_names = self._collect_target_names(gen.target) 

556 for name_node in target_names: 

557 if self._is_dunder(name_node.id):  # 557 ↛ 558: didn't jump to line 558 because the condition on line 557 was never true

558 self._reject( 

559 "dunder_comprehension_target", 

560 name_node, 

561 f"comprehension target '{name_node.id}' starts with '__'", 

562 ) 

563 if name_node.id in _ALLOWED_NAMES: 

564 self._reject( 

565 "comprehension_target_shadows_allowlist", 

566 name_node, 

567 f"comprehension target '{name_node.id}' shadows an allowlisted name", 

568 ) 

569 accumulated.add(name_node.id) 

570 # Subsequent ``ifs`` and any later generator may reference 

571 # these targets; push them now. 

572 self._push_scope(frozenset(accumulated)) 

573 try: 

574 for if_clause in gen.ifs: 

575 self.visit(if_clause) 

576 finally: 

577 self._pop_scope() 

578 return frozenset(accumulated) 

579 

580 def _visit_comprehension_like( 

581 self, 

582 node: ast.ListComp | ast.SetComp | ast.GeneratorExp, 

583 ) -> None: 

584 locals_ = self._validate_comprehensions(node.generators) 

585 self._push_scope(locals_) 

586 try: 

587 self.visit(node.elt) 

588 finally: 

589 self._pop_scope() 

590 

591 def visit_ListComp(self, node: ast.ListComp) -> None: 

592 self._visit_comprehension_like(node) 

593 

594 def visit_SetComp(self, node: ast.SetComp) -> None: 

595 self._visit_comprehension_like(node) 

596 

597 def visit_GeneratorExp(self, node: ast.GeneratorExp) -> None: 

598 self._visit_comprehension_like(node) 

599 

600 def visit_DictComp(self, node: ast.DictComp) -> None: 

601 locals_ = self._validate_comprehensions(node.generators) 

602 self._push_scope(locals_) 

603 try: 

604 self.visit(node.key) 

605 self.visit(node.value) 

606 finally: 

607 self._pop_scope() 
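
The validator's central design choice is default-deny: `generic_visit` rejects, and only node types with a dedicated `visit_*` method get through. The sketch below is a much smaller, hypothetical visitor (`DenyByDefault`, `Rejected`, and `is_accepted` are illustrative names, and its allowlist is far narrower than the one above) that shows the same pattern. It assumes Python 3.9 or later, where a subscript's slice is the expression node itself.

```python
import ast


class Rejected(Exception):
    """Hypothetical stand-in for the module's rejection error."""


class DenyByDefault(ast.NodeVisitor):
    ALLOWED_NAMES = frozenset({"obs", "len"})

    def generic_visit(self, node: ast.AST) -> None:
        # Any node type without a visit_* method below lands here.
        raise Rejected(f"{type(node).__name__} is not allowed")

    def visit_Expression(self, node: ast.Expression) -> None:
        self.visit(node.body)

    def visit_Constant(self, node: ast.Constant) -> None:
        pass  # literals are inert in this sketch

    def visit_Name(self, node: ast.Name) -> None:
        if node.id not in self.ALLOWED_NAMES:
            raise Rejected(f"name '{node.id}' is not allowed")

    def visit_Subscript(self, node: ast.Subscript) -> None:
        self.visit(node.value)
        self.visit(node.slice)

    def visit_Compare(self, node: ast.Compare) -> None:
        self.visit(node.left)
        for comparator in node.comparators:
            self.visit(comparator)

    def visit_Call(self, node: ast.Call) -> None:
        if not isinstance(node.func, ast.Name):
            raise Rejected("call target must be a bare name")
        self.visit(node.func)
        for arg in node.args:
            self.visit(arg)


def is_accepted(src: str) -> bool:
    """Return True when every node in ``src`` is explicitly opted in."""
    try:
        DenyByDefault().visit(ast.parse(src, mode="eval"))
    except Rejected:
        return False
    return True
```

Because nothing is accepted implicitly, a node type added to the grammar in a future Python release would be rejected until someone writes a `visit_*` method for it.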

608 

609 

610# --------------------------------------------------------------------------- 

611# Public API 

612# --------------------------------------------------------------------------- 

613 

614 

615def parse_predicate(src: str) -> ast.Expression: 

616 """Parse and validate a predicate source string. 

617 

618 Returns the parsed :class:`ast.Expression` so callers can cache it and 

619 feed it to :func:`evaluate_predicate` without reparsing. Raises 

620 :class:`PredicateRejected` if the source fails to parse or contains 

621 any disallowed construct. 

622 """ 

623 if not isinstance(src, str): 

624 raise PredicateRejected( 

625 "not_a_string", 

626 message="predicate source must be a str", 

627 ) 

628 try: 

629 parsed = ast.parse(src, mode="eval") 

630 except SyntaxError as exc: 

631 rejection = PredicateRejected( 

632 "syntax_error", 

633 message=f"could not parse predicate: {exc.msg}", 

634 ) 

635 rejection.lineno = exc.lineno 

636 rejection.col_offset = exc.offset 

637 raise rejection from exc 

638 _PredicateValidator().visit(parsed) 

639 return parsed 

640 

641 

642# Pre-built sandbox namespace. An empty ``__builtins__`` plus an 

643# explicit safe-callable namespace is the established sandbox pattern: it 

644# blocks lookup of every dangerous builtin (``__import__``, ``open``, 

645# ``eval``, ``compile``, ``exec``, ``getattr``, ...) even if the validator 

646# were ever bypassed by a future AST node we forgot about. 

647_SAFE_GLOBALS: Final[dict[str, Any]] = {"__builtins__": {}} 

648_SAFE_CALLABLES: Final[dict[str, Any]] = { 

649 "len": len, 

650 "min": min, 

651 "max": max, 

652 "sum": sum, 

653 "abs": abs, 

654 "any": any, 

655 "all": all, 

656 "sorted": sorted, 

657 # Type coercions — pure, side-effect-free transforms used in 

658 # idioms like ``str(r.get('count')) == '0'``. None of them can 

659 # escape the empty-``__builtins__`` namespace regardless of input. 

660 "str": str, 

661 "int": int, 

662 "float": float, 

663 "bool": bool, 

664} 
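
A minimal sketch of the eval-time layer on its own, independent of this module: `SANDBOX_GLOBALS` and `try_eval` are hypothetical names used only for illustration. It shows that once `__builtins__` is an empty dict, names such as `open` and `__import__` no longer resolve, while an explicitly granted callable still does.

```python
from typing import Any

# Hypothetical stand-in for the safe namespace: no builtins, and one
# explicitly granted callable.
SANDBOX_GLOBALS: dict[str, Any] = {"__builtins__": {}, "len": len}


def try_eval(src: str, obs: dict[str, Any]) -> Any:
    """Evaluate ``src`` with only ``obs`` and ``len`` reachable by name."""
    code = compile(src, "<demo>", "eval")
    try:
        return eval(code, {**SANDBOX_GLOBALS, "obs": obs}, {})
    except NameError as exc:
        # Returned rather than raised so the outcomes can be compared.
        return exc


allowed = try_eval("len(obs['xs'])", {"xs": [1, 2, 3]})
blocked_open = try_eval("open('somefile')", {})
blocked_import = try_eval("__import__('os')", {})
```

An empty `__builtins__` restricts name lookup only. It does not stop attribute walks on values already in scope, which is why the parse-time validator (including its `__` checks) remains the primary layer.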

665 

666 

667def evaluate_predicate(parsed: ast.Expression, obs: dict[str, Any]) -> Any: 

668 """Evaluate an already-validated predicate AST against ``obs``. 

669 

670 The caller is responsible for passing only an :class:`ast.Expression` 

671 that came from :func:`parse_predicate`; the function does not 

672 re-validate. Compilation is per-call to keep the function pure (the 

673 AST itself is the cached unit of work). Returns whatever the 

674 expression evaluates to — typically a ``bool``, but the criterion 

675 layer handles other values. 

676 """ 

677 code = compile(parsed, "<predicate>", "eval") 

678 # Names referenced from inside a comprehension or generator 

679 # expression resolve through the enclosing function's *globals* 

680 # at runtime, not the ``locals`` mapping passed to ``eval`` — 

681 # because each comprehension compiles to its own implicit 

682 # function scope. So validated free names (``obs`` plus the 

683 # safe callables) must live in the globals dict to remain 

684 # visible from inside ``any(str(r) ... for r in obs[...])`` 

685 # idioms; an earlier "locals-only" arrangement raised 

686 # ``NameError: name 'str' is not defined`` at runtime even 

687 # though parse_predicate had accepted the source. The empty 

688 # ``__builtins__`` still keeps the sandbox tight: every name 

689 # the body can reach is one we put in the globals dict 

690 # ourselves. 

691 eval_globals: dict[str, Any] = {**_SAFE_GLOBALS, "obs": obs, **_SAFE_CALLABLES} 

692 return eval( # nosemgrep: python.lang.security.audit.eval-detected.eval-detected 

693 code, eval_globals, {} 

694 ) # noqa: S307
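
The scoping point made in the comment inside `evaluate_predicate` can be reproduced with plain `eval`. This is a small sketch under stated assumptions: `run_locals_only` and `run_globals` are hypothetical helpers, and a generator expression is used because it compiles to its own function scope.

```python
from typing import Any

SOURCE = "any(str(r) == '1' for r in obs)"
CODE = compile(SOURCE, "<demo>", "eval")
OBS_VALUE: list[Any] = [0, 1]


def run_locals_only() -> str:
    """Pass every name via ``locals``; the generator body cannot see them."""
    names = {"obs": OBS_VALUE, "any": any, "str": str}
    try:
        eval(CODE, {"__builtins__": {}}, names)
    except NameError as exc:
        return f"NameError: {exc}"
    return "ok"


def run_globals() -> bool:
    """Pass every name via ``globals``; the generator body resolves ``str``."""
    names = {"__builtins__": {}, "obs": OBS_VALUE, "any": any, "str": str}
    return eval(CODE, names, {})


locals_outcome = run_locals_only()
globals_outcome = run_globals()
```

In the locals-only run, `any` and the outermost iterable `obs` resolve at the top level of the expression, but `str` is looked up from inside the generator's own scope, which consults globals and then the empty `__builtins__`.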