Coverage for cli/analytics_user_mgmt.py: 93%

184 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2User management helpers for the GCO analytics environment. 

3 

4This module holds the pieces of the ``gco analytics`` CLI that are worth 

5exercising in isolation from Click: 

6 

7* :func:`discover_cognito_pool_id` / :func:`discover_cognito_client_id` 

8 / :func:`discover_api_endpoint` — single-stack CloudFormation output 

9 lookups used by every sub-command to avoid forcing operators to hand a 

10 pool id / api url on the command line. 

11* :func:`srp_authenticate` — Cognito password authentication via boto3's 

12 ``admin_initiate_auth`` (``ADMIN_USER_PASSWORD_AUTH`` flow, no client-side SRP), used by ``gco analytics studio login``. 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18from typing import Any 

19 

20# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

21# Flowchart(s) generated from this file: 

22# * ``srp_authenticate`` -> ``diagrams/code_diagrams/cli/analytics_user_mgmt.srp_authenticate.html`` 

23# (PNG: ``diagrams/code_diagrams/cli/analytics_user_mgmt.srp_authenticate.png``) 

24# * ``fetch_studio_url`` -> ``diagrams/code_diagrams/cli/analytics_user_mgmt.fetch_studio_url.html`` 

25# (PNG: ``diagrams/code_diagrams/cli/analytics_user_mgmt.fetch_studio_url.png``) 

26# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

27# <pyflowchart-code-diagram> END 

28 

29 

30logger = logging.getLogger(__name__) 

31 

32# --------------------------------------------------------------------------- 

33# CloudFormation output discovery 

34# --------------------------------------------------------------------------- 

35 

36 

def _describe_stack_outputs(region: str, stack_name: str) -> list[dict[str, str]] | None:
    """Fetch the CloudFormation ``Outputs`` list of ``stack_name`` in ``region``.

    ``None`` is returned when ``describe_stacks`` raises a ``ClientError``
    or ``BotoCoreError`` (the failure is logged at debug level) or when
    the response lists no stacks. A stack whose ``Outputs`` entry is
    absent or is not a list yields an empty list.

    No user-facing message is produced here: callers raise their own
    error so the copy can mention ``gco analytics enable`` /
    ``gco stacks deploy gco-analytics``.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    try:
        client = boto3.client("cloudformation", region_name=region)
        described = client.describe_stacks(StackName=stack_name)
    except (ClientError, BotoCoreError) as exc:
        logger.debug("describe_stacks(%s) in %s failed: %s", stack_name, region, exc)
        return None

    found = described.get("Stacks", [])
    if found:
        raw_outputs = found[0].get("Outputs", [])
        # Hand back a copy so callers never mutate the boto3 response.
        if isinstance(raw_outputs, list):
            return list(raw_outputs)
        return []
    return None

60 

61 

def _find_output(outputs: list[dict[str, str]], key: str) -> str | None:
    """Look up ``key`` in a CloudFormation outputs list.

    Only the first entry whose ``OutputKey`` equals ``key`` is
    considered. Its ``OutputValue`` is returned when it is a string;
    a missing or non-string value, or no matching entry at all,
    yields ``None``.
    """
    first_match = next(
        (entry for entry in outputs if entry.get("OutputKey") == key),
        None,
    )
    if first_match is None:
        return None
    candidate = first_match.get("OutputValue")
    if isinstance(candidate, str):
        return candidate
    return None

69 

70 

def discover_cognito_pool_id(region: str, project_name: str = "gco") -> str | None:
    """Look up the Cognito user pool id on the ``<project_name>-analytics`` stack.

    Reads the ``CognitoUserPoolId`` CloudFormation output. ``None`` is
    returned both when the stack outputs cannot be fetched and when the
    stack has no such output; CLI callers turn ``None`` into the
    documented "gco-analytics stack not deployed" error message.
    """
    stack_outputs = _describe_stack_outputs(region, f"{project_name}-analytics")
    if stack_outputs is not None:
        return _find_output(stack_outputs, "CognitoUserPoolId")
    return None

84 

85 

def discover_cognito_client_id(region: str, project_name: str = "gco") -> str | None:
    """Look up the Cognito client id on the ``<project_name>-analytics`` stack.

    Reads the ``CognitoUserPoolClientId`` CloudFormation output from the
    same stack that :func:`discover_cognito_pool_id` consults. ``None``
    is returned when the stack outputs cannot be fetched or the output
    is missing.
    """
    stack_outputs = _describe_stack_outputs(region, f"{project_name}-analytics")
    if stack_outputs is not None:
        return _find_output(stack_outputs, "CognitoUserPoolClientId")
    return None

97 

98 

def discover_api_endpoint(region: str, project_name: str = "gco") -> str | None:
    """Look up the API Gateway base URL on the ``<project_name>-api-gateway`` stack.

    Reads the ``ApiEndpoint`` CloudFormation output, typically of the
    form ``https://<id>.execute-api.<region>.amazonaws.com/prod/``.
    ``None`` is returned when the stack outputs cannot be fetched or
    the output is missing.
    """
    stack_outputs = _describe_stack_outputs(region, f"{project_name}-api-gateway")
    if stack_outputs is not None:
        return _find_output(stack_outputs, "ApiEndpoint")
    return None

111 

112 

113# --------------------------------------------------------------------------- 

114# Cognito authentication 

115# --------------------------------------------------------------------------- 

116 

117 

def srp_authenticate(
    pool_id: str,
    client_id: str,
    username: str,
    password: str,
    region: str,
) -> dict[str, str]:
    """Sign a Cognito user in with the ADMIN_USER_PASSWORD_AUTH flow.

    Despite the name, no client-side SRP math is performed: the
    password is handed to ``admin_initiate_auth``, which sends it over
    TLS. The user pool client must have
    ``ALLOW_ADMIN_USER_PASSWORD_AUTH`` enabled and the caller needs the
    ``cognito-idp:AdminInitiateAuth`` permission.

    Returns a dict with the keys ``IdToken``, ``AccessToken`` and
    ``RefreshToken``. A token missing from the response — including the
    case where the response carries no ``AuthenticationResult`` at
    all — is reported as an empty string.

    Cognito-side failures (``NotAuthorizedException``,
    ``UserNotFoundException``, etc.) propagate as
    ``botocore.exceptions.ClientError``.
    """
    import boto3

    credentials = {
        "USERNAME": username,
        "PASSWORD": password,
    }
    idp = boto3.client("cognito-idp", region_name=region)
    auth_response = idp.admin_initiate_auth(
        UserPoolId=pool_id,
        ClientId=client_id,
        AuthFlow="ADMIN_USER_PASSWORD_AUTH",
        AuthParameters=credentials,
    )
    result = auth_response.get("AuthenticationResult") or {}
    token_names = ("IdToken", "AccessToken", "RefreshToken")
    return {name: str(result.get(name, "")) for name in token_names}

155 

156 

# Public surface of this module: the names exported by a star-import.
# Every entry is a function defined in this file; the underscore-prefixed
# CloudFormation helpers are deliberately left out.
__all__ = [
    "admin_create_user",
    "admin_delete_user",
    "admin_set_user_password",
    "check_ssm_parameter",
    "check_stack_complete",
    "discover_api_endpoint",
    "discover_cognito_client_id",
    "discover_cognito_pool_id",
    "fetch_studio_url",
    "generate_strong_password",
    "list_users",
    "scan_orphan_analytics_resources",
    "srp_authenticate",
]

172 

173# --------------------------------------------------------------------------- 

174# Cognito user management helpers 

175# --------------------------------------------------------------------------- 

176 

177 

def admin_create_user(
    pool_id: str,
    region: str,
    username: str,
    email: str | None = None,
    suppress_email: bool = False,
) -> tuple[dict[str, Any], str | None]:
    """Create a Cognito user with AdminCreateUser.

    When ``email`` is given, the user is created with that ``email``
    attribute and ``email_verified`` set to ``"true"``. With
    ``suppress_email=True`` the request carries
    ``MessageAction=SUPPRESS``.

    Returns ``(response, temporary_password)``. The password is taken
    from a ``temporary_password`` user attribute in the response, or
    failing that from a top-level ``TemporaryPassword`` key. It is
    ``None`` when the response echoes neither, in which case the
    caller should direct the operator to ``admin-set-user-password``
    out-of-band.
    """
    import boto3

    attributes: list[dict[str, str]] = (
        [
            {"Name": "email", "Value": email},
            {"Name": "email_verified", "Value": "true"},
        ]
        if email
        else []
    )
    request: dict[str, Any] = {
        "UserPoolId": pool_id,
        "Username": username,
        "UserAttributes": attributes,
    }
    if suppress_email:
        request["MessageAction"] = "SUPPRESS"

    idp = boto3.client("cognito-idp", region_name=region)
    response = idp.admin_create_user(**request)

    # Prefer the attribute form; only the first matching attribute counts.
    returned_attrs = response.get("User", {}).get("Attributes", []) or []
    echoed: str | None = next(
        (
            attr.get("Value")
            for attr in returned_attrs
            if attr.get("Name") == "temporary_password"
        ),
        None,
    )
    if echoed is None:
        echoed = response.get("TemporaryPassword")

    return response, echoed

221 

222 

def admin_set_user_password(
    pool_id: str,
    region: str,
    username: str,
    password: str,
    permanent: bool = True,
) -> None:
    """Set a Cognito user's password with AdminSetUserPassword.

    With ``permanent=True`` (the default) the password is marked as
    already satisfying Cognito's ``NEW_PASSWORD_REQUIRED`` challenge,
    so the user can sign in without a forced reset — the same result
    as ``aws cognito-idp admin-set-user-password --permanent``. With
    ``permanent=False`` the user must choose their own password on
    first login.
    """
    import boto3

    request = {
        "UserPoolId": pool_id,
        "Username": username,
        "Password": password,
        "Permanent": permanent,
    }
    boto3.client("cognito-idp", region_name=region).admin_set_user_password(**request)

248 

249 

def generate_strong_password(length: int = 20) -> str:
    """Build a random password that meets Cognito's default policy.

    The default policy asks for at least one lowercase letter, one
    uppercase letter, one digit and one symbol, with a minimum length
    of 8. One character is drawn from each required class, the rest
    are drawn from the union of all four, and the result is shuffled.
    All randomness comes from the :mod:`secrets` module.

    Raises ``ValueError`` when ``length`` is below 8.
    """
    import secrets
    import string

    if length < 8:
        raise ValueError(f"length must be >= 8 to satisfy Cognito policy; got {length}")

    # Cognito's allowed symbol set per AWS docs. Space and tab are left
    # out on purpose: Cognito rejects whitespace with
    # InvalidParameterException.
    symbols = "^$*.[]{}()?-\"!@#%&/\\,><':;|_~`+="
    char_classes = (string.ascii_lowercase, string.ascii_uppercase, string.digits, symbols)
    union = "".join(char_classes)

    # One guaranteed pick per class, then pad from the full alphabet.
    picked = [secrets.choice(group) for group in char_classes]
    picked.extend(secrets.choice(union) for _ in range(length - len(char_classes)))

    # Shuffle so the guaranteed picks do not always lead the password.
    secrets.SystemRandom().shuffle(picked)
    return "".join(picked)

289 

290 

def list_users(pool_id: str, region: str) -> list[dict[str, str]]:
    """Return one flat row per user in the pool, suitable for tabular output.

    Each row has ``username``, ``status`` and ``enabled`` (stringified)
    keys, plus ``email`` when the user carries an ``email`` attribute.

    Every page of the ``list_users`` API is consumed by following the
    ``PaginationToken`` in each response, so pools larger than a single
    page are listed in full rather than silently truncated.
    """
    import boto3

    cognito = boto3.client("cognito-idp", region_name=region)

    rows: list[dict[str, str]] = []
    request: dict[str, str] = {"UserPoolId": pool_id}
    while True:
        response = cognito.list_users(**request)
        for user in response.get("Users", []) or []:
            row: dict[str, str] = {
                "username": user.get("Username", ""),
                "status": user.get("UserStatus", ""),
                "enabled": str(user.get("Enabled", "")),
            }
            for attr in user.get("Attributes", []) or []:
                if attr.get("Name") == "email":
                    row["email"] = attr.get("Value", "")
            rows.append(row)

        # A missing or empty token marks the last page.
        next_token = response.get("PaginationToken")
        if not next_token:
            break
        request["PaginationToken"] = next_token
    return rows

310 

311 

def admin_delete_user(pool_id: str, region: str, username: str) -> None:
    """Remove ``username`` from the user pool with AdminDeleteUser."""
    import boto3

    idp = boto3.client("cognito-idp", region_name=region)
    request = {"UserPoolId": pool_id, "Username": username}
    idp.admin_delete_user(**request)

318 

319 

320# --------------------------------------------------------------------------- 

321# HTTP helper for /studio/login 

322# --------------------------------------------------------------------------- 

323 

324 

def fetch_studio_url(api_base: str, id_token: str) -> tuple[str, int, str]:
    """GET ``{api_base}/studio/login`` with the Cognito ID token.

    ``id_token`` is sent verbatim as the ``Authorization`` header.

    Returns ``(url, expires_in, correlation_id)`` on HTTP 200, where
    ``correlation_id`` is the ``x-amzn-RequestId`` response header (or
    ``"N/A"``) and ``expires_in`` defaults to 0 when absent from the
    body. On HTTP 202 (profile still provisioning) returns
    ``("", 0, correlation_id)`` so the caller can poll.

    Raises :class:`urllib.error.HTTPError` / :class:`urllib.error.URLError`
    on transport or HTTP failure, including any status other than 200 /
    202 that urllib itself did not already reject. Raises ``ValueError``
    on malformed response bodies (invalid JSON, a body that is not a
    JSON object, a missing ``url`` key, or a non-integer
    ``expires_in``), on non-``https://`` ``api_base`` values (guards
    urllib's ``file://`` / ``ftp://`` scheme support), and on an
    ``api_base`` without a hostname.
    """
    import email.message
    import json as _json
    import urllib.error
    import urllib.parse
    import urllib.request

    # Scheme allow-list — urllib happily dereferences ``file://`` and
    # ``ftp://`` URLs, which is the shape of the semgrep
    # ``dynamic-urllib-use-detected`` finding. We only ever call this with
    # the API Gateway endpoint (HTTPS by construction), so reject anything
    # else before the urlopen call.
    parsed = urllib.parse.urlparse(api_base)
    if parsed.scheme != "https":
        raise ValueError(
            f"api_base must use https:// scheme (got {parsed.scheme!r}). "
            "This guard rejects file:// / ftp:// schemes that urllib would "
            "otherwise follow."
        )
    if not parsed.netloc:
        raise ValueError(f"api_base is missing a hostname: {api_base!r}")

    login_url = api_base.rstrip("/") + "/studio/login"
    # Justification for the ``dynamic-urllib-use-detected`` / ``B310``
    # suppressions below: ``login_url`` is built from ``api_base`` + a
    # static ``/studio/login`` suffix. The scheme allow-list near the top
    # of this function rejects any ``api_base`` that isn't ``https://``
    # before we reach these lines, which closes the ``file://`` /
    # ``ftp://`` / ``custom`` scheme hole the rules are written to catch.
    # ``# fmt: off`` pins the block so the formatter can't re-wrap the
    # urlopen call — wrapping moves the suppression comments to the
    # wrong line and bandit / semgrep attach findings to the first
    # line of the call.
    # fmt: off
    request = urllib.request.Request(  # nosec B310  # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected  # noqa: S310
        login_url,
        headers={"Authorization": id_token, "Accept": "application/json"},
        method="GET",
    )
    with urllib.request.urlopen(request, timeout=30) as response:  # nosec B310  # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected  # noqa: S310
        status = int(response.status)
        body = response.read().decode("utf-8")
        correlation_id = response.headers.get("x-amzn-RequestId") or "N/A"
    # fmt: on

    if status == 202:
        # Profile is still provisioning -- return empty URL so the caller
        # can poll. The body is ``{"status": "provisioning"}``.
        return "", 0, correlation_id

    if status != 200:
        # HTTPError requires a Message (email.message.Message) as its
        # headers argument; build a minimal one carrying only the
        # correlation id for determinism.
        headers_msg: email.message.Message = email.message.Message()
        headers_msg["x-amzn-RequestId"] = correlation_id
        raise urllib.error.HTTPError(
            login_url,
            status,
            f"Studio login returned HTTP {status}",
            headers_msg,
            None,
        )

    try:
        payload = _json.loads(body)
        url = str(payload["url"])
        expires_in = int(payload.get("expires_in", 0))
    except (ValueError, KeyError, TypeError, AttributeError) as exc:
        # TypeError / AttributeError cover bodies that are valid JSON but
        # not an object (list, string, number, null) and a null
        # ``expires_in``; these are "unexpected JSON shape" too and must
        # surface as the documented ValueError.
        raise ValueError(f"malformed /studio/login response: {exc!r}") from exc

    return url, expires_in, correlation_id

405 

406 

407# --------------------------------------------------------------------------- 

408# Doctor helpers 

409# --------------------------------------------------------------------------- 

410 

411 

def check_stack_complete(region: str, stack_name: str) -> tuple[bool, str]:
    """Report whether ``stack_name`` is in a healthy CloudFormation state.

    ``(True, "")`` is returned only for the statuses
    ``CREATE_COMPLETE``, ``UPDATE_COMPLETE`` and ``IMPORT_COMPLETE``.
    Anything else returns ``(False, hint)``, where ``hint`` describes
    the cause: a failed ``describe_stacks`` call, a stack missing from
    the response, or the unhealthy status that was found.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    healthy_statuses = ("CREATE_COMPLETE", "UPDATE_COMPLETE", "IMPORT_COMPLETE")

    try:
        client = boto3.client("cloudformation", region_name=region)
        described = client.describe_stacks(StackName=stack_name)
    except (ClientError, BotoCoreError) as exc:
        return False, f"describe_stacks failed in {region}: {exc!s}"

    found = described.get("Stacks", [])
    if not found:
        return False, f"{stack_name} not found in {region}"

    current = found[0].get("StackStatus", "")
    if current not in healthy_statuses:
        return False, f"{stack_name} in {region} has status {current}"
    return True, ""

434 

435 

def check_ssm_parameter(region: str, param_name: str) -> tuple[bool, str]:
    """Check for an SSM parameter using the legacy ``(region, param_name)`` order.

    Delegates to :func:`gco.services.aws_ssm.check_ssm_parameter`,
    passing ``param_name`` positionally and ``region`` as a keyword,
    and returns its result unchanged — presumably ``(True, "")`` when
    the parameter exists in ``region``; confirm against the delegate.

    Kept as a re-export so existing callers and the public ``__all__``
    surface stay stable; new code should call the keyword-style helper
    directly.
    """
    from gco.services.aws_ssm import check_ssm_parameter as _delegate

    outcome = _delegate(param_name, region=region)
    return outcome

448 

449 

def scan_orphan_analytics_resources(region: str) -> list[str]:
    """Return a list of copy-paste ``aws`` commands for retained resources.

    Scans EFS file systems and Cognito user pools in ``region`` for
    resources tagged ``gco:analytics:managed=true`` and emits one
    delete command per match. Both listings are followed across all
    pages (``NextMarker`` for EFS, ``NextToken`` for Cognito), so
    orphans beyond the first page are not missed.

    The two scans are independent and best-effort: when one raises a
    ``ClientError`` / ``BotoCoreError``, a parenthesised
    ``(... orphan scan failed: ...)`` entry is appended in place of its
    remaining results and the other scan still runs. An empty list
    means no orphans were found and neither scan failed.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    remediation: list[str] = []
    try:
        efs = boto3.client("efs", region_name=region)
        efs_request: dict[str, Any] = {}
        while True:
            fs_page = efs.describe_file_systems(**efs_request)
            for fs in fs_page.get("FileSystems", []) or []:
                fs_id = fs.get("FileSystemId", "")
                if not fs_id:
                    continue
                tag_resp = efs.list_tags_for_resource(ResourceId=fs_id)
                tags = {t.get("Key"): t.get("Value") for t in tag_resp.get("Tags", []) or []}
                if tags.get("gco:analytics:managed") == "true":
                    remediation.append(f"aws efs delete-file-system --file-system-id {fs_id}")
            # A missing or empty marker marks the last page.
            next_marker = fs_page.get("NextMarker")
            if not next_marker:
                break
            efs_request["Marker"] = next_marker
    except (ClientError, BotoCoreError) as exc:
        remediation.append(f"(EFS orphan scan failed: {exc!s})")

    try:
        cognito = boto3.client("cognito-idp", region_name=region)
        # 60 is the page size requested per call; further pages are
        # fetched via NextToken below.
        pool_request: dict[str, Any] = {"MaxResults": 60}
        while True:
            pools = cognito.list_user_pools(**pool_request)
            for pool in pools.get("UserPools", []) or []:
                pool_id = pool.get("Id")
                if not pool_id:
                    continue
                # The listing entries are not inspected for tags; the
                # tags are read from the full pool description.
                describe = cognito.describe_user_pool(UserPoolId=pool_id)
                tags = describe.get("UserPool", {}).get("UserPoolTags", {}) or {}
                if tags.get("gco:analytics:managed") == "true":
                    remediation.append(f"aws cognito-idp delete-user-pool --user-pool-id {pool_id}")
            next_token = pools.get("NextToken")
            if not next_token:
                break
            pool_request["NextToken"] = next_token
    except (ClientError, BotoCoreError) as exc:
        remediation.append(f"(Cognito orphan scan failed: {exc!s})")

    return remediation