aws_iatk
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from subprocess import Popen, PIPE
from datetime import datetime
import pathlib
import json
import logging
from dataclasses import dataclass
from functools import wraps
from typing import TYPE_CHECKING, Optional, Dict, List, Callable
import time
import math

from .get_physical_id_from_stack import (
    PhysicalIdFromStackOutput,
    PhysicalIdFromStackParams,
)
from .get_stack_outputs import (
    GetStackOutputsOutput,
    GetStackOutputsParams,
)
from .add_eb_listener import (
    AddEbListenerOutput,
    AddEbListenerParams,
)
from .remove_listeners import (
    RemoveListenersOutput,
    RemoveListenersParams,
    RemoveListeners_TagFilter,
)
from .poll_events import (
    PollEventsOutput,
    PollEventsParams,
    WaitUntilEventMatchedParams,
)
from .get_trace_tree import (
    GetTraceTreeOutput,
    GetTraceTreeParams
)
from .retry_xray_trace import (
    RetryGetTraceTreeUntilParams,
)
from .generate_mock_event import (
    GenerateBareboneEventOutput,
    GenerateBareboneEventParams,
    GenerateMockEventOutput,
    GenerateMockEventParams,
)
from .jsonrpc import Payload

if TYPE_CHECKING:
    import boto3

__all__ = [
    "AwsIatk",
    "IatkException",
    "PhysicalIdFromStackOutput",
    "GetStackOutputsOutput",
    "AddEbListenerOutput",
    "RemoveListenersOutput",
    "RemoveListeners_TagFilter",
    "PollEventsOutput",
    "GetTraceTreeOutput",
    "GenerateBareboneEventOutput",
    "GenerateMockEventOutput",
]

LOG = logging.getLogger(__name__)
iatk_service_logger = logging.getLogger("iatk.service")


def _log_duration(func):
    """
    Decorator that logs, at debug level, how long the wrapped call took.

    The duration is measured with a monotonic high-resolution clock and is
    logged even when the wrapped call raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            LOG.debug("elapsed: %s seconds", time.perf_counter() - start)

    return wrapper


class IatkException(Exception):
    """
    Error raised when an IATK operation fails.

    Parameters
    ----------
    message :
        Description of the failure
    error_code :
        Code associated with the failure, exposed as ``error_code``
    """
    def __init__(self, message, error_code) -> None:
        super().__init__(message)

        self.error_code = error_code


class RetryableException(Exception):
    """
    Error signalling that the failed operation may be attempted again.

    This is the default exception type that ``AwsIatk.retry_until`` retries on.
    """
    def __init__(self, message) -> None:
        super().__init__(message)


@dataclass
class AwsIatk:
    """
    Creates and sets up AWS Integrated Application Test Kit

    Parameters
    ----------
    region : str, optional
        AWS Region used to interact with AWS
    profile: str, optional
        AWS Profile used to communicate with AWS resources
    """
    region: Optional[str] = None
    profile: Optional[str] = None

    # Location of the iatk service binary, resolved relative to this module.
    # Deliberately un-annotated so that it is not treated as a dataclass field.
    _iatk_binary_path = (
        pathlib.Path(__file__).parent.parent.joinpath("iatk_service", "iatk").absolute()
    )

    def get_physical_id_from_stack(
        self, logical_resource_id: str, stack_name: str
    ) -> PhysicalIdFromStackOutput:
        """
        Fetch a Physical Id from a Logical Id within an AWS CloudFormation stack

        IAM Permissions Needed
        ----------------------
        cloudformation:DescribeStackResources

        Parameters
        ----------
        logical_resource_id : str
            Name of the Logical Id within the Stack to fetch
        stack_name : str
            Name of the CloudFormation Stack

        Returns
        -------
        PhysicalIdFromStackOutput
            Data Class that holds the Physical Id of the resource

        Raises
        ------
        IatkException
            When failed to fetch Physical Id
        """
        params = PhysicalIdFromStackParams(logical_resource_id, stack_name)
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload)
        output = PhysicalIdFromStackOutput(response)
        LOG.debug(
            "Physical id: %s, Logical id: %s",
            output.physical_id,
            params.logical_resource_id,
        )
        return output

    def get_stack_outputs(self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput:
        """
        Fetch Stack Outputs from an AWS CloudFormation stack

        IAM Permissions Needed
        ----------------------
        cloudformation:DescribeStacks

        Parameters
        ----------
        stack_name : str
            Name of the Stack
        output_names : List[str]
            List of strings that represent the StackOutput Keys

        Returns
        -------
        GetStackOutputsOutput
            Data Class that holds the Stack Outputs of the resource

        Raises
        ------
        IatkException
            When failed to fetch Stack Outputs
        """
        params = GetStackOutputsParams(stack_name, output_names)
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload)
        output = GetStackOutputsOutput(response)
        LOG.debug("Output: %s", output)
        return output

    def add_listener(
        self,
        event_bus_name: str,
        rule_name: str,
        target_id: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
    ) -> AddEbListenerOutput:
        """
        Add Listener Resource to an AWS Event Bridge Bus to enable testing

        IAM Permissions Needed
        ----------------------
        events:DescribeEventBus
        events:DescribeRule
        events:PutRule
        events:PutTargets
        events:DeleteRule
        events:RemoveTargets
        events:TagResource

        sqs:CreateQueue
        sqs:GetQueueAttributes
        sqs:GetQueueUrl
        sqs:DeleteQueue
        sqs:TagQueue

        Parameters
        ----------
        event_bus_name : str
            Name of the AWS Event Bus
        rule_name : str
            Name of a Rule on the EventBus to replicate
        target_id : str, optional
            Target Id on the given rule to replicate
        tags : Dict[str, str], optional
            Key-value pairs to associate with the EventBridge rule.

        Returns
        -------
        AddEbListenerOutput
            Data Class that holds the Listener created

        Raises
        ------
        IatkException
            When failed to add listener
        """
        params = AddEbListenerParams(event_bus_name, rule_name, target_id, tags)
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload)
        output = AddEbListenerOutput(response)
        LOG.debug("Output: %s", output)
        return output

    def remove_listeners(
        self,
        ids: Optional[List[str]] = None,
        tag_filters: Optional[List[RemoveListeners_TagFilter]] = None,
    ) -> RemoveListenersOutput:
        """
        Remove Listener Resource(s) from an AWS Event Bridge Bus

        IAM Permissions Needed
        ----------------------
        tag:GetResources

        sqs:DeleteQueue
        sqs:GetQueueUrl
        sqs:GetQueueAttributes
        sqs:ListQueueTags

        events:DeleteRule
        events:RemoveTargets
        events:ListTargetsByRule

        Parameters
        ----------
        ids : List[str], optional
            List of Listener Ids to remove, one of ids and tag_filters must be supplied
        tag_filters : List[RemoveListeners_TagFilter], optional
            List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied

        Returns
        -------
        RemoveListenersOutput
            Data Class that holds the Listener(s) that were removed

        Raises
        ------
        IatkException
            When failed to remove listener(s)
        """
        params = RemoveListenersParams(ids, tag_filters)
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload)
        output = RemoveListenersOutput(response)
        LOG.debug("Output: %s", output)
        return output

    def _poll_events(self, params: PollEventsParams, caller: Optional[str] = None) -> PollEventsOutput:
        """
        underlying implementation for poll_events and wait_until_event_matched
        """
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload, caller)
        output = PollEventsOutput(response)
        return output

    def poll_events(self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput:
        """
        Poll Events from a specific Listener

        IAM Permissions Needed
        ----------------------
        sqs:GetQueueUrl
        sqs:ListQueueTags
        sqs:ReceiveMessage
        sqs:DeleteMessage
        sqs:GetQueueAttributes

        events:DescribeRule

        Parameters
        ----------
        listener_id : str
            Id of the Listener that was created
        wait_time_seconds : int
            Time in seconds to wait for polling
        max_number_of_messages : int
            Max number of messages to poll

        Returns
        -------
        PollEventsOutput
            Data Class that holds the Events captured by the Listener

        Raises
        ------
        IatkException
            When failed to Poll Events
        """
        params = PollEventsParams(listener_id, wait_time_seconds, max_number_of_messages)
        output = self._poll_events(params)
        LOG.debug("Output: %s", output)
        return output

    def wait_until_event_matched(
        self,
        listener_id: str,
        assertion_fn: Callable[[str], None],
        timeout_seconds: int = 30,
    ) -> bool:
        """
        Poll Events on a given Listener until a match is found or timeout met.

        IAM Permissions Needed
        ----------------------
        sqs:GetQueueUrl
        sqs:ListQueueTags
        sqs:ReceiveMessage
        sqs:DeleteMessage
        sqs:GetQueueAttributes

        events:DescribeRule

        Parameters
        ----------
        listener_id : str
            Id of the Listener that was created
        assertion_fn : Callable[[str], None]
            Callable function that has an assertion and raises an AssertionError if it fails
        timeout_seconds : int
            Timeout (in seconds) to stop the polling

        Returns
        -------
        bool
            True if the event was matched before timeout otherwise False

        Raises
        ------
        IatkException
            When failed to Poll Events
        """
        params = WaitUntilEventMatchedParams(listener_id, assertion_fn, timeout_seconds)
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + params.timeout_seconds
        while time.monotonic() < deadline:
            out = self._poll_events(params=params._poll_event_params, caller="wait_until_event_matched")
            events = out.events
            if events:
                for event in events:
                    try:
                        params.assertion_fn(event)
                    except AssertionError as e:
                        # Not a match; keep checking the remaining events.
                        LOG.debug("Assertion failed: %s", e)
                    else:
                        LOG.debug("event matched")
                        return True

        LOG.debug("timeout after %s seconds", params.timeout_seconds)
        LOG.debug("no matching event found")
        return False

    def _get_trace_tree(self, params: GetTraceTreeParams, caller: Optional[str] = None) -> GetTraceTreeOutput:
        """
        underlying implementation for get_trace_tree and retry_get_trace_tree_until
        """
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload, caller)
        output = GetTraceTreeOutput(response)
        return output

    def get_trace_tree(
        self, tracing_header: str
    ) -> GetTraceTreeOutput:
        """
        Fetch the trace tree structure using the provided tracing_header

        IAM Permissions Needed
        ----------------------
        xray:BatchGetTraces

        Parameters
        ----------
        tracing_header : str
            Trace header to get the trace tree

        Returns
        -------
        GetTraceTreeOutput
            Data Class that holds the trace tree structure

        Raises
        ------
        IatkException
            When failed to fetch a trace tree
        """
        params = GetTraceTreeParams(tracing_header)
        output = self._get_trace_tree(params)
        return output

    def _generate_barebone_event(
        self, params: GenerateBareboneEventParams,
    ) -> GenerateBareboneEventOutput:
        """
        underlying implementation for generate_mock_event, before contexts are applied
        """
        payload = params.to_payload(self.region, self.profile)
        response = self._invoke_iatk(payload)
        output = GenerateBareboneEventOutput(response)
        return output

    def generate_mock_event(
        self,
        registry_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        schema_version: Optional[str] = None,
        event_ref: Optional[str] = None,
        skip_optional: Optional[bool] = None,
        contexts: Optional[List[Callable[[dict], dict]]] = None
    ) -> GenerateMockEventOutput:
        """
        Generate a mock event based on a schema from EventBridge Schema Registry

        IAM Permissions Needed
        ----------------------
        schemas:DescribeSchema

        Parameters
        ----------
        registry_name : str
            name of the registry of the schema stored in EventBridge Schema Registry
        schema_name : str
            name of the schema stored in EventBridge Schema Registry
        schema_version : str
            version of the schema stored in EventBridge Schema Registry
        event_ref : str
            location to the event in the schema in json schema ref syntax, only applicable for openapi schema
        skip_optional : bool
            if set to true, do not generate optional fields
        contexts : List[Callable[[dict], dict]]
            a list of callables to apply context on the generated mock event

        Returns
        -------
        GenerateMockEventOutput
            Data Class that holds the generated mock event

        Raises
        ------
        IatkException
            When failed to generate the mock event, or when a context callable
            returns None or a result that is not JSON-serializable
        """
        params = GenerateMockEventParams(registry_name, schema_name, schema_version, event_ref, skip_optional, contexts)
        out = self._generate_barebone_event(
            GenerateBareboneEventParams(
                registry_name=params.registry_name,
                schema_name=params.schema_name,
                schema_version=params.schema_version,
                event_ref=params.event_ref,
                skip_optional=params.skip_optional,
            )
        )
        event = out.event

        # isinstance (rather than an exact type comparison) also accepts list subclasses.
        if isinstance(params.contexts, list):
            event = self._apply_contexts(event, params.contexts)

        return GenerateMockEventOutput(event)

    def retry_until(self, assertion_fn, timeout=10, retryable_exceptions=(RetryableException,)):
        """
        Decorator function to retry until condition or timeout is met

        The decorated function is called repeatedly. Its result is handed to
        ``assertion_fn``; when that raises an AssertionError the call is retried
        after an exponentially growing pause. When the decorated function raises
        one of ``retryable_exceptions`` it is retried immediately.

        IAM Permissions Needed
        ----------------------

        Parameters
        ----------
        assertion_fn: Callable[[any], None]
            Callable function that has an assertion and raises an AssertionError if it fails
        timeout: int or float
            value (in seconds) that specifies how long the function will retry for
            until it times out. A value of 0 disables the deadline: retries continue
            until the assertion passes.
        retryable_exceptions: tuple of exception types
            exceptions raised by the decorated function that trigger another attempt

        Returns
        -------
        bool
            True if the condition was met or false if the timeout is met

        Raises
        ------
        ValueError
            When timeout is a negative number
        TypeError
            When timeout or condition is not a suitable type
        """
        if not isinstance(timeout, (int, float)):
            raise TypeError("timeout must be an int or float")
        elif timeout < 0:
            raise ValueError("timeout must not be a negative value")
        if not callable(assertion_fn):
            raise TypeError("condition is not a callable function")

        def retry_until_decorator(func):
            @wraps(func)
            def _wrapper(*args, **kwargs):
                # timeout == 0 means "no deadline": keep retrying until success.
                deadline = None if timeout == 0 else time.monotonic() + timeout
                attempt = 1
                delay = 0.05
                while deadline is None or time.monotonic() < deadline:
                    try:
                        output = func(*args, **kwargs)
                    except retryable_exceptions:
                        continue
                    try:
                        assertion_fn(output)
                    except AssertionError as e:
                        LOG.debug("Assertion failed: %s", e)
                    else:
                        return True
                    backoff = math.pow(2, attempt) * delay
                    if deadline is not None:
                        # Never sleep past the deadline; the outcome would be
                        # the same, only reported later.
                        backoff = min(backoff, max(0.0, deadline - time.monotonic()))
                    time.sleep(backoff)
                    attempt += 1
                LOG.debug("timeout after %s seconds", timeout)
                LOG.debug("condition not satisfied")
                return False
            return _wrapper
        return retry_until_decorator

    def _apply_contexts(self, generated_event: dict, callable_contexts: List[Callable]) -> dict:
        """
        function for looping through provided functions, modifying the event as the client specifies

        Each callable receives the event returned by the previous one. After every
        step the result must be JSON-serializable and must not be None.

        Raises
        ------
        IatkException
            With error code 400 when a callable returns a non-JSON-serializable
            result, or 404 when a callable returns None
        """
        for func in callable_contexts:
            generated_event = func(generated_event)
            try:
                json.dumps(generated_event)
            except (TypeError, ValueError) as err:
                # Callables such as functools.partial objects have no __name__.
                applier_name = getattr(func, "__name__", repr(func))
                raise IatkException(
                    f"context applier {applier_name} returns a non-JSON-serializable result", 400
                ) from err
            if generated_event is None:
                raise IatkException("event is empty, make sure function returns a valid event", 404)
        return generated_event

    def patch_aws_client(self, client: "boto3.client", sampled: int = 1) -> "boto3.client":
        """
        Patches boto3 client to register an event handler that adds an X-Amzn-Trace-Id header, carrying the sampling decision, to each request before it is signed

        Parameters
        ----------
        client : boto3.client
            boto3.client for specified aws service
        sampled : int
            value 0 to not sample the request or value 1 to sample the request

        Returns
        -------
        boto3.client
            same client passed in the params with the event registered
        """
        def _add_header(request, **kwargs):
            trace_id_string = 'Root=;Sampled={}'.format(sampled)

            request.headers.add_header('X-Amzn-Trace-Id', trace_id_string)
            LOG.debug("Trace ID format: %s", trace_id_string)

        service_name = client.meta.service_model.service_name
        event_string = 'before-sign.{}.*'
        LOG.debug(
            "service id: %s, service name: %s",
            client.meta.service_model.service_id,
            service_name,
        )

        client.meta.events.register(event_string.format(service_name), _add_header)

        return client

    @staticmethod
    def _raise_if_error(data_dict: dict) -> None:
        """
        raise an IatkException when the decoded JSON-RPC response carries an error
        """
        # Error is only returned in the response if one happened. Otherwise it is omitted
        error = data_dict.get("error", None)
        if error:
            message = error.get("message", "")
            error_code = error.get("Code", 0)
            raise IatkException(message=message, error_code=error_code)

    @_log_duration
    def _invoke_iatk(self, payload: Payload, caller: Optional[str] = None) -> dict:
        """
        send the payload to the iatk binary and return the decoded JSON-RPC response

        Raises
        ------
        IatkException
            When the response contains an error
        """
        input_data = payload.dump_bytes(caller)
        LOG.debug("payload: %s", input_data)
        stdout_data = self._popen_iatk(input_data)
        jsonrpc_data = stdout_data.decode("utf-8")
        data_dict = json.loads(jsonrpc_data.strip())
        self._raise_if_error(data_dict)
        return data_dict

    def _popen_iatk(self, input: bytes, env_vars: Optional[dict] = None) -> bytes:
        """
        run the iatk binary once, feeding ``input`` on stdin and returning its stdout

        Everything the binary writes to stderr is forwarded, line by line, to the
        "iatk.service" logger at debug level. ``env_vars`` is accepted but currently unused.
        """
        LOG.debug("calling iatk rpc with input %s", input)
        p = Popen([self._iatk_binary_path], stdout=PIPE, stdin=PIPE, stderr=PIPE)

        out, err = p.communicate(input=input)
        for line in err.splitlines():
            # Diagnostics only: never let undecodable bytes abort the call.
            iatk_service_logger.debug(line.decode(errors="replace"))
        return out

    def _raise_error_if_returned(self, output):
        """
        decode raw JSON-RPC output bytes and raise an IatkException if it carries an error
        """
        jsonrpc_data = output.decode("utf-8")
        data_dict = json.loads(jsonrpc_data.strip())
        self._raise_if_error(data_dict)

    def retry_get_trace_tree_until(
        self,
        tracing_header: str,
        assertion_fn: Callable[[GetTraceTreeOutput], None],
        timeout_seconds: int = 30,
    ) -> bool:
        """
        function to retry get_trace_tree until condition or timeout is met

        IAM Permissions Needed
        ----------------------
        xray:BatchGetTraces

        Parameters
        ----------
        tracing_header : str
            x-ray trace header
        assertion_fn : Callable[[GetTraceTreeOutput], None]
            Callable function that makes an assertion and raises an AssertionError if it fails
        timeout_seconds : int
            Timeout (in seconds) to stop the fetching

        Returns
        -------
        bool
            True if the condition was met or false if the timeout is met

        Raises
        ------
        IatkException
            When an exception occurs during get_trace_tree
        """
        params = RetryGetTraceTreeUntilParams(tracing_header, assertion_fn, timeout_seconds)

        @self.retry_until(assertion_fn=params.assertion_fn, timeout=params.timeout_seconds)
        def fetch_trace_tree():
            try:
                response = self._get_trace_tree(
                    params=GetTraceTreeParams(tracing_header=params.tracing_header),
                    caller="retry_get_trace_tree_until",
                )
                return response
            except IatkException as e:
                # A trace that is not found yet is retried; anything else is fatal.
                if "trace not found" in str(e):
                    raise RetryableException(e) from e
                else:
                    raise IatkException(e, 500) from e

        try:
            response = fetch_trace_tree()
            return response
        except IatkException as e:
            raise IatkException(e, 500) from e


# Set up logging to ``/dev/null`` like a library is supposed to.
# https://docs.python.org/3.3/howto/logging.html#configuring-logging-for-a-library
class NullHandler(logging.NullHandler):
    """
    No-op logging handler attached to the package logger.

    Kept under its original name for backward compatibility; the behaviour
    (records are discarded) comes from the standard library's
    ``logging.NullHandler``.
    """


logging.getLogger("aws_iatk").addHandler(NullHandler())
95@dataclass 96class AwsIatk: 97 """ 98 Creates and setups AWS Integrated Application Test Kit 99 100 Parameters 101 ---------- 102 region : str, optional 103 AWS Region used to interact with AWS 104 profile: str, optional 105 AWS Profile used to communicate with AWS resources 106 """ 107 region: Optional[str] = None 108 profile: Optional[str] = None 109 110 _iatk_binary_path = ( 111 pathlib.Path(__file__).parent.parent.joinpath("iatk_service", "iatk").absolute() 112 ) 113 114 def get_physical_id_from_stack( 115 self, logical_resource_id: str, stack_name: str 116 ) -> PhysicalIdFromStackOutput: 117 """ 118 Fetch a Phsyical Id from a Logical Id within an AWS CloudFormation stack 119 120 IAM Permissions Needed 121 ---------------------- 122 cloudformation:DescribeStackResources 123 124 Parameters 125 ---------- 126 logical_resource_id : str 127 Name of the Logical Id within the Stack to fetch 128 stack_name : str 129 Name of the CloudFormation Stack 130 131 Returns 132 ------- 133 PhysicalIdFromStackOutput 134 Data Class that holds the Phsyical Id of the resource 135 136 Raises 137 ------ 138 IatkException 139 When failed to fetch Phsyical Id 140 """ 141 params = PhysicalIdFromStackParams(logical_resource_id, stack_name) 142 payload = params.to_payload(self.region, self.profile) 143 response = self._invoke_iatk(payload) 144 output = PhysicalIdFromStackOutput(response) 145 LOG.debug(f"Physical id: {output.physical_id}, Logical id: {params.logical_resource_id}") 146 return output 147 148 def get_stack_outputs(self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput: 149 """ 150 Fetch Stack Outputs from an AWS CloudFormation stack 151 152 IAM Permissions Needed 153 ---------------------- 154 cloudformation:DescribeStacks 155 156 Parameters 157 ---------- 158 stack_name : str 159 Name of the Stack 160 output_names : List[str] 161 List of strings that represent the StackOutput Keys 162 163 Returns 164 ------- 165 GetStackOutputsOutput 166 Data Class that 
holds the Stack Outputs of the resource 167 168 Raises 169 ------ 170 IatkException 171 When failed to fetch Stack Outputs 172 """ 173 params = GetStackOutputsParams(stack_name, output_names) 174 payload = params.to_payload(self.region, self.profile) 175 response = self._invoke_iatk(payload) 176 output = GetStackOutputsOutput(response) 177 LOG.debug(f"Output: {output}") 178 return output 179 180 def add_listener(self, event_bus_name: str, rule_name: str, target_id: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> AddEbListenerOutput: 181 """ 182 Add Listener Resource to an AWS Event Bridge Bus to enable testing 183 184 IAM Permissions Needed 185 ---------------------- 186 events:DescribeEventBus 187 events:DescribeRule 188 events:PutRule 189 events:PutTargets 190 events:DeleteRule 191 events:RemoveTargets 192 events:TagResource 193 194 sqs:CreateQueue 195 sqs:GetQueueAttributes 196 sqs:GetQueueUrl 197 sqs:DeleteQueue 198 sqs:TagQueue 199 200 Parameters 201 ---------- 202 event_bus_name : str 203 Name of the AWS Event Bus 204 rule_name : str 205 Name of a Rule on the EventBus to replicate 206 target_id : str, optional 207 Target Id on the given rule to replicate 208 tags : Dict[str, str], optional 209 A key-value pair associated EventBridge rule. 
210 211 Returns 212 ------- 213 AddEbListenerOutput 214 Data Class that holds the Listener created 215 216 Raises 217 ------ 218 IatkException 219 When failed to add listener 220 """ 221 params = AddEbListenerParams(event_bus_name, rule_name, target_id, tags) 222 payload = params.to_payload(self.region, self.profile) 223 response = self._invoke_iatk(payload) 224 output = AddEbListenerOutput(response) 225 LOG.debug(f"Output: {output}") 226 return output 227 228 def remove_listeners(self, ids: Optional[List[str]] = None, tag_filters: Optional[List[RemoveListeners_TagFilter]] = None) -> RemoveListenersOutput: 229 """ 230 Remove Listener Resource(s) from an AWS Event Bridge Bus 231 232 IAM Permissions Needed 233 ---------------------- 234 tag:GetResources 235 236 sqs:DeleteQueue 237 sqs:GetQueueUrl 238 sqs:GetQueueAttributes 239 sqs:ListQueueTags 240 241 events:DeleteRule 242 events:RemoveTargets 243 events:ListTargetsByRule 244 245 Parameters 246 ---------- 247 ids : List[str], optional 248 List of Listener Ids to remove, one of ids and tag_filters must be supplied 249 tag_filters : List[RemoveListeners_TagFilter], optional 250 List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied 251 252 Returns 253 ------- 254 RemoveListenersOutput 255 Data Class that holds the Listener(s) that were removed 256 257 Raises 258 ------ 259 IatkException 260 When failed to remove listener(s) 261 """ 262 params = RemoveListenersParams(ids, tag_filters) 263 payload = params.to_payload(self.region, self.profile) 264 response = self._invoke_iatk(payload) 265 output = RemoveListenersOutput(response) 266 LOG.debug(f"Output: {output}") 267 return output 268 269 def _poll_events(self, params: PollEventsParams, caller: str=None) -> PollEventsOutput: 270 """ 271 underlying implementation for poll_events and wait_until_event_matched 272 """ 273 payload = params.to_payload(self.region, self.profile) 274 response = self._invoke_iatk(payload, caller) 275 output = 
PollEventsOutput(response) 276 return output 277 278 def poll_events(self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput: 279 """ 280 Poll Events from a specific Listener 281 282 IAM Permissions Needed 283 ---------------------- 284 sqs:GetQueueUrl 285 sqs:ListQueueTags 286 sqs:ReceiveMessage 287 sqs:DeleteMessage 288 sqs:GetQueueAttributes 289 290 events:DescribeRule 291 292 Parameters 293 ---------- 294 listener_id : str 295 Id of the Listener that was created 296 wait_time_seconds : int 297 Time in seconds to wait for polling 298 max_number_of_messages : int 299 Max number of messages to poll 300 301 Returns 302 ------- 303 PollEventsOutput 304 Data Class that holds the Events captured by the Listener 305 306 Raises 307 ------ 308 IatkException 309 When failed to Poll Events 310 """ 311 params = PollEventsParams(listener_id, wait_time_seconds, max_number_of_messages) 312 output = self._poll_events(params) 313 LOG.debug(f"Output: {output}") 314 return output 315 316 def wait_until_event_matched(self, listener_id: str, assertion_fn: Callable[[str], None], timeout_seconds: int = 30) -> bool: 317 """ 318 Poll Events on a given Listener until a match is found or timeout met. 
319 320 IAM Permissions Needed 321 ---------------------- 322 sqs:GetQueueUrl 323 sqs:ListQueueTags 324 sqs:ReceiveMessage 325 sqs:DeleteMessage 326 sqs:GetQueueAttributes 327 328 events:DescribeRule 329 330 Parameters 331 ---------- 332 listener_id : str 333 Id of the Listener that was created 334 assertion_fn : Callable[[str], bool] 335 Callable function that has an assertion and raises an AssertionError if it fails 336 timeout_seconds : int 337 Timeout (in seconds) to stop the polling 338 339 Returns 340 ------- 341 bool 342 True if the event was matched before timeout otherwise False 343 344 Raises 345 ------ 346 IatkException 347 When failed to Poll Events 348 """ 349 params = WaitUntilEventMatchedParams(listener_id, assertion_fn, timeout_seconds) 350 start = datetime.now() 351 elapsed = lambda _: (datetime.now() - start).total_seconds() 352 while elapsed(None) < params.timeout_seconds: 353 out = self._poll_events(params=params._poll_event_params, caller="wait_until_event_matched") 354 events = out.events 355 if events: 356 for event in events: 357 try: 358 params.assertion_fn(event) 359 LOG.debug("event matched") 360 return True 361 except AssertionError as e: 362 LOG.debug(f"Assertion failed: {e}") 363 364 LOG.debug(f"timeout after {params.timeout_seconds} seconds") 365 LOG.debug("no matching event found") 366 return False 367 368 def _get_trace_tree(self, params: GetTraceTreeParams, caller: str=None) -> GetTraceTreeOutput: 369 """ 370 underlying implementation for get_trace_tree and retry_get_trace_tree_until 371 """ 372 payload = params.to_payload(self.region, self.profile) 373 response = self._invoke_iatk(payload, caller) 374 output = GetTraceTreeOutput(response) 375 return output 376 377 def get_trace_tree( 378 self, tracing_header: str 379 ) -> GetTraceTreeOutput: 380 """ 381 Fetch the trace tree structure using the provided tracing_header 382 383 IAM Permissions Needed 384 ---------------------- 385 xray:BatchGetTraces 386 387 Parameters 388 ---------- 
389 tracing_header : str 390 Trace header to get the trace tree 391 392 Returns 393 ------- 394 GetTraceTreeOutput 395 Data Class that holds the trace tree structure 396 397 Raises 398 ------ 399 IatkException 400 When failed to fetch a trace tree 401 """ 402 params = GetTraceTreeParams(tracing_header) 403 output = self._get_trace_tree(params) 404 return output 405 406 def _generate_barebone_event( 407 self, params: GenerateBareboneEventParams, 408 ) -> GenerateBareboneEventOutput: 409 payload = params.to_payload(self.region, self.profile) 410 response = self._invoke_iatk(payload) 411 output = GenerateBareboneEventOutput(response) 412 return output 413 414 def generate_mock_event( 415 self, 416 registry_name: Optional[str] = None, 417 schema_name: Optional[str] = None, 418 schema_version: Optional[str] = None, 419 event_ref: Optional[str] = None, 420 skip_optional: Optional[bool] = None, 421 contexts: Optional[List[Callable[[dict], dict]]] = None 422 ) -> GenerateMockEventOutput: 423 """ 424 Generate a mock event based on a schema from EventBridge Schema Registry 425 426 IAM Permissions Needed 427 ---------------------- 428 schemas:DescribeSchema 429 430 Parameters 431 ---------- 432 registry_name : str 433 name of the registry of the schema stored in EventBridge Schema Registry 434 schema_name : str 435 name of the schema stored in EventBridge Schema Registry 436 schema_version : str 437 version of the schema stored in EventBridge Schema Registry 438 event_ref : str 439 location to the event in the schema in json schema ref syntax, only applicable for openapi schema 440 skip_optional : bool 441 if set to true, do not generate optional fields 442 contexts : List[Callable[[dict], dict]] 443 a list of callables to apply context on the generated mock event 444 445 Returns 446 ------- 447 GenerateMockEventOutput 448 Data Class that holds the trace tree structure 449 450 Raises 451 ------ 452 IatkException 453 When failed to fetch a trace tree 454 """ 455 params = 
GenerateMockEventParams(registry_name, schema_name, schema_version, event_ref, skip_optional, contexts) 456 out = self._generate_barebone_event( 457 GenerateBareboneEventParams( 458 registry_name=params.registry_name, 459 schema_name=params.schema_name, 460 schema_version=params.schema_version, 461 event_ref=params.event_ref, 462 skip_optional=params.skip_optional, 463 ) 464 ) 465 event = out.event 466 467 if params.contexts is not None and type(params.contexts) == list: 468 event = self._apply_contexts(event, params.contexts) 469 470 return GenerateMockEventOutput(event) 471 472 def retry_until(self, assertion_fn, timeout = 10, retryable_exceptions = (RetryableException,)): 473 """ 474 Decorator function to retry until condition or timeout is met 475 476 IAM Permissions Needed 477 ---------------------- 478 479 Parameters 480 ---------- 481 assertion_fn: Callable[[any], None] 482 Callable function that has an assertion and raises an AssertionError if it fails 483 timeout: int or float 484 value that specifies how long the function will retry for until it times out 485 486 Returns 487 ------- 488 bool 489 True if the condition was met or false if the timeout is met 490 491 Raises 492 ------ 493 ValueException 494 When timeout is a negative number 495 TypeException 496 When timeout or condition is not a suitable type 497 """ 498 if(not(isinstance(timeout, int) or isinstance(timeout, float))): 499 raise TypeError("timeout must be an int or float") 500 elif(timeout < 0): 501 raise ValueError("timeout must not be a negative value") 502 if(not callable(assertion_fn)): 503 raise TypeError("condition is not a callable function") 504 def retry_until_decorator(func): 505 @wraps(func) 506 def _wrapper(*args, **kwargs): 507 start = datetime.now() 508 attempt = 1 509 delay = 0.05 510 elapsed = lambda _: (datetime.now() - start).total_seconds() 511 if timeout == 0: 512 elapsed = lambda _: -1 513 while elapsed(None) < timeout: 514 try: 515 output = func(*args, **kwargs) 516 
except retryable_exceptions: 517 continue 518 try: 519 assertion_fn(output) 520 return True 521 except AssertionError as e: 522 LOG.debug(f"Assertion failed: {e}") 523 time.sleep(math.pow(2, attempt) * delay) 524 attempt += 1 525 LOG.debug(f"timeout after {timeout} seconds") 526 LOG.debug("condition not satisfied") 527 return False 528 return _wrapper 529 return retry_until_decorator 530 531 def _apply_contexts(self, generated_event: dict, callable_contexts: List[Callable]) -> dict: 532 """ 533 function for looping through provided functions, modifying the event as the client specifies 534 """ 535 for func in callable_contexts: 536 generated_event = func(generated_event) 537 try: 538 json.dumps(generated_event) 539 except TypeError: 540 raise IatkException(f"context applier {func.__name__} returns a non-JSON-serializable result", 400) 541 if generated_event is None: 542 raise IatkException("event is empty, make sure function returns a valid event", 404) 543 return generated_event 544 545 def patch_aws_client(self, client: "boto3.client", sampled = 1) -> "boto3.client": 546 """ 547 Patches boto3 client to register event to include generated x-ray trace id and sampling rule as part of request header before invoke/execution 548 549 Parameters 550 ---------- 551 params : boto3.client 552 boto3.client for specified aws service 553 sampled : int 554 value 0 to not sample the request or value 1 to sample the request 555 556 Returns 557 ------- 558 boto3.client 559 same client passed in the params with the event registered 560 """ 561 def _add_header(request, **kwargs): 562 trace_id_string= 'Root=;Sampled={}'.format(sampled) 563 564 request.headers.add_header('X-Amzn-Trace-Id', trace_id_string) 565 LOG.debug(f"Trace ID format: {trace_id_string}") 566 567 service_name = client.meta.service_model.service_name 568 event_string = 'before-sign.{}.*' 569 LOG.debug(f"service id: {client.meta.service_model.service_id}, service name: {service_name}") 570 571 
client.meta.events.register(event_string.format(service_name), _add_header) 572 573 return client 574 575 @_log_duration 576 def _invoke_iatk(self, payload: Payload, caller: str=None) -> dict: 577 input_data = payload.dump_bytes(caller) 578 LOG.debug("payload: %s", input_data) 579 stdout_data = self._popen_iatk(input_data) 580 jsonrpc_data = stdout_data.decode("utf-8") 581 data_dict = json.loads(jsonrpc_data.strip()) 582 583 # Error is only returned in the response if one happened. Otherwise it is omitted 584 if data_dict.get("error", None): 585 message = data_dict.get("error", {}).get("message", "") 586 error_code = data_dict.get("error", {}).get("Code", 0) 587 raise IatkException(message=message, error_code=error_code) 588 589 return data_dict 590 591 def _popen_iatk(self, input: bytes, env_vars: Optional[dict]=None) -> bytes: 592 LOG.debug("calling iatk rpc with input %s", input) 593 p = Popen([self._iatk_binary_path], stdout=PIPE, stdin=PIPE, stderr=PIPE) 594 595 out, err = p.communicate(input=input) 596 for line in err.splitlines(): 597 iatk_service_logger.debug(line.decode()) 598 return out 599 600 def _raise_error_if_returned(self, output): 601 jsonrpc_data = output.decode("utf-8") 602 data_dict = json.loads(jsonrpc_data.strip()) 603 604 # Error is only returned in the response if one happened. 
Otherwise it is omitted 605 if data_dict.get("error", None): 606 message = data_dict.get("error", {}).get("message", "") 607 error_code = data_dict.get("error", {}).get("Code", 0) 608 raise IatkException(message=message, error_code=error_code) 609 610 611 def retry_get_trace_tree_until(self, tracing_header: str, assertion_fn: Callable[[GetTraceTreeOutput], None], timeout_seconds: int = 30): 612 """ 613 function to retry get_trace_tree condition or timeout is met 614 615 IAM Permissions Needed 616 ---------------------- 617 xray:BatchGetTraces 618 619 Parameters 620 ---------- 621 trace_header: 622 x-ray trace header 623 assertion_fn : Callable[[GetTraceTreeOutput], bool] 624 Callable fuction that makes an assertion and raises an AssertionError if it fails 625 timeout_seconds : int 626 Timeout (in seconds) to stop the fetching 627 628 Returns 629 ------- 630 bool 631 True if the condition was met or false if the timeout is met 632 633 Raises 634 ------ 635 IatkException 636 When an exception occurs during get_trace_tree 637 """ 638 params = RetryGetTraceTreeUntilParams(tracing_header, assertion_fn, timeout_seconds) 639 @self.retry_until(assertion_fn=params.assertion_fn, timeout=params.timeout_seconds) 640 def fetch_trace_tree(): 641 try: 642 response = self._get_trace_tree( 643 params=GetTraceTreeParams(tracing_header=params.tracing_header), 644 caller="retry_get_trace_tree_until", 645 ) 646 return response 647 except IatkException as e: 648 if "trace not found" in str(e): 649 pass 650 raise RetryableException(e) 651 else: 652 raise IatkException(e, 500) 653 try: 654 response = fetch_trace_tree() 655 return response 656 except IatkException as e: 657 raise IatkException(e, 500)
Creates and sets up the AWS Integrated Application Test Kit
Parameters
- region (str, optional): AWS Region used to interact with AWS
- profile (str, optional): AWS Profile used to communicate with AWS resources
def get_physical_id_from_stack(
    self, logical_resource_id: str, stack_name: str
) -> PhysicalIdFromStackOutput:
    """
    Resolve a Logical Id to its Physical Id within an AWS CloudFormation stack

    IAM Permissions Needed
    ----------------------
    cloudformation:DescribeStackResources

    Parameters
    ----------
    logical_resource_id : str
        Name of the Logical Id within the Stack to fetch
    stack_name : str
        Name of the CloudFormation Stack

    Returns
    -------
    PhysicalIdFromStackOutput
        Data Class that holds the Physical Id of the resource

    Raises
    ------
    IatkException
        When failed to fetch Physical Id
    """
    request = PhysicalIdFromStackParams(logical_resource_id, stack_name)
    # Region/profile come from this AwsIatk instance and are embedded in the payload.
    rpc_response = self._invoke_iatk(request.to_payload(self.region, self.profile))
    result = PhysicalIdFromStackOutput(rpc_response)
    LOG.debug(f"Physical id: {result.physical_id}, Logical id: {request.logical_resource_id}")
    return result
Fetch a Physical Id from a Logical Id within an AWS CloudFormation stack
IAM Permissions Needed
cloudformation:DescribeStackResources
Parameters
- logical_resource_id (str): Name of the Logical Id within the Stack to fetch
- stack_name (str): Name of the CloudFormation Stack
Returns
- PhysicalIdFromStackOutput: Data Class that holds the Physical Id of the resource
Raises
- IatkException: When failed to fetch Physical Id
def get_stack_outputs(self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput:
    """
    Retrieve selected Stack Outputs of an AWS CloudFormation stack

    IAM Permissions Needed
    ----------------------
    cloudformation:DescribeStacks

    Parameters
    ----------
    stack_name : str
        Name of the Stack
    output_names : List[str]
        List of strings that represent the StackOutput Keys

    Returns
    -------
    GetStackOutputsOutput
        Data Class that holds the Stack Outputs of the resource

    Raises
    ------
    IatkException
        When failed to fetch Stack Outputs
    """
    request = GetStackOutputsParams(stack_name, output_names)
    rpc_response = self._invoke_iatk(request.to_payload(self.region, self.profile))
    stack_outputs = GetStackOutputsOutput(rpc_response)
    LOG.debug(f"Output: {stack_outputs}")
    return stack_outputs
Fetch Stack Outputs from an AWS CloudFormation stack
IAM Permissions Needed
cloudformation:DescribeStacks
Parameters
- stack_name (str): Name of the Stack
- output_names (List[str]): List of strings that represent the StackOutput Keys
Returns
- GetStackOutputsOutput: Data Class that holds the Stack Outputs of the resource
Raises
- IatkException: When failed to fetch Stack Outputs
def add_listener(self, event_bus_name: str, rule_name: str, target_id: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> AddEbListenerOutput:
    """
    Attach a Listener Resource to an AWS Event Bridge Bus to enable testing

    IAM Permissions Needed
    ----------------------
    events:DescribeEventBus
    events:DescribeRule
    events:PutRule
    events:PutTargets
    events:DeleteRule
    events:RemoveTargets
    events:TagResource

    sqs:CreateQueue
    sqs:GetQueueAttributes
    sqs:GetQueueUrl
    sqs:DeleteQueue
    sqs:TagQueue

    Parameters
    ----------
    event_bus_name : str
        Name of the AWS Event Bus
    rule_name : str
        Name of a Rule on the EventBus to replicate
    target_id : str, optional
        Target Id on the given rule to replicate
    tags : Dict[str, str], optional
        Key-value tag pairs associated with the EventBridge rule

    Returns
    -------
    AddEbListenerOutput
        Data Class that holds the Listener created

    Raises
    ------
    IatkException
        When failed to add listener
    """
    request = AddEbListenerParams(event_bus_name, rule_name, target_id, tags)
    rpc_response = self._invoke_iatk(request.to_payload(self.region, self.profile))
    listener = AddEbListenerOutput(rpc_response)
    LOG.debug(f"Output: {listener}")
    return listener
Add Listener Resource to an AWS Event Bridge Bus to enable testing
IAM Permissions Needed
events:DescribeEventBus events:DescribeRule events:PutRule events:PutTargets events:DeleteRule events:RemoveTargets events:TagResource
sqs:CreateQueue sqs:GetQueueAttributes sqs:GetQueueUrl sqs:DeleteQueue sqs:TagQueue
Parameters
- event_bus_name (str): Name of the AWS Event Bus
- rule_name (str): Name of a Rule on the EventBus to replicate
- target_id (str, optional): Target Id on the given rule to replicate
- tags (Dict[str, str], optional): Key-value tag pairs associated with the EventBridge rule.
Returns
- AddEbListenerOutput: Data Class that holds the Listener created
Raises
- IatkException: When failed to add listener
def remove_listeners(self, ids: Optional[List[str]] = None, tag_filters: Optional[List[RemoveListeners_TagFilter]] = None) -> RemoveListenersOutput:
    """
    Delete Listener Resource(s) from an AWS Event Bridge Bus

    IAM Permissions Needed
    ----------------------
    tag:GetResources

    sqs:DeleteQueue
    sqs:GetQueueUrl
    sqs:GetQueueAttributes
    sqs:ListQueueTags

    events:DeleteRule
    events:RemoveTargets
    events:ListTargetsByRule

    Parameters
    ----------
    ids : List[str], optional
        List of Listener Ids to remove, one of ids and tag_filters must be supplied
    tag_filters : List[RemoveListeners_TagFilter], optional
        List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied

    Returns
    -------
    RemoveListenersOutput
        Data Class that holds the Listener(s) that were removed

    Raises
    ------
    IatkException
        When failed to remove listener(s)
    """
    request = RemoveListenersParams(ids, tag_filters)
    rpc_response = self._invoke_iatk(request.to_payload(self.region, self.profile))
    removal = RemoveListenersOutput(rpc_response)
    LOG.debug(f"Output: {removal}")
    return removal
Remove Listener Resource(s) from an AWS Event Bridge Bus
IAM Permissions Needed
tag:GetResources
sqs:DeleteQueue sqs:GetQueueUrl sqs:GetQueueAttributes sqs:ListQueueTags
events:DeleteRule events:RemoveTargets events:ListTargetsByRule
Parameters
- ids (List[str], optional): List of Listener Ids to remove, one of ids and tag_filters must be supplied
- tag_filters (List[RemoveListeners_TagFilter], optional): List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied
Returns
- RemoveListenersOutput: Data Class that holds the Listener(s) that were removed
Raises
- IatkException: When failed to remove listener(s)
def poll_events(self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput:
    """
    Poll a specific Listener for Events

    IAM Permissions Needed
    ----------------------
    sqs:GetQueueUrl
    sqs:ListQueueTags
    sqs:ReceiveMessage
    sqs:DeleteMessage
    sqs:GetQueueAttributes

    events:DescribeRule

    Parameters
    ----------
    listener_id : str
        Id of the Listener that was created
    wait_time_seconds : int
        Time in seconds to wait for polling
    max_number_of_messages : int
        Max number of messages to poll

    Returns
    -------
    PollEventsOutput
        Data Class that holds the Events captured by the Listener

    Raises
    ------
    IatkException
        When failed to Poll Events
    """
    polled = self._poll_events(
        PollEventsParams(listener_id, wait_time_seconds, max_number_of_messages)
    )
    LOG.debug(f"Output: {polled}")
    return polled
Poll Events from a specific Listener
IAM Permissions Needed
sqs:GetQueueUrl sqs:ListQueueTags sqs:ReceiveMessage sqs:DeleteMessage sqs:GetQueueAttributes
events:DescribeRule
Parameters
- listener_id (str): Id of the Listener that was created
- wait_time_seconds (int): Time in seconds to wait for polling
- max_number_of_messages (int): Max number of messages to poll
Returns
- PollEventsOutput: Data Class that holds the Events captured by the Listener
Raises
- IatkException: When failed to Poll Events
def wait_until_event_matched(self, listener_id: str, assertion_fn: Callable[[str], None], timeout_seconds: int = 30) -> bool:
    """
    Keep polling a Listener until one event satisfies the assertion or the timeout elapses.

    IAM Permissions Needed
    ----------------------
    sqs:GetQueueUrl
    sqs:ListQueueTags
    sqs:ReceiveMessage
    sqs:DeleteMessage
    sqs:GetQueueAttributes

    events:DescribeRule

    Parameters
    ----------
    listener_id : str
        Id of the Listener that was created
    assertion_fn : Callable[[str], None]
        Callable function that has an assertion and raises an AssertionError if it fails
    timeout_seconds : int
        Timeout (in seconds) to stop the polling

    Returns
    -------
    bool
        True if the event was matched before timeout otherwise False

    Raises
    ------
    IatkException
        When failed to Poll Events
    """
    params = WaitUntilEventMatchedParams(listener_id, assertion_fn, timeout_seconds)
    started_at = datetime.now()

    # The timeout is only checked between polls; a poll already in flight runs to completion.
    while (datetime.now() - started_at).total_seconds() < params.timeout_seconds:
        polled = self._poll_events(params=params._poll_event_params, caller="wait_until_event_matched")
        for candidate in polled.events or []:
            try:
                params.assertion_fn(candidate)
            except AssertionError as e:
                # A failed assertion only disqualifies this event; keep scanning.
                LOG.debug(f"Assertion failed: {e}")
            else:
                LOG.debug("event matched")
                return True

    LOG.debug(f"timeout after {params.timeout_seconds} seconds")
    LOG.debug("no matching event found")
    return False
Poll Events on a given Listener until a match is found or timeout met.
IAM Permissions Needed
sqs:GetQueueUrl sqs:ListQueueTags sqs:ReceiveMessage sqs:DeleteMessage sqs:GetQueueAttributes
events:DescribeRule
Parameters
- listener_id (str): Id of the Listener that was created
- assertion_fn (Callable[[str], None]): Callable function that has an assertion and raises an AssertionError if it fails
- timeout_seconds (int): Timeout (in seconds) to stop the polling
Returns
- bool: True if the event was matched before timeout otherwise False
Raises
- IatkException: When failed to Poll Events
def get_trace_tree(
    self, tracing_header: str
) -> GetTraceTreeOutput:
    """
    Retrieve the trace tree structure identified by the provided tracing_header

    IAM Permissions Needed
    ----------------------
    xray:BatchGetTraces

    Parameters
    ----------
    tracing_header : str
        Trace header to get the trace tree

    Returns
    -------
    GetTraceTreeOutput
        Data Class that holds the trace tree structure

    Raises
    ------
    IatkException
        When failed to fetch a trace tree
    """
    return self._get_trace_tree(GetTraceTreeParams(tracing_header))
Fetch the trace tree structure using the provided tracing_header
IAM Permissions Needed
xray:BatchGetTraces
Parameters
- tracing_header (str): Trace header to get the trace tree
Returns
- GetTraceTreeOutput: Data Class that holds the trace tree structure
Raises
- IatkException: When failed to fetch a trace tree
def generate_mock_event(
    self,
    registry_name: Optional[str] = None,
    schema_name: Optional[str] = None,
    schema_version: Optional[str] = None,
    event_ref: Optional[str] = None,
    skip_optional: Optional[bool] = None,
    contexts: Optional[List[Callable[[dict], dict]]] = None
) -> GenerateMockEventOutput:
    """
    Generate a mock event based on a schema from EventBridge Schema Registry

    A barebone event is generated first; each callable in ``contexts`` is then
    applied to it in order.

    IAM Permissions Needed
    ----------------------
    schemas:DescribeSchema

    Parameters
    ----------
    registry_name : str
        name of the registry of the schema stored in EventBridge Schema Registry
    schema_name : str
        name of the schema stored in EventBridge Schema Registry
    schema_version : str
        version of the schema stored in EventBridge Schema Registry
    event_ref : str
        location to the event in the schema in json schema ref syntax, only applicable for openapi schema
    skip_optional : bool
        if set to true, do not generate optional fields
    contexts : List[Callable[[dict], dict]]
        a list of callables to apply context on the generated mock event

    Returns
    -------
    GenerateMockEventOutput
        Data Class that holds the generated mock event

    Raises
    ------
    IatkException
        When failed to generate the mock event, or when a context callable
        returns an invalid event
    """
    params = GenerateMockEventParams(registry_name, schema_name, schema_version, event_ref, skip_optional, contexts)
    out = self._generate_barebone_event(
        GenerateBareboneEventParams(
            registry_name=params.registry_name,
            schema_name=params.schema_name,
            schema_version=params.schema_version,
            event_ref=params.event_ref,
            skip_optional=params.skip_optional,
        )
    )
    event = out.event

    # isinstance covers the None case and, unlike an exact type comparison,
    # also accepts list subclasses.
    if isinstance(params.contexts, list):
        event = self._apply_contexts(event, params.contexts)

    return GenerateMockEventOutput(event)
Generate a mock event based on a schema from EventBridge Schema Registry
IAM Permissions Needed
schemas:DescribeSchema
Parameters
- registry_name (str): name of the registry of the schema stored in EventBridge Schema Registry
- schema_name (str): name of the schema stored in EventBridge Schema Registry
- schema_version (str): version of the schema stored in EventBridge Schema Registry
- event_ref (str): location to the event in the schema in json schema ref syntax, only applicable for openapi schema
- skip_optional (bool): if set to true, do not generate optional fields
- contexts (List[Callable[[dict], dict]]): a list of callables to apply context on the generated mock event
Returns
- GenerateMockEventOutput: Data Class that holds the generated mock event
Raises
- IatkException: When failed to generate the mock event
def retry_until(self, assertion_fn, timeout = 10, retryable_exceptions = (RetryableException,)):
    """
    Decorator factory that retries a function until an assertion on its output passes or the timeout is met

    The decorated function is called repeatedly. After every failed attempt
    (a failed assertion or one of ``retryable_exceptions``) the loop sleeps
    with exponential backoff before trying again. When ``timeout`` is non-zero
    the sleep never extends past the deadline.

    Parameters
    ----------
    assertion_fn : Callable[[any], None]
        Callable function that has an assertion and raises an AssertionError if it fails
    timeout : int or float
        value (in seconds) that specifies how long the function will retry for
        until it times out; 0 disables the timeout and retries indefinitely
    retryable_exceptions : tuple of exception types
        exceptions raised by the decorated function that trigger another attempt
        instead of propagating

    Returns
    -------
    Callable
        A decorator. The decorated function returns True if the condition was
        met or False if the timeout is met

    Raises
    ------
    ValueError
        When timeout is a negative number
    TypeError
        When timeout or assertion_fn is not a suitable type
    """
    if not isinstance(timeout, (int, float)):
        raise TypeError("timeout must be an int or float")
    if timeout < 0:
        raise ValueError("timeout must not be a negative value")
    if not callable(assertion_fn):
        raise TypeError("condition is not a callable function")

    def retry_until_decorator(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            # Monotonic clock: unaffected by wall-clock adjustments while waiting.
            start = time.monotonic()
            attempt = 1
            delay = 0.05

            def _elapsed():
                return time.monotonic() - start

            def _backoff(current_attempt):
                pause = math.pow(2, current_attempt) * delay
                if timeout != 0:
                    # Never sleep past the deadline.
                    pause = min(pause, max(timeout - _elapsed(), 0))
                time.sleep(pause)

            # timeout == 0 means "no timeout": keep retrying until the assertion passes.
            while timeout == 0 or _elapsed() < timeout:
                try:
                    output = func(*args, **kwargs)
                except retryable_exceptions as e:
                    # Back off here too, so a persistently failing call is not
                    # re-invoked in a tight loop.
                    LOG.debug(f"Retryable exception: {e}")
                    _backoff(attempt)
                    attempt += 1
                    continue
                try:
                    assertion_fn(output)
                    return True
                except AssertionError as e:
                    LOG.debug(f"Assertion failed: {e}")
                _backoff(attempt)
                attempt += 1
            LOG.debug(f"timeout after {timeout} seconds")
            LOG.debug("condition not satisfied")
            return False
        return _wrapper
    return retry_until_decorator
Decorator function to retry until condition or timeout is met
IAM Permissions Needed
Parameters
- assertion_fn (Callable[[any], None]): Callable function that has an assertion and raises an AssertionError if it fails
- timeout (int or float): value that specifies how long the function will retry for until it times out
Returns
- Callable: a decorator; the decorated function returns True if the condition was met or False if the timeout is met
Raises
- ValueError: When timeout is a negative number
- TypeError: When timeout or assertion_fn is not a suitable type
def patch_aws_client(self, client: "boto3.client", sampled = 1) -> "boto3.client":
    """
    Register a handler on a boto3 client that adds an X-Amzn-Trace-Id header to every request before it is signed

    The header value has the form ``Root=;Sampled=<sampled>``.

    Parameters
    ----------
    client : boto3.client
        boto3.client for specified aws service
    sampled : int
        value 0 to not sample the request or value 1 to sample the request

    Returns
    -------
    boto3.client
        same client passed in the params with the event registered
    """
    service_model = client.meta.service_model
    service_name = service_model.service_name
    LOG.debug(f"service id: {service_model.service_id}, service name: {service_name}")

    def _add_header(request, **kwargs):
        # Runs on each before-sign event of the patched client.
        trace_id_string = 'Root=;Sampled={}'.format(sampled)
        request.headers.add_header('X-Amzn-Trace-Id', trace_id_string)
        LOG.debug(f"Trace ID format: {trace_id_string}")

    # The wildcard matches every operation of this service.
    event_name = 'before-sign.{}.*'.format(service_name)
    client.meta.events.register(event_name, _add_header)

    return client
Patches boto3 client to register event to include generated x-ray trace id and sampling rule as part of request header before invoke/execution
Parameters
- client (boto3.client): boto3.client for specified aws service
- sampled (int): value 0 to not sample the request or value 1 to sample the request
Returns
- boto3.client: same client passed in the params with the event registered
def retry_get_trace_tree_until(self, tracing_header: str, assertion_fn: Callable[[GetTraceTreeOutput], None], timeout_seconds: int = 30) -> bool:
    """
    Retry get_trace_tree until the assertion passes or the timeout is met

    IAM Permissions Needed
    ----------------------
    xray:BatchGetTraces

    Parameters
    ----------
    tracing_header : str
        x-ray trace header
    assertion_fn : Callable[[GetTraceTreeOutput], None]
        Callable function that makes an assertion and raises an AssertionError if it fails
    timeout_seconds : int
        Timeout (in seconds) to stop the fetching

    Returns
    -------
    bool
        True if the condition was met or false if the timeout is met

    Raises
    ------
    IatkException
        When an exception other than "trace not found" occurs during
        get_trace_tree (raised with error_code 500, chained to the original)
    """
    params = RetryGetTraceTreeUntilParams(tracing_header, assertion_fn, timeout_seconds)

    @self.retry_until(assertion_fn=params.assertion_fn, timeout=params.timeout_seconds)
    def fetch_trace_tree():
        try:
            return self._get_trace_tree(
                params=GetTraceTreeParams(tracing_header=params.tracing_header),
                caller="retry_get_trace_tree_until",
            )
        except IatkException as e:
            if "trace not found" in str(e):
                # A missing trace is treated as retryable so retry_until tries again.
                raise RetryableException(str(e)) from e
            # Wrap once, keeping the message as text and the original as the cause.
            raise IatkException(str(e), 500) from e

    return fetch_trace_tree()
Function to retry get_trace_tree until the condition or the timeout is met
IAM Permissions Needed
xray:BatchGetTraces
Parameters
- tracing_header (str): x-ray trace header
- assertion_fn (Callable[[GetTraceTreeOutput], None]): Callable function that makes an assertion and raises an AssertionError if it fails
- timeout_seconds (int): Timeout (in seconds) to stop the fetching
Returns
- bool: True if the condition was met or false if the timeout is met
Raises
- IatkException: When an exception occurs during get_trace_tree
class IatkException(Exception):
    """
    Exception that carries an error code alongside its message

    Parameters
    ----------
    message : str
        Human-readable description, forwarded to ``Exception``
    error_code : int
        Code stored on the instance as ``error_code``
    """

    def __init__(self, message, error_code) -> None:
        self.error_code = error_code
        super().__init__(message)
Common base class for all non-exit exceptions.
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
@dataclass
class PhysicalIdFromStackOutput:
    """
    AwsIatk.get_physical_id_from_stack output

    Built from the response dictionary: reads ``result.output`` and falls
    back to an empty string when either key is missing.

    Parameters
    ----------
    physical_id : str
        Physical Id of the Resource requested
    """
    physical_id: str

    def __init__(self, data_dict) -> None:
        result = data_dict.get("result", {})
        self.physical_id = result.get("output", "")
AwsIatk.get_physical_id_from_stack output
Parameters
- physical_id (str): Physical Id of the Resource requested
@dataclass
class GetStackOutputsOutput:
    """
    AwsIatk.get_stack_outputs output

    Built from the response dictionary: reads ``result.output`` and falls
    back to an empty dictionary when either key is missing.

    Parameters
    ----------
    outputs : Dict[str, str]
        Dictionary of keys being the StackOutput Key and value
        being the StackOutput Value
    """
    outputs: Dict[str, str]

    def __init__(self, data_dict: dict) -> None:
        result = data_dict.get("result", {})
        self.outputs = result.get("output", {})
AwsIatk.get_stack_outputs output
Parameters
- outputs (Dict[str, str]): Dictionary of keys being the StackOutput Key and value being the StackOutput Value
@dataclass
class AddEbListenerOutput:
    """
    AwsIatk.add_listener output

    Built from the ``result.output`` entry of the response dictionary;
    missing keys fall back to empty values.

    Parameters
    ----------
    id : str
        Id that corresponds to the listener created
    target_under_test : AddEbListener_Resource
        Target Resource that test resources were added
    components : List[AddEbListener_Resource]
        List of all Resources created to support the listener
        on the `target_under_test`
    """
    id: str
    target_under_test: AddEbListener_Resource
    components: List[AddEbListener_Resource]

    def __init__(self, data_dict: dict) -> None:
        payload = data_dict.get("result", {}).get("output", {})
        self.id = payload.get("Id", "")
        target_data = payload.get("TargetUnderTest", {})
        self.target_under_test = AddEbListener_Resource(target_data)
        self.components = [
            AddEbListener_Resource(item) for item in payload.get("Components", [])
        ]
AwsIatk.add_listener output
Parameters
- id (str): Id that corresponds to the listener created
- target_under_test (AddEbListener_Resource): Target Resource that test resources were added
- components (List[AddEbListener_Resource]): List of all Resources created to support the listener on the `target_under_test`
def __init__(self, data_dict: dict) -> None:
    # Every field is read from result.output; missing keys fall back to empty values.
    payload = data_dict.get("result", {}).get("output", {})
    self.id = payload.get("Id", "")
    target_data = payload.get("TargetUnderTest", {})
    self.target_under_test = AddEbListener_Resource(target_data)
    self.components = [
        AddEbListener_Resource(item) for item in payload.get("Components", [])
    ]
@dataclass
class RemoveListenersOutput:
    """
    AwsIatk.remove_listeners Output

    Built from the response dictionary: reads ``result.output`` and falls
    back to an empty string when either key is missing.

    Parameters
    ----------
    message : str
        Message indicates whether or not the remove succeeded.
    """
    message: str

    def __init__(self, data_dict: dict) -> None:
        result = data_dict.get("result", {})
        self.message = result.get("output", "")
AwsIatk.remove_listeners Output
Parameters
- message (str): Message indicates whether or not the remove succeeded.
@dataclass
class RemoveListeners_TagFilter:
    """
    Tag filters

    Parameters
    ----------
    key : str
        One part of a key-value pair that makes up a tag. A key is a general label that acts like a category for more specific tag values.
    values : List[str], optional
        One part of a key-value pair that make up a tag. A value acts as a descriptor within a tag category (key). The value can be empty or null.
    """
    key: str
    values: Optional[List[str]] = None

    def to_dict(self) -> dict:
        """Return the filter as a dict; "Values" is included only when values is non-empty."""
        if not self.values:
            return {"Key": self.key}
        return {"Key": self.key, "Values": self.values}
Tag filters
Parameters
- key (str): One part of a key-value pair that makes up a tag. A key is a general label that acts like a category for more specific tag values.
- values (List[str], optional): One part of a key-value pair that make up a tag. A value acts as a descriptor within a tag category (key). The value can be empty or null.
@dataclass
class PollEventsOutput:
    """
    AwsIatk.poll_events Output

    Built from the response dictionary: reads ``result.output`` and falls
    back to an empty list when either key is missing.

    Parameters
    ----------
    events : List[str]
        List of events found
    """
    events: List[str]

    def __init__(self, data_dict) -> None:
        result = data_dict.get("result", {})
        self.events = result.get("output", [])
@dataclass
class GetTraceTreeOutput:
    """
    AwsIatk.get_trace_tree output

    Built from the response dictionary: ``result.output`` (an empty
    dictionary when missing) is wrapped in a ``Tree``.

    Parameters
    ----------
    trace_tree : Tree
        Trace tree structure of the provided trace id
    """
    trace_tree: Tree

    def __init__(self, data_dict) -> None:
        result = data_dict.get("result", {})
        self.trace_tree = Tree(result.get("output", {}))
AwsIatk.get_trace_tree output
Parameters
- trace_tree (Tree): Trace tree structure of the provided trace id
@dataclass
class GenerateBareboneEventOutput:
    """
    AwsIatk.generate_barebone_event output

    Built from the response dictionary: ``result.output`` is parsed as JSON.
    When it is missing or empty, ``event`` is None.

    Parameters
    ----------
    event : dict
        mock event
    """
    event: dict

    def __init__(self, data_dict: dict) -> None:
        raw_event = data_dict.get("result", {}).get("output")
        if raw_event:
            self.event = json.loads(raw_event)
        else:
            self.event = None
AwsIatk.generate_barebone_event output
Parameters
- event (dict): mock event