Coverage for cli/analytics_user_mgmt.py: 93%
184 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2User management helpers for the GCO analytics environment.
4This module holds the pieces of the ``gco analytics`` CLI that are worth
5exercising in isolation from Click:
7* :func:`discover_cognito_pool_id` / :func:`discover_cognito_client_id`
8 / :func:`discover_api_endpoint` — single-stack CloudFormation output
9 lookups used by every sub-command to avoid forcing operators to hand a
10 pool id / api url on the command line.
11* :func:`srp_authenticate` — Cognito password authentication via boto3's
12 ``admin_initiate_auth`` (``ADMIN_USER_PASSWORD_AUTH`` flow; despite the name, no client-side SRP is performed), used by ``gco analytics studio login``.
13"""
15from __future__ import annotations
17import logging
18from typing import Any
20# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
21# Flowchart(s) generated from this file:
22# * ``srp_authenticate`` -> ``diagrams/code_diagrams/cli/analytics_user_mgmt.srp_authenticate.html``
23# (PNG: ``diagrams/code_diagrams/cli/analytics_user_mgmt.srp_authenticate.png``)
24# * ``fetch_studio_url`` -> ``diagrams/code_diagrams/cli/analytics_user_mgmt.fetch_studio_url.html``
25# (PNG: ``diagrams/code_diagrams/cli/analytics_user_mgmt.fetch_studio_url.png``)
26# Regenerate with ``python diagrams/code_diagrams/generate.py``.
27# <pyflowchart-code-diagram> END
30logger = logging.getLogger(__name__)
32# ---------------------------------------------------------------------------
33# CloudFormation output discovery
34# ---------------------------------------------------------------------------
def _describe_stack_outputs(region: str, stack_name: str) -> list[dict[str, str]] | None:
    """Fetch the CloudFormation ``Outputs`` list of ``stack_name`` in ``region``.

    Returns ``None`` when ``describe_stacks`` raises a ``ClientError`` or
    ``BotoCoreError`` (the failure is logged at debug level) or when the
    response carries no stacks. Returns an empty list when the stack is
    present but its ``Outputs`` entry is not a list. Callers are expected
    to turn ``None`` into their own user-facing error message.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    try:
        client = boto3.client("cloudformation", region_name=region)
        described = client.describe_stacks(StackName=stack_name)
    except (ClientError, BotoCoreError) as exc:
        logger.debug("describe_stacks(%s) in %s failed: %s", stack_name, region, exc)
        return None

    matching = described.get("Stacks", [])
    if matching:
        raw_outputs = matching[0].get("Outputs", [])
        if isinstance(raw_outputs, list):
            # Hand back a copy so callers never alias the boto3 response.
            return list(raw_outputs)
        return []
    return None
def _find_output(outputs: list[dict[str, str]], key: str) -> str | None:
    """Look up ``key`` in a CloudFormation outputs list.

    Only the first entry whose ``OutputKey`` equals ``key`` is considered.
    Its ``OutputValue`` is returned when it is a string; otherwise (or when
    no entry matches) the result is ``None``.
    """
    first_match = next(
        (entry for entry in outputs if entry.get("OutputKey") == key),
        None,
    )
    if first_match is None:
        return None
    candidate = first_match.get("OutputValue")
    if isinstance(candidate, str):
        return candidate
    return None
def discover_cognito_pool_id(region: str, project_name: str = "gco") -> str | None:
    """Look up the Cognito user pool id exported by the analytics stack.

    Reads the ``CognitoUserPoolId`` output of the
    ``{project_name}-analytics`` CloudFormation stack in ``region``.
    The result is ``None`` both when the stack's outputs cannot be
    fetched and when that output is absent; CLI callers map ``None`` to
    their "stack not deployed" error message.
    """
    stack_outputs = _describe_stack_outputs(region, f"{project_name}-analytics")
    if stack_outputs is not None:
        return _find_output(stack_outputs, "CognitoUserPoolId")
    return None
def discover_cognito_client_id(region: str, project_name: str = "gco") -> str | None:
    """Look up the Cognito user pool client id exported by the analytics stack.

    Reads the ``CognitoUserPoolClientId`` output of the
    ``{project_name}-analytics`` CloudFormation stack in ``region``.
    Returns ``None`` when the stack's outputs cannot be fetched or the
    output is missing.
    """
    analytics_stack = f"{project_name}-analytics"
    stack_outputs = _describe_stack_outputs(region, analytics_stack)
    return (
        None
        if stack_outputs is None
        else _find_output(stack_outputs, "CognitoUserPoolClientId")
    )
def discover_api_endpoint(region: str, project_name: str = "gco") -> str | None:
    """Look up the API Gateway base URL exported by the API gateway stack.

    Reads the ``ApiEndpoint`` output of the ``{project_name}-api-gateway``
    CloudFormation stack in ``region``; the value is typically of the form
    ``https://{id}.execute-api.{region}.amazonaws.com/prod/``. Returns
    ``None`` when the stack's outputs cannot be fetched or the output is
    missing.
    """
    gateway_stack = f"{project_name}-api-gateway"
    stack_outputs = _describe_stack_outputs(region, gateway_stack)
    if stack_outputs is None:
        return None
    endpoint = _find_output(stack_outputs, "ApiEndpoint")
    return endpoint
113# ---------------------------------------------------------------------------
114# Cognito authentication
115# ---------------------------------------------------------------------------
def srp_authenticate(
    pool_id: str,
    client_id: str,
    username: str,
    password: str,
    region: str,
) -> dict[str, str]:
    """Sign a Cognito user in with the ``ADMIN_USER_PASSWORD_AUTH`` flow.

    Despite the name, no client-side SRP computation happens here: the
    password is passed to ``admin_initiate_auth`` as an auth parameter.
    The user pool client therefore needs
    ``ALLOW_ADMIN_USER_PASSWORD_AUTH`` enabled and the caller needs the
    ``cognito-idp:AdminInitiateAuth`` permission.

    Returns a dict with exactly the keys ``IdToken``, ``AccessToken`` and
    ``RefreshToken``. A token missing from the response (including the
    case where ``AuthenticationResult`` itself is absent or empty) is
    reported as an empty string. Cognito-side failures such as
    ``NotAuthorizedException`` or ``UserNotFoundException`` propagate as
    ``botocore.exceptions.ClientError``.
    """
    import boto3

    credentials = {"USERNAME": username, "PASSWORD": password}
    idp = boto3.client("cognito-idp", region_name=region)
    auth_response = idp.admin_initiate_auth(
        UserPoolId=pool_id,
        ClientId=client_id,
        AuthFlow="ADMIN_USER_PASSWORD_AUTH",
        AuthParameters=credentials,
    )

    result = auth_response.get("AuthenticationResult") or {}
    token_names = ("IdToken", "AccessToken", "RefreshToken")
    return {name: str(result.get(name, "")) for name in token_names}
# Public API of this module, listed alphabetically. The underscore-prefixed
# helpers (``_describe_stack_outputs`` / ``_find_output``) are not exported.
__all__ = [
    "admin_create_user",
    "admin_delete_user",
    "admin_set_user_password",
    "check_ssm_parameter",
    "check_stack_complete",
    "discover_api_endpoint",
    "discover_cognito_client_id",
    "discover_cognito_pool_id",
    "fetch_studio_url",
    "generate_strong_password",
    "list_users",
    "scan_orphan_analytics_resources",
    "srp_authenticate",
]
173# ---------------------------------------------------------------------------
174# Cognito user management helpers
175# ---------------------------------------------------------------------------
def admin_create_user(
    pool_id: str,
    region: str,
    username: str,
    email: str | None = None,
    suppress_email: bool = False,
) -> tuple[dict[str, Any], str | None]:
    """Create a Cognito user with the AdminCreateUser API.

    When ``email`` is given, the user is created with that ``email``
    attribute and ``email_verified`` set to ``"true"``. When
    ``suppress_email`` is true, ``MessageAction=SUPPRESS`` is sent with
    the request.

    Returns ``(response, temporary_password)``. The temporary password is
    taken from a ``temporary_password`` user attribute in the response if
    one is present, falling back to a top-level ``TemporaryPassword``
    field; it is ``None`` when the response contains neither, in which
    case the operator should set a password out-of-band
    (``admin-set-user-password``).
    """
    import boto3

    if email:
        attributes: list[dict[str, str]] = [
            {"Name": "email", "Value": email},
            {"Name": "email_verified", "Value": "true"},
        ]
    else:
        attributes = []

    request: dict[str, Any] = {
        "UserPoolId": pool_id,
        "Username": username,
        "UserAttributes": attributes,
    }
    if suppress_email:
        request["MessageAction"] = "SUPPRESS"

    idp = boto3.client("cognito-idp", region_name=region)
    result = idp.admin_create_user(**request)

    # Only the first ``temporary_password`` attribute is consulted.
    created_attrs = result.get("User", {}).get("Attributes", []) or []
    temp_password: str | None = next(
        (
            item.get("Value")
            for item in created_attrs
            if item.get("Name") == "temporary_password"
        ),
        None,
    )
    if temp_password is None:
        temp_password = result.get("TemporaryPassword")

    return result, temp_password
def admin_set_user_password(
    pool_id: str,
    region: str,
    username: str,
    password: str,
    permanent: bool = True,
) -> None:
    """Set a Cognito user's password with the AdminSetUserPassword API.

    With ``permanent=True`` (the default) the password is sent as
    permanent, so the user can sign in without a forced reset — the same
    effect as ``aws cognito-idp admin-set-user-password --permanent``.
    With ``permanent=False`` the user must choose their own password on
    first login.
    """
    import boto3

    idp = boto3.client("cognito-idp", region_name=region)
    request = {
        "UserPoolId": pool_id,
        "Username": username,
        "Password": password,
        "Permanent": permanent,
    }
    idp.admin_set_user_password(**request)
def generate_strong_password(length: int = 20) -> str:
    """Return a random password that satisfies Cognito's default policy.

    Cognito's default password policy requires at least one uppercase
    letter, one lowercase letter, one digit, and one symbol, plus the
    length minimum (8). Every character is drawn with
    :func:`secrets.choice`; the result is guaranteed to contain one
    character from each required class, with the remaining characters
    drawn from the union of all classes.

    Args:
        length: Total password length; must be at least 8.

    Returns:
        A password of exactly ``length`` characters.

    Raises:
        ValueError: If ``length`` is smaller than 8.
    """
    import secrets
    import string

    if length < 8:
        raise ValueError(f"length must be >= 8 to satisfy Cognito policy; got {length}")

    # Cognito's allowed symbol set per AWS docs. Notably excludes space
    # and tab — Cognito rejects whitespace with InvalidParameterException.
    symbols = "^$*.[]{}()?-\"!@#%&/\\,><':;|_~`+="
    classes = (string.ascii_lowercase, string.ascii_uppercase, string.digits, symbols)

    # One guaranteed character per required class, then pad from the union.
    chars = [secrets.choice(pool) for pool in classes]
    alphabet = "".join(classes)
    padding = length - len(chars)
    chars.extend(secrets.choice(alphabet) for _ in range(padding))

    # Shuffle so the required-class characters aren't always at the start.
    # SystemRandom draws from the OS CSPRNG, so the permutation is as strong
    # as the character choices above.
    secrets.SystemRandom().shuffle(chars)

    return "".join(chars)
def list_users(pool_id: str, region: str) -> list[dict[str, str]]:
    """Return a flat row-per-user list suitable for tabular output.

    Every page of the Cognito ``ListUsers`` result is fetched by following
    ``PaginationToken``, so pools larger than a single page are returned in
    full rather than truncated to the first page.

    Args:
        pool_id: Cognito user pool id to enumerate.
        region: AWS region of the user pool.

    Returns:
        One dict per user with the keys ``username``, ``status`` and
        ``enabled`` (stringified), plus ``email`` when the user has an
        ``email`` attribute.

    Raises:
        botocore.exceptions.ClientError: Propagated from ``list_users``.
    """
    import boto3

    cognito = boto3.client("cognito-idp", region_name=region)

    rows: list[dict[str, str]] = []
    request: dict[str, str] = {"UserPoolId": pool_id}
    while True:
        response = cognito.list_users(**request)
        for user in response.get("Users", []) or []:
            row: dict[str, str] = {
                "username": user.get("Username", ""),
                "status": user.get("UserStatus", ""),
                "enabled": str(user.get("Enabled", "")),
            }
            for attr in user.get("Attributes", []) or []:
                if attr.get("Name") == "email":
                    row["email"] = attr.get("Value", "")
            rows.append(row)

        # Cognito omits the token on the last page.
        pagination_token = response.get("PaginationToken")
        if not pagination_token:
            break
        request["PaginationToken"] = pagination_token
    return rows
def admin_delete_user(pool_id: str, region: str, username: str) -> None:
    """Remove ``username`` from the Cognito user pool ``pool_id``.

    Calls the AdminDeleteUser API in ``region``; any error raised by the
    call propagates to the caller.
    """
    import boto3

    idp = boto3.client("cognito-idp", region_name=region)
    target = {"UserPoolId": pool_id, "Username": username}
    idp.admin_delete_user(**target)
320# ---------------------------------------------------------------------------
321# HTTP helper for /studio/login
322# ---------------------------------------------------------------------------
def fetch_studio_url(api_base: str, id_token: str) -> tuple[str, int, str]:
    """GET ``{api_base}/studio/login`` with the Cognito ID token.

    Returns ``(url, expires_in, correlation_id)`` on success. An HTTP 202
    response (profile still provisioning) yields ``("", 0, correlation_id)``
    so the caller can poll. ``correlation_id`` is the ``x-amzn-RequestId``
    response header, or ``"N/A"`` when it is absent.

    Raises :class:`urllib.error.HTTPError` / :class:`urllib.error.URLError`
    on transport or HTTP failure; raises ``ValueError`` on malformed
    response bodies (invalid JSON, a JSON value that is not an object,
    a missing ``url`` key, or a non-numeric ``expires_in``), or on
    non-``https://`` ``api_base`` values (guards urllib's ``file://`` /
    ``ftp://`` scheme support).
    """
    import email.message
    import json as _json
    import urllib.error
    import urllib.parse
    import urllib.request

    # Scheme allow-list — urllib happily dereferences ``file://`` and
    # ``ftp://`` URLs, which is the shape of the semgrep
    # ``dynamic-urllib-use-detected`` finding. We only ever call this with
    # the API Gateway endpoint (HTTPS by construction), so reject anything
    # else before the urlopen call.
    parsed = urllib.parse.urlparse(api_base)
    if parsed.scheme != "https":
        raise ValueError(
            f"api_base must use https:// scheme (got {parsed.scheme!r}). "
            "This guard rejects file:// / ftp:// schemes that urllib would "
            "otherwise follow."
        )
    if not parsed.netloc:
        raise ValueError(f"api_base is missing a hostname: {api_base!r}")

    login_url = api_base.rstrip("/") + "/studio/login"
    # Justification for the ``dynamic-urllib-use-detected`` / ``B310``
    # suppressions below: ``login_url`` is built from ``api_base`` + a
    # static ``/studio/login`` suffix. The scheme allow-list near the top
    # of this function rejects any ``api_base`` that isn't ``https://``
    # before we reach these lines, which closes the ``file://`` /
    # ``ftp://`` / ``custom`` scheme hole the rules are written to catch.
    # ``# fmt: off`` pins the block so the formatter can't re-wrap the
    # urlopen call — wrapping moves the suppression comments to the
    # wrong line and bandit / semgrep attach findings to the first
    # line of the call.
    # fmt: off
    request = urllib.request.Request(  # nosec B310  # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected  # noqa: S310
        login_url,
        headers={"Authorization": id_token, "Accept": "application/json"},
        method="GET",
    )
    with urllib.request.urlopen(request, timeout=30) as response:  # nosec B310  # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected  # noqa: S310
        status = int(response.status)
        body = response.read().decode("utf-8")
        correlation_id = response.headers.get("x-amzn-RequestId") or "N/A"
    # fmt: on

    if status == 202:
        # Profile is still provisioning -- return empty URL so the caller
        # can poll. The body is ``{"status": "provisioning"}``.
        return "", 0, correlation_id

    if status != 200:
        # HTTPError requires a Message (email.message.Message) as its
        # headers argument; build one carrying only the correlation id.
        headers_msg: email.message.Message = email.message.Message()
        headers_msg["x-amzn-RequestId"] = correlation_id
        raise urllib.error.HTTPError(
            login_url,
            status,
            f"Studio login returned HTTP {status}",
            headers_msg,
            None,
        )

    try:
        payload = _json.loads(body)
        url = str(payload["url"])
        expires_in = int(payload.get("expires_in", 0))
    except (ValueError, KeyError, TypeError) as exc:
        # TypeError covers a body that parses to a non-object (list, string,
        # number, null) and an ``expires_in`` of ``null``; without it those
        # shapes would escape as TypeError instead of the documented
        # ValueError.
        raise ValueError(f"malformed /studio/login response: {exc!r}") from exc

    return url, expires_in, correlation_id
407# ---------------------------------------------------------------------------
408# Doctor helpers
409# ---------------------------------------------------------------------------
def check_stack_complete(region: str, stack_name: str) -> tuple[bool, str]:
    """Report whether ``stack_name`` in ``region`` is in a healthy state.

    A stack counts as healthy when its status is ``CREATE_COMPLETE``,
    ``UPDATE_COMPLETE`` or ``IMPORT_COMPLETE``; the result is then
    ``(True, "")``. In every other case — the ``describe_stacks`` call
    fails, no stack is returned, or the status is anything else — the
    result is ``(False, hint)`` where ``hint`` describes what was found.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    healthy_states = ("CREATE_COMPLETE", "UPDATE_COMPLETE", "IMPORT_COMPLETE")

    try:
        client = boto3.client("cloudformation", region_name=region)
        described = client.describe_stacks(StackName=stack_name)
    except (ClientError, BotoCoreError) as exc:
        return False, f"describe_stacks failed in {region}: {exc!s}"

    found = described.get("Stacks", [])
    if not found:
        return False, f"{stack_name} not found in {region}"

    state = found[0].get("StackStatus", "")
    if state not in healthy_states:
        return False, f"{stack_name} in {region} has status {state}"
    return True, ""
def check_ssm_parameter(region: str, param_name: str) -> tuple[bool, str]:
    """Check for an SSM parameter, taking ``(region, param_name)`` positionally.

    This is a thin alias for
    :func:`gco.services.aws_ssm.check_ssm_parameter`: it forwards
    ``param_name`` positionally and ``region`` by keyword and returns that
    helper's result unchanged. It exists to keep the historical argument
    order and the public ``__all__`` surface stable; new code should call
    the keyword-style helper directly.
    """
    from gco.services.aws_ssm import check_ssm_parameter as _delegate

    outcome = _delegate(param_name, region=region)
    return outcome
def scan_orphan_analytics_resources(region: str) -> list[str]:
    """Return a list of copy-paste ``aws`` commands for retained resources.

    Scans EFS file systems and Cognito user pools in ``region`` for
    resources tagged ``gco:analytics:managed=true``. Both listings are
    paginated (``NextMarker`` for EFS, ``NextToken`` for Cognito), so
    resources beyond the first page are included.

    Returns:
        One ``aws ... delete-...`` command per tagged resource. If either
        scan raises ``ClientError`` / ``BotoCoreError``, a parenthesised
        ``(... orphan scan failed: ...)`` note is appended instead of
        raising; commands collected before the failure are kept. An empty
        list means no orphans were found and no scan failed.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    remediation: list[str] = []

    try:
        efs = boto3.client("efs", region_name=region)
        efs_request: dict[str, str] = {}
        while True:
            fs_page = efs.describe_file_systems(**efs_request)
            for fs in fs_page.get("FileSystems", []) or []:
                fs_id = fs.get("FileSystemId", "")
                if not fs_id:
                    continue
                tag_resp = efs.list_tags_for_resource(ResourceId=fs_id)
                tags = {t.get("Key"): t.get("Value") for t in tag_resp.get("Tags", []) or []}
                if tags.get("gco:analytics:managed") == "true":
                    remediation.append(f"aws efs delete-file-system --file-system-id {fs_id}")
            # EFS reports a ``NextMarker`` only when more pages remain.
            next_marker = fs_page.get("NextMarker")
            if not next_marker:
                break
            efs_request["Marker"] = next_marker
    except (ClientError, BotoCoreError) as exc:
        remediation.append(f"(EFS orphan scan failed: {exc!s})")

    try:
        cognito = boto3.client("cognito-idp", region_name=region)
        # 60 is the page size the original call used; NextToken walks the rest.
        pool_request: dict[str, Any] = {"MaxResults": 60}
        while True:
            pools = cognito.list_user_pools(**pool_request)
            for pool in pools.get("UserPools", []) or []:
                pool_id = pool.get("Id")
                if not pool_id:
                    continue
                describe = cognito.describe_user_pool(UserPoolId=pool_id)
                tags = describe.get("UserPool", {}).get("UserPoolTags", {}) or {}
                if tags.get("gco:analytics:managed") == "true":
                    remediation.append(f"aws cognito-idp delete-user-pool --user-pool-id {pool_id}")
            next_token = pools.get("NextToken")
            if not next_token:
                break
            pool_request["NextToken"] = next_token
    except (ClientError, BotoCoreError) as exc:
        remediation.append(f"(Cognito orphan scan failed: {exc!s})")

    return remediation