Skip to main content

MSK Serverless

An MSK Serverless cluster with helpers to manage topics and IAM permissions.

Overview

The construct creates an MSK Serverless Cluster, with the latest Kafka version in MSK as default. You can change the dafaults by passing your own parameters as a Resource property to construct initializer. There is also a method to create topics. Last, it also provides methods to grant an existing principal (ie IAM Role or IAM User) with the permission to produce or consume from a kafka topic. The diagram below shows the high level architecture.

MSK Serverless High level architecture

The construct can create a VPC on your behalf that is used to deploy MSK Serverless cluser or you can provide your own VPC definition through the vpcConfigs property when you initialize the construct. The VPC that is created on your behalf has 10.0.0.0/16 CIDR range, and comes with an S3 VPC Endpoint Gateway attached to it. The construct also creates a security group for that is attached to the brokers.

The construct has the following interfaces, you will usage examples in the new sections:

  • setTopic: Perform create, update, and delete operations on Topics
  • grantProduce: Attach an IAM policy to a principal to write to a topic
  • grantConsume: Attach an IAM policy to a principal to read from a topic

Below you can find an example of creating an MSK Serverless configuration with the default options.

const msk = new MskServerless(stack, 'cluster');

Usage

Bring Your Own VPC

The construct allows you to provide your own VPC that was created outside the CDK Stack. Below you will find an example usage.

let vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
vpcId: 'vpc-1111111111',
vpcCidrBlock: '10.0.0.0/16',
availabilityZones: ['eu-west-1a', 'eu-west-1b'],
publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
});

const msk = new MskServerless(stack, 'cluster', {
clusterName: 'msk-byov',
securityGroups: [SecurityGroup.fromLookupByName(stack, 'brokerSecurityGroup', 'broker-sg', vpc)],
subnets: vpc.selectSubnets(),
vpc: vpc,
});

setTopic

This method allows you to create, update or delete a topic. Its backend uses kafkajs. The topic is defined by the property type called MskTopic. If your MSK cluster uses KRaft mode rather than ZooKeeper, we encourage you to set to the parameter waitForLeaders = true and timeout = 10000. Below you can see the definition of the topic as well as a usage.

{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
const msk = new MskServerless(stack, 'cluster');

let topic: MskTopic = {
topic: 'topic1',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{
name: 'retention.ms',
value: '90000',
},
{
name: 'retention.bytes',
value: '90000',
},
],
}

msk.addTopic('topic1', topic, cdk.RemovalPolicy.DESTROY, false, 1500);

grantProduce

This method allows to grant a Principal the rights to write to a kafka topic. The method attachs an IAM policy as defined in the AWS documentation scoped only to the topic provided.

const msk = new MskServerless(stack, 'cluster');

let iamRole = Role.fromRoleName(stack, 'role', 'role');

msk.grantProduce('topic1',iamRole);

grantConsume

This method allows to grant a Principal the rights to read to a kafka topic. The method attachs an IAM policy as defined in the AWS documentation scoped only to the topic provided.

const msk = new MskServerless(stack, 'cluster');

const iamRole = Role.fromRoleName(stack, 'role', 'role');

msk.grantConsume('topic1',iamRole);

addClusterPolicy

This method allows you to add IAM resource policy to your MSK cluster. This method can enable you for example to setup cross account access for your Amazon MSK cluster.

const msk = new MskServerless(stack, 'cluster');

const cluterPolicy = new PolicyDocument(
{
statements: [
new PolicyStatement({
actions: [
'kafka:CreateVpcConnection',
'kafka:GetBootstrapBrokers',
'kafka:DescribeClusterV2',
],
resources: [msk.cluster.attrArn],
effect: Effect.ALLOW,
principals: [new ServicePrincipal('firehose.amazonaws.com')],
}),
],
},
);

msk.addClusterPolicy(cluterPolicy, 'cluterPolicy');