
The DataStage Class

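The AWS DDK provides a construct, DataStage, that you can subclass to build your own custom stages. A subclass implements get_event_pattern(), which returns the pattern matching the events the stage produces, and get_targets(), which returns the EventBridge rule targets that receive events routed into the stage.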
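To make that contract concrete, here is a minimal stand-in for DataStage written with only the standard library. It assumes, as the SNSStage example relies on, that a subclass must supply get_event_pattern() and get_targets(). FakeDataStage and EchoStage are hypothetical names, not part of aws_ddk_core, and plain dicts and strings stand in for the CDK EventPattern and IRuleTarget types.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class FakeDataStage(ABC):
    """Hypothetical stand-in for aws_ddk_core's DataStage (not the real class)."""

    def __init__(self, id: str) -> None:
        self.id = id

    @abstractmethod
    def get_event_pattern(self) -> Optional[Dict[str, Any]]:
        """Pattern matching the events this stage produces."""

    @abstractmethod
    def get_targets(self) -> Optional[List[str]]:
        """Targets that receive events routed into this stage."""


class EchoStage(FakeDataStage):
    """Hypothetical custom stage tagging its events with its own detail-type."""

    def get_event_pattern(self) -> Optional[Dict[str, Any]]:
        return {"detail-type": [f"{self.id}-event-type"]}

    def get_targets(self) -> Optional[List[str]]:
        # Plain strings stand in for CDK IRuleTarget objects.
        return [f"{self.id}-target"]


stage = EchoStage("my-stage")
print(stage.get_event_pattern())  # {'detail-type': ['my-stage-event-type']}
```

Because both methods are abstract, Python refuses to instantiate FakeDataStage itself, so a subclass that forgets one of them fails early with a TypeError.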

Building Your Own Data Stage

Let’s say we need to create a stage that publishes S3 events to an SNS topic.

We’ll first need to create a stage for the topic.

I’ll add a file called sns.py to my application stack package.

from typing import Any, List, Optional

from aws_cdk.aws_events import EventPattern, IRuleTarget
from aws_cdk.aws_events_targets import SnsTopic
from aws_cdk.aws_sns import Topic, ITopic
from aws_ddk_core.pipelines import DataStage  # DataStage base class from DDK core
from constructs import Construct


class SNSStage(DataStage):
    """
    Class that represents an SNS DDK stage.
    """

    def __init__(
        self,
        scope: Construct,
        id: str,
        environment_id: str,
        **kwargs: Any,
    ) -> None:
        """
        DDK SNS stage.
        """
        super().__init__(scope, id, **kwargs)

        self._event_detail_type: str = f"{id}-event-type"

        # create topic
        self._topic = Topic(
            self, f"{id}-topic"
        )

    @property
    def topic(self) -> ITopic:
        """
        Return: Topic
            The SNS Topic
        """
        return self._topic

    # method to retrieve Event Pattern
    def get_event_pattern(self) -> Optional[EventPattern]:
        return EventPattern(
            detail_type=[self._event_detail_type],
        )

    # method to retrieve the event rule targets
    def get_targets(self) -> Optional[List[IRuleTarget]]:
        return [SnsTopic(self._topic)]
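The pattern returned by get_event_pattern() filters only on the detail type, which CDK renders under the JSON key "detail-type". The sketch below models that single-field exact-match rule in plain Python to show which events the SNS stage's pattern would select. It deliberately ignores the rest of EventBridge's pattern syntax (prefix, numeric, and nested matching), and sns_stage_pattern and matches_pattern are hypothetical helpers, not DDK or CDK APIs.

```python
from typing import Any, Dict, List


def sns_stage_pattern(stage_id: str) -> Dict[str, List[str]]:
    # Mirrors SNSStage: the detail type is f"{id}-event-type".
    return {"detail-type": [f"{stage_id}-event-type"]}


def matches_pattern(pattern: Dict[str, List[Any]], event: Dict[str, Any]) -> bool:
    """Simplified matching: each top-level pattern key must be present in the
    event, and the event's value must equal one of the listed values."""
    for key, allowed in pattern.items():
        if key not in event or event[key] not in allowed:
            return False
    return True


pattern = sns_stage_pattern("ddk-sns")
print(matches_pattern(pattern, {"detail-type": "ddk-sns-event-type", "detail": {}}))  # True
print(matches_pattern(pattern, {"detail-type": "Object Created", "detail": {}}))      # False
```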

Now that I have a class defining my SNS stage, I can instantiate it and add it to my data pipeline.

.....
from ddk_app.sns import SNSStage # import my class I built above


class DDKApplicationStack(BaseStack):
    def __init__(
        self, scope: Construct, id: str, environment_id: str, **kwargs: Any
    ) -> None:
        super().__init__(scope, id, environment_id, **kwargs)

        # create my bucket
        ddk_bucket = S3Factory.bucket(
            self,
            "ddk-bucket",
            environment_id,
        )
        
        # create an S3 event stage using the S3EventStage class from `aws_ddk_core.stages`
        s3_event_stage = S3EventStage(
            scope=self,
            id="ddk-s3-event",
            environment_id=environment_id,
            event_names=["CopyObject", "PutObject"],
            bucket_name=ddk_bucket.bucket_name,
            key_prefix="raw",
        )

        # instantiate my sns stage class
        sns_stage = SNSStage(
            scope=self,
            id="ddk-sns",
            environment_id=environment_id,
        )

        # construct my DataPipeline
        (
            DataPipeline(scope=self, id="ddk-pipeline")
                .add_stage(s3_event_stage)
                .add_stage(sns_stage)
        )    
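DataPipeline connects consecutive stages with EventBridge rules. As I understand DDK's behaviour, each add_stage() call after the first creates a rule whose pattern comes from the previous stage's get_event_pattern() and whose targets come from the new stage's get_targets(); in this stack, that routes events matching the S3 event stage's pattern to the SNS topic. The sketch below models that wiring with hypothetical Stage, Rule, and Pipeline classes in plain Python. It is an assumption-based simplification, not the aws_ddk_core implementation, and it ignores any extra options the real add_stage() accepts.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Stage:
    """Hypothetical stage: an output pattern plus a list of input targets."""
    id: str
    pattern: Optional[Dict[str, Any]]
    targets: Optional[List[str]]

    def get_event_pattern(self) -> Optional[Dict[str, Any]]:
        return self.pattern

    def get_targets(self) -> Optional[List[str]]:
        return self.targets


@dataclass
class Rule:
    id: str
    event_pattern: Optional[Dict[str, Any]]
    targets: Optional[List[str]]


@dataclass
class Pipeline:
    """Simplified model of DataPipeline.add_stage() chaining (an assumption)."""
    id: str
    rules: List[Rule] = field(default_factory=list)
    _prev: Optional[Stage] = None

    def add_stage(self, stage: Stage) -> "Pipeline":
        if self._prev is not None:
            # The previous stage's output pattern feeds this stage's targets.
            self.rules.append(
                Rule(
                    id=f"{self.id}-{stage.id}-rule",
                    event_pattern=self._prev.get_event_pattern(),
                    targets=stage.get_targets(),
                )
            )
        self._prev = stage
        return self  # returning self allows the fluent chaining used in the stack


# Placeholder pattern standing in for whatever S3EventStage actually builds.
s3_stage = Stage("ddk-s3-event", {"source": ["aws.s3"]}, None)
sns_stage = Stage("ddk-sns", {"detail-type": ["ddk-sns-event-type"]}, ["ddk-sns-topic"])
pipeline = Pipeline("ddk-pipeline").add_stage(s3_stage).add_stage(sns_stage)
for rule in pipeline.rules:
    print(rule.id, rule.event_pattern, rule.targets)
```

The first stage gets no rule of its own because nothing precedes it; only the S3-to-SNS hop produces a rule.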

Build

Run ddk deploy to build and deploy your infrastructure.

Conclusion

You should now have an S3 bucket whose CopyObject and PutObject events under the raw prefix are routed to an SNS topic.
