Coverage for mcp/tools/storage.py: 94%
29 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""File storage MCP tools."""
3import asyncio
5import cli_runner
6from audit import audit_logged
7from server import mcp
@mcp.tool(tags={"safe", "storage"})
@audit_logged
def list_storage_contents(region: str, path: str = "/") -> str:
    """List contents of shared EFS storage.

    Args:
        region: AWS region.
        path: Directory path to list (default: root).
    """
    # NOTE(review): docstring wording kept as-is; it is presumably surfaced
    # as the MCP tool description -- confirm before rewording.
    # The path is forwarded only when it differs from "/"; presumably the CLI
    # lists the storage root when no path argument is given.
    path_args = [] if path == "/" else [path]
    return cli_runner._run_cli("files", "ls", "-r", region, *path_args)
@mcp.tool(tags={"safe", "storage"})
@audit_logged
def list_file_systems(region: str | None = None) -> str:
    """List EFS and FSx file systems.

    Args:
        region: Specific region, or omit for all.
    """
    # NOTE(review): docstring wording kept as-is; it is presumably surfaced
    # as the MCP tool description -- confirm before rewording.
    # A falsy region (None or "") adds no "-r" flag at all.
    # NOTE: the coverage report flagged the region-supplied path as never
    # exercised by the test suite.
    region_flag = ["-r", region] if region else []
    return cli_runner._run_cli("files", "list", *region_flag)
39# =============================================================================
40# Read-only inspection tools (async)
41# =============================================================================
@mcp.tool(tags={"safe", "files"})
@audit_logged
async def files_get(path: str, region: str) -> str:
    """`gco files get` — fetch a single file from EFS.

    Args:
        path: File path relative to the storage root.
        region: AWS region.
    """
    # NOTE(review): docstring wording kept as-is; it is presumably surfaced
    # as the MCP tool description -- confirm before rewording.
    command = ["files", "get", path, "-r", region]
    # The CLI runner is a synchronous call, so it is handed to a worker
    # thread and awaited rather than invoked directly in the coroutine.
    output = await asyncio.to_thread(cli_runner._run_cli, *command)
    return output
@mcp.tool(tags={"safe", "files"})
@audit_logged
async def files_access_points(region: str | None = None) -> str:
    """`gco files access-points` — list EFS access points.

    Args:
        region: AWS region.
    """
    # NOTE(review): docstring wording kept as-is; it is presumably surfaced
    # as the MCP tool description -- confirm before rewording.
    # A falsy region (None or "") adds no "-r" flag at all.
    region_flag = ["-r", region] if region else []
    # The CLI runner is a synchronous call, so it is handed to a worker
    # thread and awaited rather than invoked directly in the coroutine.
    output = await asyncio.to_thread(
        cli_runner._run_cli, "files", "access-points", *region_flag
    )
    return output