MSK Serverless
An MSK Serverless cluster with helpers to manage topics and IAM permissions.
Overview
The construct creates an MSK Serverless cluster, using the latest Kafka version available in MSK by default. You can change the defaults by passing your own parameters as resource properties to the construct initializer. There is also a method to create topics. Finally, it provides methods to grant an existing principal (i.e. an IAM Role or IAM User) permission to produce to or consume from a Kafka topic. The diagram below shows the high-level architecture.
The construct can create a VPC on your behalf to deploy the MSK Serverless cluster, or you can provide your own VPC definition through the vpcConfigs property when you initialize the construct. The VPC created on your behalf uses the 10.0.0.0/16 CIDR range and comes with an S3 VPC Gateway Endpoint attached to it. The construct also creates a security group that is attached to the brokers.
The construct has the following interfaces; you will find usage examples in the next sections:
- setTopic: Perform create, update, and delete operations on Topics
- grantProduce: Attach an IAM policy to a principal to write to a topic
- grantConsume: Attach an IAM policy to a principal to read from a topic
Below you can find an example of creating an MSK Serverless configuration with the default options.
TypeScript:
    const msk = new MskServerless(stack, 'cluster');
Python:
    msk = MskServerless(stack, "cluster")
Usage
Bring Your Own VPC
The construct allows you to provide your own VPC that was created outside the CDK Stack. Below you will find an example usage.
TypeScript:
    const vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
      vpcId: 'vpc-1111111111',
      vpcCidrBlock: '10.0.0.0/16',
      availabilityZones: ['eu-west-1a', 'eu-west-1b'],
      publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
      privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
    });
    const msk = new MskServerless(stack, 'cluster', {
      clusterName: 'msk-byov',
      securityGroups: [SecurityGroup.fromLookupByName(stack, 'brokerSecurityGroup', 'broker-sg', vpc)],
      subnets: vpc.selectSubnets(),
      vpc: vpc,
    });
Python:
    vpc = Vpc.from_vpc_attributes(stack, "vpc",
        vpc_id="vpc-1111111111",
        vpc_cidr_block="10.0.0.0/16",
        availability_zones=["eu-west-1a", "eu-west-1b"],
        public_subnet_ids=["subnet-111111111", "subnet-11111111"],
        private_subnet_ids=["subnet-11111111", "subnet-1111111"]
    )
    msk = MskServerless(stack, "cluster",
        cluster_name="msk-byov",
        security_groups=[SecurityGroup.from_lookup_by_name(stack, "brokerSecurityGroup", "broker-sg", vpc)],
        subnets=vpc.select_subnets(),
        vpc=vpc
    )
setTopic
This method allows you to create, update or delete a topic. Its backend uses kafkajs.
The topic is defined by the property type called MskTopic.
If your MSK cluster uses KRaft mode rather than ZooKeeper, we encourage you to set the parameters waitForLeaders = true and timeout = 10000.
Below you can see the definition of the topic as well as a usage example.
    {
      topic: <String>,
      numPartitions: <Number>,     // default: -1 (uses broker `num.partitions` configuration)
      replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
      configEntries: <Array>       // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
    }
TypeScript:
    const msk = new MskServerless(stack, 'cluster');
    const topic: MskTopic = {
      topic: 'topic1',
      numPartitions: 3,
      replicationFactor: 1,
      configEntries: [
        {
          name: 'retention.ms',
          value: '90000',
        },
        {
          name: 'retention.bytes',
          value: '90000',
        },
      ],
    };
    msk.addTopic('topic1', topic, cdk.RemovalPolicy.DESTROY, false, 1500);
Python:
    msk = MskServerless(stack, "cluster")
    topic = MskTopic(
        topic="topic1",
        num_partitions=3,
        replication_factor=1,
        config_entries=[
            {"name": "retention.ms", "value": "90000"},
            {"name": "retention.bytes", "value": "90000"},
        ]
    )
    msk.add_topic("topic1", topic, cdk.RemovalPolicy.DESTROY, False, 1500)
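The defaults listed in the topic definition above can be illustrated with a small standalone sketch. The `TopicDefinition` interface and the `withTopicDefaults` helper are hypothetical names introduced here for illustration; they mirror the documented shape of MskTopic but are not part of the construct library, and the construct may resolve defaults differently.

```typescript
// Hypothetical local mirror of the documented MskTopic shape (illustration only).
interface ConfigEntry {
  name: string;
  value: string;
}

interface TopicDefinition {
  topic: string;
  numPartitions?: number;
  replicationFactor?: number;
  configEntries?: ConfigEntry[];
}

// Fill in the documented defaults. A value of -1 defers to the broker
// configuration (`num.partitions` and `default.replication.factor`).
function withTopicDefaults(def: TopicDefinition): Required<TopicDefinition> {
  return {
    topic: def.topic,
    numPartitions: def.numPartitions ?? -1,
    replicationFactor: def.replicationFactor ?? -1,
    configEntries: def.configEntries ?? [],
  };
}

// Only the partition count is set explicitly; the rest falls back to defaults.
const resolvedTopic = withTopicDefaults({ topic: 'topic1', numPartitions: 3 });
console.log(JSON.stringify(resolvedTopic));
```

Leaving numPartitions or replicationFactor unset is a reasonable choice when the broker-level settings already match what you need.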
grantProduce
This method grants a Principal the rights to write to a Kafka topic.
The method attaches an IAM policy, as defined in the AWS documentation, scoped only to the topic provided.
TypeScript:
    const msk = new MskServerless(stack, 'cluster');
    const iamRole = Role.fromRoleName(stack, 'role', 'role');
    msk.grantProduce('topic1', iamRole);
Python:
    msk = MskServerless(stack, "cluster")
    iam_role = Role.from_role_name(stack, "role", "role")
    msk.grant_produce("topic1", iam_role)
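To make "scoped only to the topic provided" concrete, the sketch below builds a topic-scoped producer policy as plain JSON. It assumes the actions and ARN layout described in the AWS documentation for MSK IAM access control; the helper names `topicArnFor` and `producePolicy` and the sample cluster ARN are hypothetical, and the exact statements the construct attaches may differ.

```typescript
// Hypothetical helper: derive a topic ARN from an MSK cluster ARN.
// Cluster ARN: arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>
// Topic ARN:   arn:aws:kafka:<region>:<account>:topic/<name>/<uuid>/<topic>
function topicArnFor(clusterArn: string, topic: string): string {
  return `${clusterArn.replace(':cluster/', ':topic/')}/${topic}`;
}

// Sketch of a producer policy: connect to the cluster, write to one topic only.
function producePolicy(clusterArn: string, topic: string) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:Connect'],
        Resource: [clusterArn],
      },
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:DescribeTopic', 'kafka-cluster:WriteData'],
        Resource: [topicArnFor(clusterArn, topic)],
      },
    ],
  };
}

// Sample values are placeholders, not real resources.
const sampleProducePolicy = producePolicy(
  'arn:aws:kafka:eu-west-1:123456789012:cluster/demo/abcd-1234',
  'topic1',
);
console.log(JSON.stringify(sampleProducePolicy, null, 2));
```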
grantConsume
This method grants a Principal the rights to read from a Kafka topic.
The method attaches an IAM policy, as defined in the AWS documentation, scoped only to the topic provided.
TypeScript:
    const msk = new MskServerless(stack, 'cluster');
    const iamRole = Role.fromRoleName(stack, 'role', 'role');
    msk.grantConsume('topic1', iamRole);
Python:
    msk = MskServerless(stack, "cluster")
    iam_role = Role.from_role_name(stack, "role", "role")
    msk.grant_consume("topic1", iam_role)
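A consumer typically needs read access to the topic and access to a consumer group. The sketch below shows what such a policy could look like as plain JSON, assuming the actions and ARN layout from the AWS documentation for MSK IAM access control. The helpers `mskResourceArn` and `consumePolicy`, the wildcard group default, and the sample ARN are hypothetical; the policy the construct actually attaches may differ.

```typescript
// Hypothetical helper: derive a topic or group ARN from an MSK cluster ARN.
// Cluster ARN:  arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>
// Resource ARN: arn:aws:kafka:<region>:<account>:<kind>/<name>/<uuid>/<resource>
function mskResourceArn(clusterArn: string, kind: 'topic' | 'group', name: string): string {
  return `${clusterArn.replace(':cluster/', `:${kind}/`)}/${name}`;
}

// Sketch of a consumer policy: connect, read one topic, join a consumer group.
// The group defaults to '*' here, which is an assumption made for brevity.
function consumePolicy(clusterArn: string, topic: string, group: string = '*') {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:Connect'],
        Resource: [clusterArn],
      },
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:DescribeTopic', 'kafka-cluster:ReadData'],
        Resource: [mskResourceArn(clusterArn, 'topic', topic)],
      },
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:AlterGroup', 'kafka-cluster:DescribeGroup'],
        Resource: [mskResourceArn(clusterArn, 'group', group)],
      },
    ],
  };
}

// Sample values are placeholders, not real resources.
const sampleConsumePolicy = consumePolicy(
  'arn:aws:kafka:eu-west-1:123456789012:cluster/demo/abcd-1234',
  'topic1',
);
console.log(JSON.stringify(sampleConsumePolicy, null, 2));
```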
addClusterPolicy
This method allows you to add an IAM resource policy to your MSK cluster. For example, you can use it to set up cross-account access for your Amazon MSK cluster.
TypeScript:
    const msk = new MskServerless(stack, 'cluster');
    const clusterPolicy = new PolicyDocument({
      statements: [
        new PolicyStatement({
          actions: [
            'kafka:CreateVpcConnection',
            'kafka:GetBootstrapBrokers',
            'kafka:DescribeClusterV2',
          ],
          resources: [msk.cluster.attrArn],
          effect: Effect.ALLOW,
          principals: [new ServicePrincipal('firehose.amazonaws.com')],
        }),
      ],
    });
    msk.addClusterPolicy(clusterPolicy, 'clusterPolicy');
Python:
    msk = MskServerless(stack, "cluster")
    cluster_policy = PolicyDocument(
        statements=[
            PolicyStatement(
                actions=[
                    "kafka:CreateVpcConnection",
                    "kafka:GetBootstrapBrokers",
                    "kafka:DescribeClusterV2",
                ],
                resources=[msk.cluster.attr_arn],
                effect=Effect.ALLOW,
                principals=[ServicePrincipal("firehose.amazonaws.com")]
            )
        ]
    )
    msk.add_cluster_policy(cluster_policy, "clusterPolicy")