aws_iatk

  1# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3
  4from subprocess import Popen, PIPE
  5from datetime import datetime
  6import pathlib
  7import json
  8import logging
  9from dataclasses import dataclass
 10from functools import wraps
 11from typing import TYPE_CHECKING, Optional, Dict, List, Callable
 12import time
 13import math
 14
 15from .get_physical_id_from_stack import (
 16    PhysicalIdFromStackOutput,
 17    PhysicalIdFromStackParams,
 18)
 19from .get_stack_outputs import (
 20    GetStackOutputsOutput,
 21    GetStackOutputsParams,
 22)
 23from .add_eb_listener import (
 24    AddEbListenerOutput,
 25    AddEbListenerParams,
 26)
 27from .remove_listeners import (
 28    RemoveListenersOutput,
 29    RemoveListenersParams,
 30    RemoveListeners_TagFilter,
 31)
 32from .poll_events import (
 33    PollEventsOutput,
 34    PollEventsParams,
 35    WaitUntilEventMatchedParams,
 36)
 37from .get_trace_tree import (
 38    GetTraceTreeOutput,
 39    GetTraceTreeParams
 40)
 41from .retry_xray_trace import (
 42    RetryGetTraceTreeUntilParams,
 43)
 44from .generate_mock_event import (
 45    GenerateBareboneEventOutput,
 46    GenerateBareboneEventParams,
 47    GenerateMockEventOutput,
 48    GenerateMockEventParams,
 49)
 50from .jsonrpc import Payload
 51
 52if TYPE_CHECKING:
 53    import boto3
 54
 55__all__ = [
 56    "AwsIatk", 
 57    "IatkException", 
 58    "PhysicalIdFromStackOutput", 
 59    "GetStackOutputsOutput", 
 60    "AddEbListenerOutput", 
 61    "RemoveListenersOutput",
 62    "RemoveListeners_TagFilter",
 63    "PollEventsOutput",
 64    "GetTraceTreeOutput",
 65    "GenerateBareboneEventOutput",
 66    "GenerateMockEventOutput",
 67]
 68
 69LOG = logging.getLogger(__name__)
 70iatk_service_logger = logging.getLogger("iatk.service")
 71
 72
 73def _log_duration(func):
 74    @wraps(func)
 75    def wrapper(*args, **kwargs):
 76        start = datetime.now()
 77        ret = func(*args, **kwargs)
 78        LOG.debug("elapsed: %s seconds", (datetime.now() - start).total_seconds())
 79        return ret
 80
 81    return wrapper
 82
 83
 84class IatkException(Exception):
 85    def __init__(self, message, error_code) -> None:
 86        super().__init__(message)
 87
 88        self.error_code = error_code
 89
 90class RetryableException(Exception):
 91    def __init__(self, message) -> None:
 92        super().__init__(message)
 93
 94@dataclass
 95class AwsIatk:
 96    """
 97    Creates and setups AWS Integrated Application Test Kit
 98
 99    Parameters
100    ----------
101    region : str, optional
102        AWS Region used to interact with AWS
103    profile: str, optional
104        AWS Profile used to communicate with AWS resources
105    """
106    region: Optional[str] = None
107    profile: Optional[str] = None
108
109    _iatk_binary_path = (
110        pathlib.Path(__file__).parent.parent.joinpath("iatk_service", "iatk").absolute()
111    )
112
113    def get_physical_id_from_stack(
114        self, logical_resource_id: str, stack_name: str
115    ) -> PhysicalIdFromStackOutput:
116        """
117        Fetch a Phsyical Id from a Logical Id within an AWS CloudFormation stack
118
119        IAM Permissions Needed
120        ----------------------
121        cloudformation:DescribeStackResources
122
123        Parameters
124        ----------
125        logical_resource_id : str
126            Name of the Logical Id within the Stack to fetch
127        stack_name : str
128            Name of the CloudFormation Stack
129        
130        Returns
131        -------
132        PhysicalIdFromStackOutput
133            Data Class that holds the Phsyical Id of the resource
134
135        Raises
136        ------
137        IatkException
138            When failed to fetch Phsyical Id
139        """
140        params = PhysicalIdFromStackParams(logical_resource_id, stack_name)
141        payload = params.to_payload(self.region, self.profile)
142        response = self._invoke_iatk(payload)
143        output = PhysicalIdFromStackOutput(response)
144        LOG.debug(f"Physical id: {output.physical_id}, Logical id: {params.logical_resource_id}")
145        return output
146
147    def get_stack_outputs(self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput:
148        """
149        Fetch Stack Outputs from an AWS CloudFormation stack
150
151        IAM Permissions Needed
152        ----------------------
153        cloudformation:DescribeStacks
154
155        Parameters
156        ----------
157        stack_name : str
158            Name of the Stack
159        output_names : List[str] 
160            List of strings that represent the StackOutput Keys   
161        
162        Returns
163        -------
164        GetStackOutputsOutput
165            Data Class that holds the Stack Outputs of the resource
166
167        Raises
168        ------
169        IatkException
170            When failed to fetch Stack Outputs
171        """
172        params = GetStackOutputsParams(stack_name, output_names)
173        payload = params.to_payload(self.region, self.profile)
174        response = self._invoke_iatk(payload)
175        output = GetStackOutputsOutput(response)
176        LOG.debug(f"Output: {output}")
177        return output
178
179    def add_listener(self, event_bus_name: str, rule_name: str, target_id: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> AddEbListenerOutput:
180        """
181        Add Listener Resource to an AWS Event Bridge Bus to enable testing
182
183        IAM Permissions Needed
184        ----------------------
185        events:DescribeEventBus
186        events:DescribeRule
187        events:PutRule
188        events:PutTargets
189        events:DeleteRule
190        events:RemoveTargets
191        events:TagResource
192
193        sqs:CreateQueue
194        sqs:GetQueueAttributes
195        sqs:GetQueueUrl
196        sqs:DeleteQueue
197        sqs:TagQueue
198
199        Parameters
200        ----------
201        event_bus_name : str
202            Name of the AWS Event Bus
203        rule_name : str
204            Name of a Rule on the EventBus to replicate
205        target_id : str, optional
206            Target Id on the given rule to replicate
207        tags : Dict[str, str], optional
208            A key-value pair associated EventBridge rule.
209        
210        Returns
211        -------
212        AddEbListenerOutput
213            Data Class that holds the Listener created
214
215        Raises
216        ------
217        IatkException
218            When failed to add listener
219        """
220        params = AddEbListenerParams(event_bus_name, rule_name, target_id, tags)
221        payload = params.to_payload(self.region, self.profile)
222        response = self._invoke_iatk(payload)
223        output = AddEbListenerOutput(response)
224        LOG.debug(f"Output: {output}")
225        return output
226
227    def remove_listeners(self, ids: Optional[List[str]] = None, tag_filters: Optional[List[RemoveListeners_TagFilter]] = None) -> RemoveListenersOutput:
228        """
229        Remove Listener Resource(s) from an AWS Event Bridge Bus
230
231        IAM Permissions Needed
232        ----------------------
233        tag:GetResources
234        
235        sqs:DeleteQueue
236        sqs:GetQueueUrl
237        sqs:GetQueueAttributes
238        sqs:ListQueueTags
239
240        events:DeleteRule
241        events:RemoveTargets
242        events:ListTargetsByRule
243        
244        Parameters
245        ----------
246        ids : List[str], optional
247            List of Listener Ids to remove, one of ids and tag_filters must be supplied
248        tag_filters : List[RemoveListeners_TagFilter], optional
249            List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied
250        
251        Returns
252        -------
253        RemoveListenersOutput
254            Data Class that holds the Listener(s) that were removed
255
256        Raises
257        ------
258        IatkException
259            When failed to remove listener(s)
260        """
261        params = RemoveListenersParams(ids, tag_filters)
262        payload = params.to_payload(self.region, self.profile)
263        response = self._invoke_iatk(payload)
264        output = RemoveListenersOutput(response)
265        LOG.debug(f"Output: {output}")
266        return output
267
268    def _poll_events(self, params: PollEventsParams, caller: str=None) -> PollEventsOutput:
269        """
270        underlying implementation for poll_events and wait_until_event_matched
271        """
272        payload = params.to_payload(self.region, self.profile)
273        response = self._invoke_iatk(payload, caller)
274        output = PollEventsOutput(response)
275        return output
276
277    def poll_events(self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput:
278        """
279        Poll Events from a specific Listener
280
281        IAM Permissions Needed
282        ----------------------
283        sqs:GetQueueUrl
284        sqs:ListQueueTags
285        sqs:ReceiveMessage
286        sqs:DeleteMessage
287        sqs:GetQueueAttributes
288
289        events:DescribeRule
290
291        Parameters
292        ----------
293        listener_id : str
294            Id of the Listener that was created
295        wait_time_seconds : int
296            Time in seconds to wait for polling
297        max_number_of_messages : int
298            Max number of messages to poll
299        
300        Returns
301        -------
302        PollEventsOutput
303            Data Class that holds the Events captured by the Listener
304
305        Raises
306        ------
307        IatkException
308            When failed to Poll Events
309        """
310        params = PollEventsParams(listener_id, wait_time_seconds, max_number_of_messages)
311        output = self._poll_events(params)
312        LOG.debug(f"Output: {output}")
313        return output
314
315    def wait_until_event_matched(self, listener_id: str, assertion_fn: Callable[[str], None], timeout_seconds: int = 30) -> bool:
316        """
317        Poll Events on a given Listener until a match is found or timeout met.
318
319        IAM Permissions Needed
320        ----------------------
321        sqs:GetQueueUrl
322        sqs:ListQueueTags
323        sqs:ReceiveMessage
324        sqs:DeleteMessage
325        sqs:GetQueueAttributes
326
327        events:DescribeRule
328        
329        Parameters
330        ----------
331        listener_id : str
332            Id of the Listener that was created
333        assertion_fn : Callable[[str], bool]
334            Callable function that has an assertion and raises an AssertionError if it fails
335        timeout_seconds : int
336            Timeout (in seconds) to stop the polling
337        
338        Returns
339        -------
340        bool
341            True if the event was matched before timeout otherwise False
342
343        Raises
344        ------
345        IatkException
346            When failed to Poll Events
347        """
348        params = WaitUntilEventMatchedParams(listener_id, assertion_fn, timeout_seconds)
349        start = datetime.now()
350        elapsed = lambda _: (datetime.now() - start).total_seconds()
351        while elapsed(None) < params.timeout_seconds:
352            out = self._poll_events(params=params._poll_event_params, caller="wait_until_event_matched")
353            events = out.events
354            if events:
355                for event in events:
356                    try: 
357                        params.assertion_fn(event)
358                        LOG.debug("event matched")
359                        return True
360                    except AssertionError as e:
361                        LOG.debug(f"Assertion failed: {e}")
362                        
363        LOG.debug(f"timeout after {params.timeout_seconds} seconds")
364        LOG.debug("no matching event found")
365        return False
366
367    def _get_trace_tree(self, params: GetTraceTreeParams, caller: str=None) -> GetTraceTreeOutput:
368        """
369        underlying implementation for get_trace_tree and retry_get_trace_tree_until
370        """
371        payload = params.to_payload(self.region, self.profile)
372        response = self._invoke_iatk(payload, caller)
373        output = GetTraceTreeOutput(response)
374        return output
375
376    def get_trace_tree(
377        self, tracing_header: str
378    ) -> GetTraceTreeOutput:
379        """
380        Fetch the trace tree structure using the provided tracing_header
381
382        IAM Permissions Needed
383        ----------------------
384        xray:BatchGetTraces
385        
386        Parameters
387        ----------
388        tracing_header : str
389            Trace header to get the trace tree
390        
391        Returns
392        -------
393        GetTraceTreeOutput
394            Data Class that holds the trace tree structure
395
396        Raises
397        ------
398        IatkException
399            When failed to fetch a trace tree
400        """
401        params = GetTraceTreeParams(tracing_header)
402        output = self._get_trace_tree(params)
403        return output
404
405    def _generate_barebone_event(
406        self, params: GenerateBareboneEventParams,
407    ) -> GenerateBareboneEventOutput:
408        payload = params.to_payload(self.region, self.profile)
409        response = self._invoke_iatk(payload)
410        output = GenerateBareboneEventOutput(response)
411        return output
412
413    def generate_mock_event(
414        self, 
415        registry_name: Optional[str] = None, 
416        schema_name: Optional[str] = None,
417        schema_version: Optional[str] = None,
418        event_ref: Optional[str] = None,
419        skip_optional: Optional[bool] = None,
420        contexts: Optional[List[Callable[[dict], dict]]] = None
421    ) -> GenerateMockEventOutput:
422        """
423        Generate a mock event based on a schema from EventBridge Schema Registry
424
425        IAM Permissions Needed
426        ----------------------
427        schemas:DescribeSchema
428        
429        Parameters
430        ----------
431        registry_name : str
432            name of the registry of the schema stored in EventBridge Schema Registry
433        schema_name : str
434            name of the schema stored in EventBridge Schema Registry
435        schema_version : str
436            version of the schema stored in EventBridge Schema Registry
437        event_ref : str
438            location to the event in the schema in json schema ref syntax, only applicable for openapi schema
439        skip_optional : bool
440            if set to true, do not generate optional fields
441        contexts : List[Callable[[dict], dict]]
442            a list of callables to apply context on the generated mock event
443        
444        Returns
445        -------
446        GenerateMockEventOutput
447            Data Class that holds the trace tree structure
448
449        Raises
450        ------
451        IatkException
452            When failed to fetch a trace tree
453        """
454        params = GenerateMockEventParams(registry_name, schema_name, schema_version, event_ref, skip_optional, contexts)
455        out = self._generate_barebone_event(
456            GenerateBareboneEventParams(
457                registry_name=params.registry_name,
458                schema_name=params.schema_name,
459                schema_version=params.schema_version,
460                event_ref=params.event_ref,
461                skip_optional=params.skip_optional,
462            )
463        )
464        event = out.event
465
466        if params.contexts is not None and type(params.contexts) == list:
467            event = self._apply_contexts(event, params.contexts)
468
469        return GenerateMockEventOutput(event)
470    
471    def retry_until(self, assertion_fn, timeout = 10, retryable_exceptions = (RetryableException,)):
472        """
473        Decorator function to retry until condition or timeout is met
474
475        IAM Permissions Needed
476        ----------------------
477        
478        Parameters
479        ----------
480        assertion_fn: Callable[[any], None]
481            Callable function that has an assertion and raises an AssertionError if it fails
482        timeout: int or float
483            value that specifies how long the function will retry for until it times out
484        
485        Returns
486        -------
487        bool
488            True if the condition was met or false if the timeout is met
489
490        Raises
491        ------
492        ValueException
493            When timeout is a negative number
494        TypeException
495            When timeout or condition is not a suitable type
496        """
497        if(not(isinstance(timeout, int) or  isinstance(timeout, float))):
498            raise TypeError("timeout must be an int or float")
499        elif(timeout < 0):
500            raise ValueError("timeout must not be a negative value")
501        if(not callable(assertion_fn)):
502            raise TypeError("condition is not a callable function")
503        def retry_until_decorator(func):
504            @wraps(func)
505            def _wrapper(*args, **kwargs):
506                start = datetime.now()
507                attempt = 1
508                delay = 0.05
509                elapsed = lambda _: (datetime.now() - start).total_seconds()
510                if timeout == 0:
511                    elapsed = lambda _: -1
512                while elapsed(None) < timeout:
513                    try:
514                        output = func(*args, **kwargs)
515                    except retryable_exceptions:
516                        continue
517                    try:
518                        assertion_fn(output)
519                        return True
520                    except AssertionError as e:
521                        LOG.debug(f"Assertion failed: {e}")
522                    time.sleep(math.pow(2, attempt) * delay)
523                    attempt += 1
524                LOG.debug(f"timeout after {timeout} seconds")
525                LOG.debug("condition not satisfied")
526                return False
527            return _wrapper
528        return retry_until_decorator
529
530    def _apply_contexts(self, generated_event: dict, callable_contexts: List[Callable]) -> dict:
531        """
532        function for looping through provided functions, modifying the event as the client specifies
533        """
534        for func in callable_contexts:
535            generated_event = func(generated_event)
536            try:
537                json.dumps(generated_event)
538            except TypeError:
539                raise IatkException(f"context applier {func.__name__} returns a non-JSON-serializable result", 400)
540        if generated_event is None:
541            raise IatkException("event is empty, make sure function returns a valid event", 404)
542        return generated_event
543        
544    def patch_aws_client(self, client: "boto3.client", sampled = 1) -> "boto3.client":
545        """
546        Patches boto3 client to register event to include generated x-ray trace id and sampling rule as part of request header before invoke/execution
547
548        Parameters
549        ----------
550        params : boto3.client
551            boto3.client for specified aws service
552        sampled : int
553            value 0 to not sample the request or value 1 to sample the request
554        
555        Returns
556        -------
557        boto3.client
558            same client passed in the params with the event registered
559        """
560        def _add_header(request, **kwargs):
561            trace_id_string= 'Root=;Sampled={}'.format(sampled)
562            
563            request.headers.add_header('X-Amzn-Trace-Id', trace_id_string)
564            LOG.debug(f"Trace ID format: {trace_id_string}")
565
566        service_name = client.meta.service_model.service_name
567        event_string = 'before-sign.{}.*'
568        LOG.debug(f"service id: {client.meta.service_model.service_id}, service name: {service_name}")
569        
570        client.meta.events.register(event_string.format(service_name), _add_header)
571        
572        return client
573
574    @_log_duration
575    def _invoke_iatk(self, payload: Payload, caller: str=None) -> dict:
576        input_data = payload.dump_bytes(caller)
577        LOG.debug("payload: %s", input_data)
578        stdout_data = self._popen_iatk(input_data)
579        jsonrpc_data = stdout_data.decode("utf-8")
580        data_dict = json.loads(jsonrpc_data.strip())
581
582        # Error is only returned in the response if one happened. Otherwise it is omitted
583        if data_dict.get("error", None):
584            message = data_dict.get("error", {}).get("message", "")
585            error_code = data_dict.get("error", {}).get("Code", 0)
586            raise IatkException(message=message, error_code=error_code)
587        
588        return data_dict
589
590    def _popen_iatk(self, input: bytes, env_vars: Optional[dict]=None) -> bytes:
591        LOG.debug("calling iatk rpc with input %s", input)
592        p = Popen([self._iatk_binary_path], stdout=PIPE, stdin=PIPE, stderr=PIPE)
593
594        out, err = p.communicate(input=input)
595        for line in err.splitlines():
596            iatk_service_logger.debug(line.decode())
597        return out
598
599    def _raise_error_if_returned(self, output):
600        jsonrpc_data = output.decode("utf-8")
601        data_dict = json.loads(jsonrpc_data.strip())
602
603        # Error is only returned in the response if one happened. Otherwise it is omitted
604        if data_dict.get("error", None):
605            message = data_dict.get("error", {}).get("message", "")
606            error_code = data_dict.get("error", {}).get("Code", 0)
607            raise IatkException(message=message, error_code=error_code)
608
609        
610    def retry_get_trace_tree_until(self, tracing_header: str, assertion_fn: Callable[[GetTraceTreeOutput], None], timeout_seconds: int = 30):
611        """
612        function to retry get_trace_tree condition or timeout is met
613
614        IAM Permissions Needed
615        ----------------------
616        xray:BatchGetTraces
617        
618        Parameters
619        ----------
620        trace_header:
621            x-ray trace header
622        assertion_fn : Callable[[GetTraceTreeOutput], bool]
623            Callable fuction that makes an assertion and raises an AssertionError if it fails
624        timeout_seconds : int
625            Timeout (in seconds) to stop the fetching
626        
627        Returns
628        -------
629        bool
630            True if the condition was met or false if the timeout is met
631
632        Raises
633        ------
634        IatkException
635            When an exception occurs during get_trace_tree
636        """
637        params = RetryGetTraceTreeUntilParams(tracing_header, assertion_fn, timeout_seconds)
638        @self.retry_until(assertion_fn=params.assertion_fn, timeout=params.timeout_seconds)
639        def fetch_trace_tree():
640            try:
641                response = self._get_trace_tree(
642                    params=GetTraceTreeParams(tracing_header=params.tracing_header),
643                    caller="retry_get_trace_tree_until",
644                )
645                return response
646            except IatkException as e:
647                 if "trace not found" in str(e):
648                    pass
649                    raise RetryableException(e)
650                 else:
651                    raise IatkException(e, 500)
652        try:
653            response = fetch_trace_tree()
654            return response
655        except IatkException as e:
656            raise IatkException(e, 500)
657
658
659# Set up logging to ``/dev/null`` like a library is supposed to.
660# https://docs.python.org/3.3/howto/logging.html#configuring-logging-for-a-library
661class NullHandler(logging.Handler):
662    def emit(self, record):
663        pass
664
665
666logging.getLogger("aws_iatk").addHandler(NullHandler())
@dataclass
class AwsIatk:
 95@dataclass
 96class AwsIatk:
 97    """
 98    Creates and setups AWS Integrated Application Test Kit
 99
100    Parameters
101    ----------
102    region : str, optional
103        AWS Region used to interact with AWS
104    profile: str, optional
105        AWS Profile used to communicate with AWS resources
106    """
107    region: Optional[str] = None
108    profile: Optional[str] = None
109
110    _iatk_binary_path = (
111        pathlib.Path(__file__).parent.parent.joinpath("iatk_service", "iatk").absolute()
112    )
113
114    def get_physical_id_from_stack(
115        self, logical_resource_id: str, stack_name: str
116    ) -> PhysicalIdFromStackOutput:
117        """
118        Fetch a Phsyical Id from a Logical Id within an AWS CloudFormation stack
119
120        IAM Permissions Needed
121        ----------------------
122        cloudformation:DescribeStackResources
123
124        Parameters
125        ----------
126        logical_resource_id : str
127            Name of the Logical Id within the Stack to fetch
128        stack_name : str
129            Name of the CloudFormation Stack
130        
131        Returns
132        -------
133        PhysicalIdFromStackOutput
134            Data Class that holds the Phsyical Id of the resource
135
136        Raises
137        ------
138        IatkException
139            When failed to fetch Phsyical Id
140        """
141        params = PhysicalIdFromStackParams(logical_resource_id, stack_name)
142        payload = params.to_payload(self.region, self.profile)
143        response = self._invoke_iatk(payload)
144        output = PhysicalIdFromStackOutput(response)
145        LOG.debug(f"Physical id: {output.physical_id}, Logical id: {params.logical_resource_id}")
146        return output
147
148    def get_stack_outputs(self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput:
149        """
150        Fetch Stack Outputs from an AWS CloudFormation stack
151
152        IAM Permissions Needed
153        ----------------------
154        cloudformation:DescribeStacks
155
156        Parameters
157        ----------
158        stack_name : str
159            Name of the Stack
160        output_names : List[str] 
161            List of strings that represent the StackOutput Keys   
162        
163        Returns
164        -------
165        GetStackOutputsOutput
166            Data Class that holds the Stack Outputs of the resource
167
168        Raises
169        ------
170        IatkException
171            When failed to fetch Stack Outputs
172        """
173        params = GetStackOutputsParams(stack_name, output_names)
174        payload = params.to_payload(self.region, self.profile)
175        response = self._invoke_iatk(payload)
176        output = GetStackOutputsOutput(response)
177        LOG.debug(f"Output: {output}")
178        return output
179
180    def add_listener(self, event_bus_name: str, rule_name: str, target_id: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> AddEbListenerOutput:
181        """
182        Add Listener Resource to an AWS Event Bridge Bus to enable testing
183
184        IAM Permissions Needed
185        ----------------------
186        events:DescribeEventBus
187        events:DescribeRule
188        events:PutRule
189        events:PutTargets
190        events:DeleteRule
191        events:RemoveTargets
192        events:TagResource
193
194        sqs:CreateQueue
195        sqs:GetQueueAttributes
196        sqs:GetQueueUrl
197        sqs:DeleteQueue
198        sqs:TagQueue
199
200        Parameters
201        ----------
202        event_bus_name : str
203            Name of the AWS Event Bus
204        rule_name : str
205            Name of a Rule on the EventBus to replicate
206        target_id : str, optional
207            Target Id on the given rule to replicate
208        tags : Dict[str, str], optional
209            A key-value pair associated EventBridge rule.
210        
211        Returns
212        -------
213        AddEbListenerOutput
214            Data Class that holds the Listener created
215
216        Raises
217        ------
218        IatkException
219            When failed to add listener
220        """
221        params = AddEbListenerParams(event_bus_name, rule_name, target_id, tags)
222        payload = params.to_payload(self.region, self.profile)
223        response = self._invoke_iatk(payload)
224        output = AddEbListenerOutput(response)
225        LOG.debug(f"Output: {output}")
226        return output
227
228    def remove_listeners(self, ids: Optional[List[str]] = None, tag_filters: Optional[List[RemoveListeners_TagFilter]] = None) -> RemoveListenersOutput:
229        """
230        Remove Listener Resource(s) from an AWS Event Bridge Bus
231
232        IAM Permissions Needed
233        ----------------------
234        tag:GetResources
235        
236        sqs:DeleteQueue
237        sqs:GetQueueUrl
238        sqs:GetQueueAttributes
239        sqs:ListQueueTags
240
241        events:DeleteRule
242        events:RemoveTargets
243        events:ListTargetsByRule
244        
245        Parameters
246        ----------
247        ids : List[str], optional
248            List of Listener Ids to remove, one of ids and tag_filters must be supplied
249        tag_filters : List[RemoveListeners_TagFilter], optional
250            List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied
251        
252        Returns
253        -------
254        RemoveListenersOutput
255            Data Class that holds the Listener(s) that were removed
256
257        Raises
258        ------
259        IatkException
260            When failed to remove listener(s)
261        """
262        params = RemoveListenersParams(ids, tag_filters)
263        payload = params.to_payload(self.region, self.profile)
264        response = self._invoke_iatk(payload)
265        output = RemoveListenersOutput(response)
266        LOG.debug(f"Output: {output}")
267        return output
268
269    def _poll_events(self, params: PollEventsParams, caller: str=None) -> PollEventsOutput:
270        """
271        underlying implementation for poll_events and wait_until_event_matched
272        """
273        payload = params.to_payload(self.region, self.profile)
274        response = self._invoke_iatk(payload, caller)
275        output = PollEventsOutput(response)
276        return output
277
278    def poll_events(self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput:
279        """
280        Poll Events from a specific Listener
281
282        IAM Permissions Needed
283        ----------------------
284        sqs:GetQueueUrl
285        sqs:ListQueueTags
286        sqs:ReceiveMessage
287        sqs:DeleteMessage
288        sqs:GetQueueAttributes
289
290        events:DescribeRule
291
292        Parameters
293        ----------
294        listener_id : str
295            Id of the Listener that was created
296        wait_time_seconds : int
297            Time in seconds to wait for polling
298        max_number_of_messages : int
299            Max number of messages to poll
300        
301        Returns
302        -------
303        PollEventsOutput
304            Data Class that holds the Events captured by the Listener
305
306        Raises
307        ------
308        IatkException
309            When failed to Poll Events
310        """
311        params = PollEventsParams(listener_id, wait_time_seconds, max_number_of_messages)
312        output = self._poll_events(params)
313        LOG.debug(f"Output: {output}")
314        return output
315
316    def wait_until_event_matched(self, listener_id: str, assertion_fn: Callable[[str], None], timeout_seconds: int = 30) -> bool:
317        """
318        Poll Events on a given Listener until a match is found or timeout met.
319
320        IAM Permissions Needed
321        ----------------------
322        sqs:GetQueueUrl
323        sqs:ListQueueTags
324        sqs:ReceiveMessage
325        sqs:DeleteMessage
326        sqs:GetQueueAttributes
327
328        events:DescribeRule
329        
330        Parameters
331        ----------
332        listener_id : str
333            Id of the Listener that was created
334        assertion_fn : Callable[[str], bool]
335            Callable function that has an assertion and raises an AssertionError if it fails
336        timeout_seconds : int
337            Timeout (in seconds) to stop the polling
338        
339        Returns
340        -------
341        bool
342            True if the event was matched before timeout otherwise False
343
344        Raises
345        ------
346        IatkException
347            When failed to Poll Events
348        """
349        params = WaitUntilEventMatchedParams(listener_id, assertion_fn, timeout_seconds)
350        start = datetime.now()
351        elapsed = lambda _: (datetime.now() - start).total_seconds()
352        while elapsed(None) < params.timeout_seconds:
353            out = self._poll_events(params=params._poll_event_params, caller="wait_until_event_matched")
354            events = out.events
355            if events:
356                for event in events:
357                    try: 
358                        params.assertion_fn(event)
359                        LOG.debug("event matched")
360                        return True
361                    except AssertionError as e:
362                        LOG.debug(f"Assertion failed: {e}")
363                        
364        LOG.debug(f"timeout after {params.timeout_seconds} seconds")
365        LOG.debug("no matching event found")
366        return False
367
368    def _get_trace_tree(self, params: GetTraceTreeParams, caller: str=None) -> GetTraceTreeOutput:
369        """
370        underlying implementation for get_trace_tree and retry_get_trace_tree_until
371        """
372        payload = params.to_payload(self.region, self.profile)
373        response = self._invoke_iatk(payload, caller)
374        output = GetTraceTreeOutput(response)
375        return output
376
377    def get_trace_tree(
378        self, tracing_header: str
379    ) -> GetTraceTreeOutput:
380        """
381        Fetch the trace tree structure using the provided tracing_header
382
383        IAM Permissions Needed
384        ----------------------
385        xray:BatchGetTraces
386        
387        Parameters
388        ----------
389        tracing_header : str
390            Trace header to get the trace tree
391        
392        Returns
393        -------
394        GetTraceTreeOutput
395            Data Class that holds the trace tree structure
396
397        Raises
398        ------
399        IatkException
400            When failed to fetch a trace tree
401        """
402        params = GetTraceTreeParams(tracing_header)
403        output = self._get_trace_tree(params)
404        return output
405
406    def _generate_barebone_event(
407        self, params: GenerateBareboneEventParams,
408    ) -> GenerateBareboneEventOutput:
409        payload = params.to_payload(self.region, self.profile)
410        response = self._invoke_iatk(payload)
411        output = GenerateBareboneEventOutput(response)
412        return output
413
414    def generate_mock_event(
415        self, 
416        registry_name: Optional[str] = None, 
417        schema_name: Optional[str] = None,
418        schema_version: Optional[str] = None,
419        event_ref: Optional[str] = None,
420        skip_optional: Optional[bool] = None,
421        contexts: Optional[List[Callable[[dict], dict]]] = None
422    ) -> GenerateMockEventOutput:
423        """
424        Generate a mock event based on a schema from EventBridge Schema Registry
425
426        IAM Permissions Needed
427        ----------------------
428        schemas:DescribeSchema
429        
430        Parameters
431        ----------
432        registry_name : str
433            name of the registry of the schema stored in EventBridge Schema Registry
434        schema_name : str
435            name of the schema stored in EventBridge Schema Registry
436        schema_version : str
437            version of the schema stored in EventBridge Schema Registry
438        event_ref : str
439            location to the event in the schema in json schema ref syntax, only applicable for openapi schema
440        skip_optional : bool
441            if set to true, do not generate optional fields
442        contexts : List[Callable[[dict], dict]]
443            a list of callables to apply context on the generated mock event
444        
445        Returns
446        -------
447        GenerateMockEventOutput
448            Data Class that holds the trace tree structure
449
450        Raises
451        ------
452        IatkException
453            When failed to fetch a trace tree
454        """
455        params = GenerateMockEventParams(registry_name, schema_name, schema_version, event_ref, skip_optional, contexts)
456        out = self._generate_barebone_event(
457            GenerateBareboneEventParams(
458                registry_name=params.registry_name,
459                schema_name=params.schema_name,
460                schema_version=params.schema_version,
461                event_ref=params.event_ref,
462                skip_optional=params.skip_optional,
463            )
464        )
465        event = out.event
466
467        if params.contexts is not None and type(params.contexts) == list:
468            event = self._apply_contexts(event, params.contexts)
469
470        return GenerateMockEventOutput(event)
471    
472    def retry_until(self, assertion_fn, timeout = 10, retryable_exceptions = (RetryableException,)):
473        """
474        Decorator function to retry until condition or timeout is met
475
476        IAM Permissions Needed
477        ----------------------
478        
479        Parameters
480        ----------
481        assertion_fn: Callable[[any], None]
482            Callable function that has an assertion and raises an AssertionError if it fails
483        timeout: int or float
484            value that specifies how long the function will retry for until it times out
485        
486        Returns
487        -------
488        bool
489            True if the condition was met or false if the timeout is met
490
491        Raises
492        ------
493        ValueException
494            When timeout is a negative number
495        TypeException
496            When timeout or condition is not a suitable type
497        """
498        if(not(isinstance(timeout, int) or  isinstance(timeout, float))):
499            raise TypeError("timeout must be an int or float")
500        elif(timeout < 0):
501            raise ValueError("timeout must not be a negative value")
502        if(not callable(assertion_fn)):
503            raise TypeError("condition is not a callable function")
504        def retry_until_decorator(func):
505            @wraps(func)
506            def _wrapper(*args, **kwargs):
507                start = datetime.now()
508                attempt = 1
509                delay = 0.05
510                elapsed = lambda _: (datetime.now() - start).total_seconds()
511                if timeout == 0:
512                    elapsed = lambda _: -1
513                while elapsed(None) < timeout:
514                    try:
515                        output = func(*args, **kwargs)
516                    except retryable_exceptions:
517                        continue
518                    try:
519                        assertion_fn(output)
520                        return True
521                    except AssertionError as e:
522                        LOG.debug(f"Assertion failed: {e}")
523                    time.sleep(math.pow(2, attempt) * delay)
524                    attempt += 1
525                LOG.debug(f"timeout after {timeout} seconds")
526                LOG.debug("condition not satisfied")
527                return False
528            return _wrapper
529        return retry_until_decorator
530
531    def _apply_contexts(self, generated_event: dict, callable_contexts: List[Callable]) -> dict:
532        """
533        function for looping through provided functions, modifying the event as the client specifies
534        """
535        for func in callable_contexts:
536            generated_event = func(generated_event)
537            try:
538                json.dumps(generated_event)
539            except TypeError:
540                raise IatkException(f"context applier {func.__name__} returns a non-JSON-serializable result", 400)
541        if generated_event is None:
542            raise IatkException("event is empty, make sure function returns a valid event", 404)
543        return generated_event
544        
545    def patch_aws_client(self, client: "boto3.client", sampled = 1) -> "boto3.client":
546        """
547        Patches boto3 client to register event to include generated x-ray trace id and sampling rule as part of request header before invoke/execution
548
549        Parameters
550        ----------
551        params : boto3.client
552            boto3.client for specified aws service
553        sampled : int
554            value 0 to not sample the request or value 1 to sample the request
555        
556        Returns
557        -------
558        boto3.client
559            same client passed in the params with the event registered
560        """
561        def _add_header(request, **kwargs):
562            trace_id_string= 'Root=;Sampled={}'.format(sampled)
563            
564            request.headers.add_header('X-Amzn-Trace-Id', trace_id_string)
565            LOG.debug(f"Trace ID format: {trace_id_string}")
566
567        service_name = client.meta.service_model.service_name
568        event_string = 'before-sign.{}.*'
569        LOG.debug(f"service id: {client.meta.service_model.service_id}, service name: {service_name}")
570        
571        client.meta.events.register(event_string.format(service_name), _add_header)
572        
573        return client
574
575    @_log_duration
576    def _invoke_iatk(self, payload: Payload, caller: str=None) -> dict:
577        input_data = payload.dump_bytes(caller)
578        LOG.debug("payload: %s", input_data)
579        stdout_data = self._popen_iatk(input_data)
580        jsonrpc_data = stdout_data.decode("utf-8")
581        data_dict = json.loads(jsonrpc_data.strip())
582
583        # Error is only returned in the response if one happened. Otherwise it is omitted
584        if data_dict.get("error", None):
585            message = data_dict.get("error", {}).get("message", "")
586            error_code = data_dict.get("error", {}).get("Code", 0)
587            raise IatkException(message=message, error_code=error_code)
588        
589        return data_dict
590
591    def _popen_iatk(self, input: bytes, env_vars: Optional[dict]=None) -> bytes:
592        LOG.debug("calling iatk rpc with input %s", input)
593        p = Popen([self._iatk_binary_path], stdout=PIPE, stdin=PIPE, stderr=PIPE)
594
595        out, err = p.communicate(input=input)
596        for line in err.splitlines():
597            iatk_service_logger.debug(line.decode())
598        return out
599
600    def _raise_error_if_returned(self, output):
601        jsonrpc_data = output.decode("utf-8")
602        data_dict = json.loads(jsonrpc_data.strip())
603
604        # Error is only returned in the response if one happened. Otherwise it is omitted
605        if data_dict.get("error", None):
606            message = data_dict.get("error", {}).get("message", "")
607            error_code = data_dict.get("error", {}).get("Code", 0)
608            raise IatkException(message=message, error_code=error_code)
609
610        
611    def retry_get_trace_tree_until(self, tracing_header: str, assertion_fn: Callable[[GetTraceTreeOutput], None], timeout_seconds: int = 30):
612        """
613        function to retry get_trace_tree condition or timeout is met
614
615        IAM Permissions Needed
616        ----------------------
617        xray:BatchGetTraces
618        
619        Parameters
620        ----------
621        trace_header:
622            x-ray trace header
623        assertion_fn : Callable[[GetTraceTreeOutput], bool]
624            Callable fuction that makes an assertion and raises an AssertionError if it fails
625        timeout_seconds : int
626            Timeout (in seconds) to stop the fetching
627        
628        Returns
629        -------
630        bool
631            True if the condition was met or false if the timeout is met
632
633        Raises
634        ------
635        IatkException
636            When an exception occurs during get_trace_tree
637        """
638        params = RetryGetTraceTreeUntilParams(tracing_header, assertion_fn, timeout_seconds)
639        @self.retry_until(assertion_fn=params.assertion_fn, timeout=params.timeout_seconds)
640        def fetch_trace_tree():
641            try:
642                response = self._get_trace_tree(
643                    params=GetTraceTreeParams(tracing_header=params.tracing_header),
644                    caller="retry_get_trace_tree_until",
645                )
646                return response
647            except IatkException as e:
648                 if "trace not found" in str(e):
649                    pass
650                    raise RetryableException(e)
651                 else:
652                    raise IatkException(e, 500)
653        try:
654            response = fetch_trace_tree()
655            return response
656        except IatkException as e:
657            raise IatkException(e, 500)

Creates and setups AWS Integrated Application Test Kit

Parameters
  • region (str, optional): AWS Region used to interact with AWS
  • profile (str, optional): AWS Profile used to communicate with AWS resources
AwsIatk(region: Optional[str] = None, profile: Optional[str] = None)
region: Optional[str] = None
profile: Optional[str] = None
def get_physical_id_from_stack( self, logical_resource_id: str, stack_name: str) -> PhysicalIdFromStackOutput:
114    def get_physical_id_from_stack(
115        self, logical_resource_id: str, stack_name: str
116    ) -> PhysicalIdFromStackOutput:
117        """
118        Fetch a Phsyical Id from a Logical Id within an AWS CloudFormation stack
119
120        IAM Permissions Needed
121        ----------------------
122        cloudformation:DescribeStackResources
123
124        Parameters
125        ----------
126        logical_resource_id : str
127            Name of the Logical Id within the Stack to fetch
128        stack_name : str
129            Name of the CloudFormation Stack
130        
131        Returns
132        -------
133        PhysicalIdFromStackOutput
134            Data Class that holds the Phsyical Id of the resource
135
136        Raises
137        ------
138        IatkException
139            When failed to fetch Phsyical Id
140        """
141        params = PhysicalIdFromStackParams(logical_resource_id, stack_name)
142        payload = params.to_payload(self.region, self.profile)
143        response = self._invoke_iatk(payload)
144        output = PhysicalIdFromStackOutput(response)
145        LOG.debug(f"Physical id: {output.physical_id}, Logical id: {params.logical_resource_id}")
146        return output

Fetch a Phsyical Id from a Logical Id within an AWS CloudFormation stack

IAM Permissions Needed

cloudformation:DescribeStackResources

Parameters
  • logical_resource_id (str): Name of the Logical Id within the Stack to fetch
  • stack_name (str): Name of the CloudFormation Stack
Returns
  • PhysicalIdFromStackOutput: Data Class that holds the Phsyical Id of the resource
Raises
  • IatkException: When failed to fetch Phsyical Id
def get_stack_outputs( self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput:
148    def get_stack_outputs(self, stack_name: str, output_names: List[str]) -> GetStackOutputsOutput:
149        """
150        Fetch Stack Outputs from an AWS CloudFormation stack
151
152        IAM Permissions Needed
153        ----------------------
154        cloudformation:DescribeStacks
155
156        Parameters
157        ----------
158        stack_name : str
159            Name of the Stack
160        output_names : List[str] 
161            List of strings that represent the StackOutput Keys   
162        
163        Returns
164        -------
165        GetStackOutputsOutput
166            Data Class that holds the Stack Outputs of the resource
167
168        Raises
169        ------
170        IatkException
171            When failed to fetch Stack Outputs
172        """
173        params = GetStackOutputsParams(stack_name, output_names)
174        payload = params.to_payload(self.region, self.profile)
175        response = self._invoke_iatk(payload)
176        output = GetStackOutputsOutput(response)
177        LOG.debug(f"Output: {output}")
178        return output

Fetch Stack Outputs from an AWS CloudFormation stack

IAM Permissions Needed

cloudformation:DescribeStacks

Parameters
  • stack_name (str): Name of the Stack
  • output_names (List[str]): List of strings that represent the StackOutput Keys
Returns
  • GetStackOutputsOutput: Data Class that holds the Stack Outputs of the resource
Raises
  • IatkException: When failed to fetch Stack Outputs
def add_listener( self, event_bus_name: str, rule_name: str, target_id: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> AddEbListenerOutput:
180    def add_listener(self, event_bus_name: str, rule_name: str, target_id: Optional[str] = None, tags: Optional[Dict[str, str]] = None) -> AddEbListenerOutput:
181        """
182        Add Listener Resource to an AWS Event Bridge Bus to enable testing
183
184        IAM Permissions Needed
185        ----------------------
186        events:DescribeEventBus
187        events:DescribeRule
188        events:PutRule
189        events:PutTargets
190        events:DeleteRule
191        events:RemoveTargets
192        events:TagResource
193
194        sqs:CreateQueue
195        sqs:GetQueueAttributes
196        sqs:GetQueueUrl
197        sqs:DeleteQueue
198        sqs:TagQueue
199
200        Parameters
201        ----------
202        event_bus_name : str
203            Name of the AWS Event Bus
204        rule_name : str
205            Name of a Rule on the EventBus to replicate
206        target_id : str, optional
207            Target Id on the given rule to replicate
208        tags : Dict[str, str], optional
209            A key-value pair associated EventBridge rule.
210        
211        Returns
212        -------
213        AddEbListenerOutput
214            Data Class that holds the Listener created
215
216        Raises
217        ------
218        IatkException
219            When failed to add listener
220        """
221        params = AddEbListenerParams(event_bus_name, rule_name, target_id, tags)
222        payload = params.to_payload(self.region, self.profile)
223        response = self._invoke_iatk(payload)
224        output = AddEbListenerOutput(response)
225        LOG.debug(f"Output: {output}")
226        return output

Add Listener Resource to an AWS Event Bridge Bus to enable testing

IAM Permissions Needed

events:DescribeEventBus events:DescribeRule events:PutRule events:PutTargets events:DeleteRule events:RemoveTargets events:TagResource

sqs:CreateQueue sqs:GetQueueAttributes sqs:GetQueueUrl sqs:DeleteQueue sqs:TagQueue

Parameters
  • event_bus_name (str): Name of the AWS Event Bus
  • rule_name (str): Name of a Rule on the EventBus to replicate
  • target_id (str, optional): Target Id on the given rule to replicate
  • tags (Dict[str, str], optional): A key-value pair associated EventBridge rule.
Returns
  • AddEbListenerOutput: Data Class that holds the Listener created
Raises
  • IatkException: When failed to add listener
def remove_listeners( self, ids: Optional[List[str]] = None, tag_filters: Optional[List[RemoveListeners_TagFilter]] = None) -> RemoveListenersOutput:
228    def remove_listeners(self, ids: Optional[List[str]] = None, tag_filters: Optional[List[RemoveListeners_TagFilter]] = None) -> RemoveListenersOutput:
229        """
230        Remove Listener Resource(s) from an AWS Event Bridge Bus
231
232        IAM Permissions Needed
233        ----------------------
234        tag:GetResources
235        
236        sqs:DeleteQueue
237        sqs:GetQueueUrl
238        sqs:GetQueueAttributes
239        sqs:ListQueueTags
240
241        events:DeleteRule
242        events:RemoveTargets
243        events:ListTargetsByRule
244        
245        Parameters
246        ----------
247        ids : List[str], optional
248            List of Listener Ids to remove, one of ids and tag_filters must be supplied
249        tag_filters : List[RemoveListeners_TagFilter], optional
250            List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied
251        
252        Returns
253        -------
254        RemoveListenersOutput
255            Data Class that holds the Listener(s) that were removed
256
257        Raises
258        ------
259        IatkException
260            When failed to remove listener(s)
261        """
262        params = RemoveListenersParams(ids, tag_filters)
263        payload = params.to_payload(self.region, self.profile)
264        response = self._invoke_iatk(payload)
265        output = RemoveListenersOutput(response)
266        LOG.debug(f"Output: {output}")
267        return output

Remove Listener Resource(s) from an AWS Event Bridge Bus

IAM Permissions Needed

tag:GetResources

sqs:DeleteQueue sqs:GetQueueUrl sqs:GetQueueAttributes sqs:ListQueueTags

events:DeleteRule events:RemoveTargets events:ListTargetsByRule

Parameters
  • ids (List[str], optional): List of Listener Ids to remove, one of ids and tag_filters must be supplied
  • tag_filters (List[RemoveListeners_TagFilter], optional): List of RemoveListeners_TagFilter, one of ids and tag_filters must be supplied
Returns
  • RemoveListenersOutput: Data Class that holds the Listener(s) that were removed
Raises
  • IatkException: When failed to remove listener(s)
def poll_events( self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput:
278    def poll_events(self, listener_id: str, wait_time_seconds: int, max_number_of_messages: int) -> PollEventsOutput:
279        """
280        Poll Events from a specific Listener
281
282        IAM Permissions Needed
283        ----------------------
284        sqs:GetQueueUrl
285        sqs:ListQueueTags
286        sqs:ReceiveMessage
287        sqs:DeleteMessage
288        sqs:GetQueueAttributes
289
290        events:DescribeRule
291
292        Parameters
293        ----------
294        listener_id : str
295            Id of the Listener that was created
296        wait_time_seconds : int
297            Time in seconds to wait for polling
298        max_number_of_messages : int
299            Max number of messages to poll
300        
301        Returns
302        -------
303        PollEventsOutput
304            Data Class that holds the Events captured by the Listener
305
306        Raises
307        ------
308        IatkException
309            When failed to Poll Events
310        """
311        params = PollEventsParams(listener_id, wait_time_seconds, max_number_of_messages)
312        output = self._poll_events(params)
313        LOG.debug(f"Output: {output}")
314        return output

Poll Events from a specific Listener

IAM Permissions Needed

sqs:GetQueueUrl sqs:ListQueueTags sqs:ReceiveMessage sqs:DeleteMessage sqs:GetQueueAttributes

events:DescribeRule

Parameters
  • listener_id (str): Id of the Listener that was created
  • wait_time_seconds (int): Time in seconds to wait for polling
  • max_number_of_messages (int): Max number of messages to poll
Returns
  • PollEventsOutput: Data Class that holds the Events captured by the Listener
Raises
  • IatkException: When failed to Poll Events
def wait_until_event_matched( self, listener_id: str, assertion_fn: Callable[[str], NoneType], timeout_seconds: int = 30) -> bool:
316    def wait_until_event_matched(self, listener_id: str, assertion_fn: Callable[[str], None], timeout_seconds: int = 30) -> bool:
317        """
318        Poll Events on a given Listener until a match is found or timeout met.
319
320        IAM Permissions Needed
321        ----------------------
322        sqs:GetQueueUrl
323        sqs:ListQueueTags
324        sqs:ReceiveMessage
325        sqs:DeleteMessage
326        sqs:GetQueueAttributes
327
328        events:DescribeRule
329        
330        Parameters
331        ----------
332        listener_id : str
333            Id of the Listener that was created
334        assertion_fn : Callable[[str], bool]
335            Callable function that has an assertion and raises an AssertionError if it fails
336        timeout_seconds : int
337            Timeout (in seconds) to stop the polling
338        
339        Returns
340        -------
341        bool
342            True if the event was matched before timeout otherwise False
343
344        Raises
345        ------
346        IatkException
347            When failed to Poll Events
348        """
349        params = WaitUntilEventMatchedParams(listener_id, assertion_fn, timeout_seconds)
350        start = datetime.now()
351        elapsed = lambda _: (datetime.now() - start).total_seconds()
352        while elapsed(None) < params.timeout_seconds:
353            out = self._poll_events(params=params._poll_event_params, caller="wait_until_event_matched")
354            events = out.events
355            if events:
356                for event in events:
357                    try: 
358                        params.assertion_fn(event)
359                        LOG.debug("event matched")
360                        return True
361                    except AssertionError as e:
362                        LOG.debug(f"Assertion failed: {e}")
363                        
364        LOG.debug(f"timeout after {params.timeout_seconds} seconds")
365        LOG.debug("no matching event found")
366        return False

Poll Events on a given Listener until a match is found or timeout met.

IAM Permissions Needed

sqs:GetQueueUrl sqs:ListQueueTags sqs:ReceiveMessage sqs:DeleteMessage sqs:GetQueueAttributes

events:DescribeRule

Parameters
  • listener_id (str): Id of the Listener that was created
  • assertion_fn (Callable[[str], bool]): Callable function that has an assertion and raises an AssertionError if it fails
  • timeout_seconds (int): Timeout (in seconds) to stop the polling
Returns
  • bool: True if the event was matched before timeout otherwise False
Raises
  • IatkException: When failed to Poll Events
def get_trace_tree(self, tracing_header: str) -> GetTraceTreeOutput:
377    def get_trace_tree(
378        self, tracing_header: str
379    ) -> GetTraceTreeOutput:
380        """
381        Fetch the trace tree structure using the provided tracing_header
382
383        IAM Permissions Needed
384        ----------------------
385        xray:BatchGetTraces
386        
387        Parameters
388        ----------
389        tracing_header : str
390            Trace header to get the trace tree
391        
392        Returns
393        -------
394        GetTraceTreeOutput
395            Data Class that holds the trace tree structure
396
397        Raises
398        ------
399        IatkException
400            When failed to fetch a trace tree
401        """
402        params = GetTraceTreeParams(tracing_header)
403        output = self._get_trace_tree(params)
404        return output

Fetch the trace tree structure using the provided tracing_header

IAM Permissions Needed

xray:BatchGetTraces

Parameters
  • tracing_header (str): Trace header to get the trace tree
Returns
  • GetTraceTreeOutput: Data Class that holds the trace tree structure
Raises
  • IatkException: When failed to fetch a trace tree
def generate_mock_event( self, registry_name: Optional[str] = None, schema_name: Optional[str] = None, schema_version: Optional[str] = None, event_ref: Optional[str] = None, skip_optional: Optional[bool] = None, contexts: Optional[List[Callable[[dict], dict]]] = None) -> GenerateMockEventOutput:
414    def generate_mock_event(
415        self, 
416        registry_name: Optional[str] = None, 
417        schema_name: Optional[str] = None,
418        schema_version: Optional[str] = None,
419        event_ref: Optional[str] = None,
420        skip_optional: Optional[bool] = None,
421        contexts: Optional[List[Callable[[dict], dict]]] = None
422    ) -> GenerateMockEventOutput:
423        """
424        Generate a mock event based on a schema from EventBridge Schema Registry
425
426        IAM Permissions Needed
427        ----------------------
428        schemas:DescribeSchema
429        
430        Parameters
431        ----------
432        registry_name : str
433            name of the registry of the schema stored in EventBridge Schema Registry
434        schema_name : str
435            name of the schema stored in EventBridge Schema Registry
436        schema_version : str
437            version of the schema stored in EventBridge Schema Registry
438        event_ref : str
439            location to the event in the schema in json schema ref syntax, only applicable for openapi schema
440        skip_optional : bool
441            if set to true, do not generate optional fields
442        contexts : List[Callable[[dict], dict]]
443            a list of callables to apply context on the generated mock event
444        
445        Returns
446        -------
447        GenerateMockEventOutput
448            Data Class that holds the trace tree structure
449
450        Raises
451        ------
452        IatkException
453            When failed to fetch a trace tree
454        """
455        params = GenerateMockEventParams(registry_name, schema_name, schema_version, event_ref, skip_optional, contexts)
456        out = self._generate_barebone_event(
457            GenerateBareboneEventParams(
458                registry_name=params.registry_name,
459                schema_name=params.schema_name,
460                schema_version=params.schema_version,
461                event_ref=params.event_ref,
462                skip_optional=params.skip_optional,
463            )
464        )
465        event = out.event
466
467        if params.contexts is not None and type(params.contexts) == list:
468            event = self._apply_contexts(event, params.contexts)
469
470        return GenerateMockEventOutput(event)

Generate a mock event based on a schema from EventBridge Schema Registry

IAM Permissions Needed

schemas:DescribeSchema

Parameters
  • registry_name (str): name of the registry of the schema stored in EventBridge Schema Registry
  • schema_name (str): name of the schema stored in EventBridge Schema Registry
  • schema_version (str): version of the schema stored in EventBridge Schema Registry
  • event_ref (str): location to the event in the schema in json schema ref syntax, only applicable for openapi schema
  • skip_optional (bool): if set to true, do not generate optional fields
  • contexts (List[Callable[[dict], dict]]): a list of callables to apply context on the generated mock event
Returns
  • GenerateMockEventOutput: Data Class that holds the trace tree structure
Raises
  • IatkException: When failed to fetch a trace tree
def retry_until( self, assertion_fn, timeout=10, retryable_exceptions=(<class 'aws_iatk.RetryableException'>,)):
472    def retry_until(self, assertion_fn, timeout = 10, retryable_exceptions = (RetryableException,)):
473        """
474        Decorator function to retry until condition or timeout is met
475
476        IAM Permissions Needed
477        ----------------------
478        
479        Parameters
480        ----------
481        assertion_fn: Callable[[any], None]
482            Callable function that has an assertion and raises an AssertionError if it fails
483        timeout: int or float
484            value that specifies how long the function will retry for until it times out
485        
486        Returns
487        -------
488        bool
489            True if the condition was met or false if the timeout is met
490
491        Raises
492        ------
493        ValueException
494            When timeout is a negative number
495        TypeException
496            When timeout or condition is not a suitable type
497        """
498        if(not(isinstance(timeout, int) or  isinstance(timeout, float))):
499            raise TypeError("timeout must be an int or float")
500        elif(timeout < 0):
501            raise ValueError("timeout must not be a negative value")
502        if(not callable(assertion_fn)):
503            raise TypeError("condition is not a callable function")
504        def retry_until_decorator(func):
505            @wraps(func)
506            def _wrapper(*args, **kwargs):
507                start = datetime.now()
508                attempt = 1
509                delay = 0.05
510                elapsed = lambda _: (datetime.now() - start).total_seconds()
511                if timeout == 0:
512                    elapsed = lambda _: -1
513                while elapsed(None) < timeout:
514                    try:
515                        output = func(*args, **kwargs)
516                    except retryable_exceptions:
517                        continue
518                    try:
519                        assertion_fn(output)
520                        return True
521                    except AssertionError as e:
522                        LOG.debug(f"Assertion failed: {e}")
523                    time.sleep(math.pow(2, attempt) * delay)
524                    attempt += 1
525                LOG.debug(f"timeout after {timeout} seconds")
526                LOG.debug("condition not satisfied")
527                return False
528            return _wrapper
529        return retry_until_decorator

Decorator function to retry until condition or timeout is met

IAM Permissions Needed
Parameters
  • assertion_fn (Callable[[any], None]): Callable function that has an assertion and raises an AssertionError if it fails
  • timeout (int or float): value that specifies how long the function will retry for until it times out
Returns
  • bool: True if the condition was met or false if the timeout is met
Raises
  • ValueException: When timeout is a negative number
  • TypeException: When timeout or condition is not a suitable type
def patch_aws_client(self, client: 'boto3.client', sampled=1) -> 'boto3.client':
545    def patch_aws_client(self, client: "boto3.client", sampled = 1) -> "boto3.client":
546        """
547        Patches boto3 client to register event to include generated x-ray trace id and sampling rule as part of request header before invoke/execution
548
549        Parameters
550        ----------
551        params : boto3.client
552            boto3.client for specified aws service
553        sampled : int
554            value 0 to not sample the request or value 1 to sample the request
555        
556        Returns
557        -------
558        boto3.client
559            same client passed in the params with the event registered
560        """
561        def _add_header(request, **kwargs):
562            trace_id_string= 'Root=;Sampled={}'.format(sampled)
563            
564            request.headers.add_header('X-Amzn-Trace-Id', trace_id_string)
565            LOG.debug(f"Trace ID format: {trace_id_string}")
566
567        service_name = client.meta.service_model.service_name
568        event_string = 'before-sign.{}.*'
569        LOG.debug(f"service id: {client.meta.service_model.service_id}, service name: {service_name}")
570        
571        client.meta.events.register(event_string.format(service_name), _add_header)
572        
573        return client

Patches boto3 client to register event to include generated x-ray trace id and sampling rule as part of request header before invoke/execution

Parameters
  • params (boto3.client): boto3.client for specified aws service
  • sampled (int): value 0 to not sample the request or value 1 to sample the request
Returns
  • boto3.client: same client passed in the params with the event registered
def retry_get_trace_tree_until( self, tracing_header: str, assertion_fn: Callable[[GetTraceTreeOutput], NoneType], timeout_seconds: int = 30):
611    def retry_get_trace_tree_until(self, tracing_header: str, assertion_fn: Callable[[GetTraceTreeOutput], None], timeout_seconds: int = 30):
612        """
613        function to retry get_trace_tree condition or timeout is met
614
615        IAM Permissions Needed
616        ----------------------
617        xray:BatchGetTraces
618        
619        Parameters
620        ----------
621        trace_header:
622            x-ray trace header
623        assertion_fn : Callable[[GetTraceTreeOutput], bool]
624            Callable fuction that makes an assertion and raises an AssertionError if it fails
625        timeout_seconds : int
626            Timeout (in seconds) to stop the fetching
627        
628        Returns
629        -------
630        bool
631            True if the condition was met or false if the timeout is met
632
633        Raises
634        ------
635        IatkException
636            When an exception occurs during get_trace_tree
637        """
638        params = RetryGetTraceTreeUntilParams(tracing_header, assertion_fn, timeout_seconds)
639        @self.retry_until(assertion_fn=params.assertion_fn, timeout=params.timeout_seconds)
640        def fetch_trace_tree():
641            try:
642                response = self._get_trace_tree(
643                    params=GetTraceTreeParams(tracing_header=params.tracing_header),
644                    caller="retry_get_trace_tree_until",
645                )
646                return response
647            except IatkException as e:
648                 if "trace not found" in str(e):
649                    pass
650                    raise RetryableException(e)
651                 else:
652                    raise IatkException(e, 500)
653        try:
654            response = fetch_trace_tree()
655            return response
656        except IatkException as e:
657            raise IatkException(e, 500)

function to retry get_trace_tree condition or timeout is met

IAM Permissions Needed

xray:BatchGetTraces

Parameters
  • trace_header:: x-ray trace header
  • assertion_fn (Callable[[GetTraceTreeOutput], bool]): Callable fuction that makes an assertion and raises an AssertionError if it fails
  • timeout_seconds (int): Timeout (in seconds) to stop the fetching
Returns
  • bool: True if the condition was met or false if the timeout is met
Raises
  • IatkException: When an exception occurs during get_trace_tree
class IatkException(builtins.Exception):
85class IatkException(Exception):
86    def __init__(self, message, error_code) -> None:
87        super().__init__(message)
88
89        self.error_code = error_code

Common base class for all non-exit exceptions.

IatkException(message, error_code)
86    def __init__(self, message, error_code) -> None:
87        super().__init__(message)
88
89        self.error_code = error_code
error_code
Inherited Members
builtins.BaseException
with_traceback
add_note
args
@dataclass
class PhysicalIdFromStackOutput:
14@dataclass
15class PhysicalIdFromStackOutput:
16    """
17    AwsIatk.get_physical_id_from_stack output
18
19    Parameters
20    ----------
21    physical_id : str
22        Physical Id of the Resource requested
23    """
24    physical_id: str
25
26    def __init__(self, data_dict) -> None:
27        self.physical_id = data_dict.get("result", {}).get("output", "")

AwsIatk.get_physical_id_from_stack output

Parameters
  • physical_id (str): Physical Id of the Resource requested
PhysicalIdFromStackOutput(data_dict)
26    def __init__(self, data_dict) -> None:
27        self.physical_id = data_dict.get("result", {}).get("output", "")
physical_id: str
@dataclass
class GetStackOutputsOutput:
15@dataclass
16class GetStackOutputsOutput:
17    """
18    AwsIatk.get_stack_outputs output
19
20    Parameters
21    ----------
22    outputs : Dict[str, str]
23        Dictionary of keys being the StackOutput Key and value
24        being the StackOutput Value       
25    """
26    outputs: Dict[str, str]
27
28    def __init__(self, data_dict: dict) -> None:
29        self.outputs = data_dict.get("result", {}).get("output", {})

AwsIatk.get_stack_outputs output

Parameters
  • outputs (Dict[str, str]): Dictionary of keys being the StackOutput Key and value being the StackOutput Value
GetStackOutputsOutput(data_dict: dict)
28    def __init__(self, data_dict: dict) -> None:
29        self.outputs = data_dict.get("result", {}).get("output", {})
outputs: Dict[str, str]
@dataclass
class AddEbListenerOutput:
41@dataclass
42class AddEbListenerOutput:
43    """
44    AwsIatk.add_listener output
45
46    Parameters
47    ----------
48    id : str
49        Id that corresponds to the listener created
50    target_under_test : AddEbListener_Resource
51        Target Resource that test resources were added
52    components : List[AddEbListener_Resource]
53        List of all Resources created to support the listener
54        on the `target_under_test`
55    """
56    id: str
57    target_under_test: AddEbListener_Resource
58    components: List[AddEbListener_Resource]
59
60    def __init__(self, data_dict: dict) -> None:
61        output = data_dict.get("result", {}).get("output", {})
62        self.id = output.get("Id", "")
63        self.target_under_test = AddEbListener_Resource(
64            output.get("TargetUnderTest", {})
65        )
66        self.components = []
67        for component in output.get("Components", []):
68            self.components.append(AddEbListener_Resource(component))

AwsIatk.add_listener output

Parameters
  • id (str): Id that corresponds to the listener created
  • target_under_test (AddEbListener_Resource): Target Resource that test resources were added
  • components (List[AddEbListener_Resource]): List of all Resources created to support the listener on the target_under_test
AddEbListenerOutput(data_dict: dict)
60    def __init__(self, data_dict: dict) -> None:
61        output = data_dict.get("result", {}).get("output", {})
62        self.id = output.get("Id", "")
63        self.target_under_test = AddEbListener_Resource(
64            output.get("TargetUnderTest", {})
65        )
66        self.components = []
67        for component in output.get("Components", []):
68            self.components.append(AddEbListener_Resource(component))
id: str
target_under_test: aws_iatk.add_eb_listener.AddEbListener_Resource
components: List[aws_iatk.add_eb_listener.AddEbListener_Resource]
@dataclass
class RemoveListenersOutput:
15@dataclass
16class RemoveListenersOutput:
17    """
18    AwsIatk.remove_listeners Output
19
20    Parameters
21    ----------
22    message : str
23        Message indicates whether or not the remove succeeded.
24    """
25    message: str
26
27    def __init__(self, data_dict: dict) -> None:
28        self.message = data_dict.get("result", {}).get("output", "")

AwsIatk.remove_listeners Output

Parameters
  • message (str): Message indicates whether or not the remove succeeded.
RemoveListenersOutput(data_dict: dict)
27    def __init__(self, data_dict: dict) -> None:
28        self.message = data_dict.get("result", {}).get("output", "")
message: str
@dataclass
class RemoveListeners_TagFilter:
31@dataclass
32class RemoveListeners_TagFilter:
33    """
34    Tag filters
35
36    Parameters
37    ----------
38    key : str
39        One part of a key-value pair that makes up a tag. A key is a general label that acts like a category for more specific tag values.
40    values : List[str], optional
41        One part of a key-value pair that make up a tag. A value acts as a descriptor within a tag category (key). The value can be empty or null.
42    """
43    key: str
44    values: Optional[List[str]] = None
45
46    def to_dict(self) -> dict:
47        d = {
48            "Key": self.key,
49        }
50        if self.values:
51            d["Values"] = self.values
52        return d

Tag filters

Parameters
  • key (str): One part of a key-value pair that makes up a tag. A key is a general label that acts like a category for more specific tag values.
  • values (List[str], optional): One part of a key-value pair that make up a tag. A value acts as a descriptor within a tag category (key). The value can be empty or null.
RemoveListeners_TagFilter(key: str, values: Optional[List[str]] = None)
key: str
values: Optional[List[str]] = None
def to_dict(self) -> dict:
46    def to_dict(self) -> dict:
47        d = {
48            "Key": self.key,
49        }
50        if self.values:
51            d["Values"] = self.values
52        return d
@dataclass
class PollEventsOutput:
15@dataclass
16class PollEventsOutput:
17    """
18    AwsIatk.poll_events Output
19
20    Parameters
21    ----------
22    events : List[str]
23        List of event found
24    """
25    events: List[str]
26
27    def __init__(self, data_dict) -> None:
28        output = data_dict.get("result", {}).get("output", [])
29        self.events = output

AwsIatk.poll_events Output

Parameters
  • events (List[str]): List of event found
PollEventsOutput(data_dict)
27    def __init__(self, data_dict) -> None:
28        output = data_dict.get("result", {}).get("output", [])
29        self.events = output
events: List[str]
@dataclass
class GetTraceTreeOutput:
15@dataclass
16class GetTraceTreeOutput:
17    """
18    AwsIatk.get_trace_tree output
19
20    Parameters
21    ----------
22    trace_tree : Tree
23        Trace tree structure of the provided trace id
24    """
25    trace_tree: Tree
26
27    def __init__(self, data_dict) -> None:
28        trace_tree_output = data_dict.get("result", {}).get("output", {})
29        self.trace_tree = Tree(trace_tree_output)

AwsIatk.get_trace_tree output

Parameters
  • trace_tree (Tree): Trace tree structure of the provided trace id
GetTraceTreeOutput(data_dict)
27    def __init__(self, data_dict) -> None:
28        trace_tree_output = data_dict.get("result", {}).get("output", {})
29        self.trace_tree = Tree(trace_tree_output)
trace_tree: aws_iatk.xray.Tree
@dataclass
class GenerateBareboneEventOutput:
16@dataclass
17class GenerateBareboneEventOutput:
18    """
19    AwsIatk.generate_barebone_event output
20    
21    Parameters
22    ----------
23    event : dict
24        mock event
25    """
26    event: dict
27
28    def __init__(self, data_dict: dict) -> None:
29        event = data_dict.get("result", {}).get("output")
30        self.event = json.loads(event) if event else None

AwsIatk.generate_barebone_event output

Parameters
  • event (dict): mock event
GenerateBareboneEventOutput(data_dict: dict)
28    def __init__(self, data_dict: dict) -> None:
29        event = data_dict.get("result", {}).get("output")
30        self.event = json.loads(event) if event else None
event: dict
@dataclass
class GenerateMockEventOutput:
78@dataclass
79class GenerateMockEventOutput:
80    """
81    AwsIatk.generate_mock_event output
82    
83    Parameters
84    ----------
85    event : dict
86        mock event
87    """
88    event: dict

AwsIatk.generate_mock_event output

Parameters
  • event (dict): mock event
GenerateMockEventOutput(event: dict)
event: dict