Coverage for mcp/iam.py: 90%

29 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2IAM role assumption for the GCO MCP server. 

3 

4When ``GCO_MCP_ROLE_ARN`` is set, assumes the dedicated MCP IAM role at 

5startup so every boto3 client downstream uses reduced-scope credentials. 

6""" 

7 

8import json 

9import logging 

10import os 

11from datetime import UTC, datetime 

12 

13audit_logger = logging.getLogger("gco.mcp.audit") 

14 

15 

16def assume_mcp_role() -> None: 

17 """Assume the dedicated MCP IAM role if ``GCO_MCP_ROLE_ARN`` is set. 

18 

19 When the environment variable is set, this function: 

20 

21 1. Uses ambient credentials (via a transient ``boto3.Session``) to call 

22 ``sts:AssumeRole`` with the configured role ARN. 

23 2. Builds a new ``boto3.Session`` from the temporary credentials and 

24 installs it as the default session (``boto3.setup_default_session``) 

25 so that every subsequent boto3 client in this process uses 

26 the least-privilege role automatically. 

27 3. Logs a sanitized audit entry (role ARN + expiration) via the audit 

28 logger. **Credentials themselves are never logged.** 

29 

30 When the environment variable is not set, a debug-level message is 

31 logged and the process continues with ambient credentials. 

32 """ 

33 role_arn = os.environ.get("GCO_MCP_ROLE_ARN", "").strip() 

34 if not role_arn: 

35 audit_logger.debug( 

36 json.dumps( 

37 { 

38 "event": "mcp.server.role_assumption.skipped", 

39 "reason": "GCO_MCP_ROLE_ARN not set; using ambient credentials", 

40 "timestamp": datetime.now(UTC).isoformat(), 

41 } 

42 ) 

43 ) 

44 return 

45 

46 try: 

47 import boto3 

48 except ImportError: 

49 audit_logger.error( 

50 json.dumps( 

51 { 

52 "event": "mcp.server.role_assumption.error", 

53 "role_arn": role_arn, 

54 "error": "boto3 is not installed; cannot assume role", 

55 "timestamp": datetime.now(UTC).isoformat(), 

56 } 

57 ) 

58 ) 

59 raise 

60 

61 session_name = os.environ.get("GCO_MCP_ROLE_SESSION_NAME", "gco-mcp-server") 

62 duration_seconds = int(os.environ.get("GCO_MCP_ROLE_DURATION_SECONDS", "3600")) 

63 

64 try: 

65 ambient_session = boto3.Session() 

66 sts = ambient_session.client("sts") 

67 response = sts.assume_role( 

68 RoleArn=role_arn, 

69 RoleSessionName=session_name, 

70 DurationSeconds=duration_seconds, 

71 ) 

72 except Exception as e: 

73 audit_logger.error( 

74 json.dumps( 

75 { 

76 "event": "mcp.server.role_assumption.error", 

77 "role_arn": role_arn, 

78 "error": str(e)[:200], 

79 "timestamp": datetime.now(UTC).isoformat(), 

80 } 

81 ) 

82 ) 

83 raise 

84 

85 credentials = response["Credentials"] 

86 expiration = credentials["Expiration"] 

87 expiration_iso = expiration.isoformat() if hasattr(expiration, "isoformat") else str(expiration) 

88 

89 boto3.setup_default_session( 

90 aws_access_key_id=credentials["AccessKeyId"], 

91 aws_secret_access_key=credentials["SecretAccessKey"], 

92 aws_session_token=credentials["SessionToken"], 

93 ) 

94 

95 audit_logger.info( 

96 json.dumps( 

97 { 

98 "event": "mcp.server.role_assumption.success", 

99 "role_arn": role_arn, 

100 "session_name": session_name, 

101 "expiration": expiration_iso, 

102 "timestamp": datetime.now(UTC).isoformat(), 

103 } 

104 ) 

105 )