Coverage for mcp/iam.py: 90%
29 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""
IAM role assumption for the GCO MCP server.

When ``GCO_MCP_ROLE_ARN`` is set, assumes the dedicated MCP IAM role at
startup so every boto3 client downstream uses reduced-scope credentials.
"""
8import json
9import logging
10import os
11from datetime import UTC, datetime
13audit_logger = logging.getLogger("gco.mcp.audit")
16def assume_mcp_role() -> None:
17 """Assume the dedicated MCP IAM role if ``GCO_MCP_ROLE_ARN`` is set.
19 When the environment variable is set, this function:
21 1. Uses ambient credentials (via a transient ``boto3.Session``) to call
22 ``sts:AssumeRole`` with the configured role ARN.
23 2. Builds a new ``boto3.Session`` from the temporary credentials and
24 installs it as the default session (``boto3.setup_default_session``)
25 so that every subsequent boto3 client in this process uses
26 the least-privilege role automatically.
27 3. Logs a sanitized audit entry (role ARN + expiration) via the audit
28 logger. **Credentials themselves are never logged.**
30 When the environment variable is not set, a debug-level message is
31 logged and the process continues with ambient credentials.
32 """
33 role_arn = os.environ.get("GCO_MCP_ROLE_ARN", "").strip()
34 if not role_arn:
35 audit_logger.debug(
36 json.dumps(
37 {
38 "event": "mcp.server.role_assumption.skipped",
39 "reason": "GCO_MCP_ROLE_ARN not set; using ambient credentials",
40 "timestamp": datetime.now(UTC).isoformat(),
41 }
42 )
43 )
44 return
46 try:
47 import boto3
48 except ImportError:
49 audit_logger.error(
50 json.dumps(
51 {
52 "event": "mcp.server.role_assumption.error",
53 "role_arn": role_arn,
54 "error": "boto3 is not installed; cannot assume role",
55 "timestamp": datetime.now(UTC).isoformat(),
56 }
57 )
58 )
59 raise
61 session_name = os.environ.get("GCO_MCP_ROLE_SESSION_NAME", "gco-mcp-server")
62 duration_seconds = int(os.environ.get("GCO_MCP_ROLE_DURATION_SECONDS", "3600"))
64 try:
65 ambient_session = boto3.Session()
66 sts = ambient_session.client("sts")
67 response = sts.assume_role(
68 RoleArn=role_arn,
69 RoleSessionName=session_name,
70 DurationSeconds=duration_seconds,
71 )
72 except Exception as e:
73 audit_logger.error(
74 json.dumps(
75 {
76 "event": "mcp.server.role_assumption.error",
77 "role_arn": role_arn,
78 "error": str(e)[:200],
79 "timestamp": datetime.now(UTC).isoformat(),
80 }
81 )
82 )
83 raise
85 credentials = response["Credentials"]
86 expiration = credentials["Expiration"]
87 expiration_iso = expiration.isoformat() if hasattr(expiration, "isoformat") else str(expiration)
89 boto3.setup_default_session(
90 aws_access_key_id=credentials["AccessKeyId"],
91 aws_secret_access_key=credentials["SecretAccessKey"],
92 aws_session_token=credentials["SessionToken"],
93 )
95 audit_logger.info(
96 json.dumps(
97 {
98 "event": "mcp.server.role_assumption.success",
99 "role_arn": role_arn,
100 "session_name": session_name,
101 "expiration": expiration_iso,
102 "timestamp": datetime.now(UTC).isoformat(),
103 }
104 )
105 )