Skip to main content

MSK Provisioned

An MSK Provisioned cluster with helpers to manage topics, ACLs and IAM permissions.

Overview

The construct creates an MSK Provisioned Cluster, with the latest Kafka version in MSK as default. You can change the defaults by passing your own parameters as a Resource property to construct initializer. The construct supports creating clusters with mTLS, IAM or both as authentication methods. The construct use IAM as authentication by default if none is provided. It offers methods to manage topics and ACLs. Last, it also provides methods to grant an existing principal (ie IAM Role or IAM User or CN -Common Name-) with the permission to produce or consume to/from a kafka topic. The diagram below shows the high level architecture.

MSK Provisioned High level architecture

The construct can create a VPC on your behalf that is used to deploy MSK Provisioned cluster or you can provide your own VPC definition through the vpcConfigs property when you initialize the construct. The VPC that is created on your behalf has 10.0.0.0/16 CIDR range, and comes with an S3 VPC Endpoint Gateway attached to it. The construct also creates a security group that is attached to the brokers.

Construct cluster setup

The construct sets up a dedicated security group for Zookeeper as advised in the AWS documentation. When authentication is set to TLS, the construct apply ACLs on the provided principal in the props defined as certificateDefinition. This principal is used by the custom resource to manage ACL. Last, the construct applies MSK configuration setting allow.everyone.if.no.acl.found to false. You can also provide your own MSK configuration, in this case the construct does not create one and will apply the one you passed as part of the props.

Interacting with cluster

The construct has the following methods, you will usage examples in the new sections:

  • setTopic: Perform create, update, and delete operations on Topics
  • setACL: Perform create, update, and delete operations on ACL
  • grantProduce: Attach an IAM policy to a principal to write to a topic
  • grantConsume: Attach an IAM policy to a principal to read from a topic

Below you can find an example of creating an MSK Provisioned configuration with the default options.

const msk = new MskProvisioned(stack, 'cluster');

Usage

Bring Your Own VPC

The construct allows you to provide your own VPC that was created outside the CDK Stack. Below you will find an example usage.

let vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
vpcId: 'vpc-1111111111',
vpcCidrBlock: '10.0.0.0/16',
availabilityZones: ['eu-west-1a', 'eu-west-1b'],
publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
});

const msk = new MskProvisioned(stack, 'cluster', {
vpc: vpc,
clusterName: 'my-cluster',
subnets: vpc.selectSubnets(),
});

Create a cluster with mTLS authentication

The construct allows you to create a cluster with mTLS, below is a code snippet showing the configuration.

When using MSK with mTLS the constructs requires a principal that is assigned to the custom resources that manage ACLs and Topics. The certificate and private key are expected to be in a secret managed by AWS Secrets Manager. The secret needs to be in the format defined below and stored a JSON Key/value and not Plaintext in the Secret. The construct grants the lambda that supports the Custom Resource read access to the secret as an Identity based policy.

    {
key : "-----BEGIN RSA PRIVATE KEY----- XXXXXXXXXXXXXXXXX -----END RSA PRIVATE KEY-----",

cert : "-----BEGIN CERTIFICATE----- yyyyyyyyyyyyyyyy -----END CERTIFICATE-----"
}
let certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
stack, 'certificateAuthority',
'arn:aws:acm-pca:eu-west-1:123456789012:certificate-authority/aaaaaaaa-bbbb-454a-cccc-b454877f0d1b');

const msk = new MskProvisioned(stack, 'cluster', {
clientAuthentication: ClientAuthentication.saslTls(
{
iam: true,
certificateAuthorities: [certificateAuthority],
},
),
certificateDefinition: {
adminPrincipal: 'User:CN=Admin',
aclAdminPrincipal: 'User:CN=aclAdmin',
secretCertificate: Secret.fromSecretCompleteArn(stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:123456789012:secret:dsf/mskCert-3UhUJJ'),
},
allowEveryoneIfNoAclFound: false,
});

setTopic

This method allows you to create, update or delete an ACL. Its backend uses kafkajs. The topic is defined by the property type called MskTopic. If your MSK cluster uses KRaft mode rather than ZooKeeper, we encourage you to set to the parameter waitForLeaders = true and timeout = 10000. Below you can see the definition of the topic as well as a usage.

{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
replicaAssignment: <Array>, // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}

Dependeding on the authentication type that is set in the cluster, you need to put the right parameter in authentication, for mTLS use Authentitcation.MTLS and for IAM use Authentitcation.IAM. The example below uses IAM as authentication.

msk.setTopic('topic1',
Authentication.IAM, {
topic: 'topic1',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{
name: 'retention.ms',
value: '90000',
},
{
name: 'retention.bytes',
value: '90000',
},
],
}, cdk.RemovalPolicy.DESTROY, false, 1500);

setACL

This method allows you to create, update or delete a topic. Its backend uses kafkajs. The topic is defined by the property type called MskACL. This method should be used only when the cluster authentication is set to mTLS. Below you can see the definition of the topic as well as an example of use.

{
resourceType: <AclResourceTypes>,
resourceName: <String>,
resourcePatternType: <ResourcePatternTypes>,
principal: <String>,
host: <String>,
operation: <AclOperationTypes>,
permissionType: <AclPermissionTypes>,
}

You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in clientAuthentication: for mTLS use Authentitcation.MTLS and for IAM use Authentitcation.IAM. Default value is Authentitcation.MTLS. The example below uses IAM as authentication.

msk.setAcl('acl', {
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-1',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:Cn=Bar',
host: '*',
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY,
Authentication.IAM,
);

grantProduce

This method allows to grant a Principal the rights to write to a kafka topic. In case of IAM authentication the method attachs an IAM policy as defined in the AWS documentation scoped only to the topic provided. For mTLS authentication, the method apply an ACL for the provided Common Name that allows it to write to the topic.

msk.grantProduce('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');

grantConsume

This method allows to grant a Principal the rights to read to a kafka topic. In case of IAM authentication the method attachs an IAM policy as defined in the AWS documentation scoped only to the topic provided. For mTLS authentication, the method apply an ACL for the provided Common Name that allows it to read from the topic.

msk.grantConsume('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');

addCluster Policy

This method allows you to add IAM resource policy to your MSK cluster. This method can enable you for example to setup cross account access for your Amazon MSK cluster.

const msk = new MskProvisioned(stack, 'cluster');

const cluterPolicy = new PolicyDocument(
{
statements: [
new PolicyStatement({
actions: [
'kafka:CreateVpcConnection',
'kafka:GetBootstrapBrokers',
'kafka:DescribeClusterV2',
],
resources: [msk.cluster.attrArn],
effect: Effect.ALLOW,
principals: [new ServicePrincipal('firehose.amazonaws.com')],
}),
],
},
);

msk.addClusterPolicy(cluterPolicy, 'cluterPolicy');