MSK Provisioned
An MSK Provisioned cluster with helpers to manage topics, ACLs and IAM permissions.
Overview
The construct creates an MSK Provisioned cluster, using the latest Kafka version available in MSK by default. You can change the defaults by passing your own parameters as resource properties to the construct initializer. The construct supports creating clusters with mTLS, IAM, or both as authentication methods, and uses IAM by default if none is provided. It offers methods to manage topics and ACLs. Finally, it provides methods to grant an existing principal (i.e. an IAM role, an IAM user, or a Common Name (CN)) permission to produce to or consume from a Kafka topic. The diagram below shows the high-level architecture.
The construct can create a VPC on your behalf to deploy the MSK Provisioned cluster, or you can provide your own VPC definition through the vpcConfigs property when you initialize the construct. The VPC created on your behalf uses the 10.0.0.0/16 CIDR range and comes with an S3 gateway VPC endpoint attached to it. The construct also creates a security group that is attached to the brokers. You can also encrypt the environment variables of the Lambda functions used within this construct with a specific AWS KMS key through the environmentEncryption property. Make sure the key policy does not block access for these Lambda roles, as this could prevent successful encryption and decryption operations.
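If you bring your own VPC and expect to connect it to a network that uses the default range, it can help to check for an overlap with 10.0.0.0/16 beforehand. The sketch below uses only Python's standard ipaddress module; the helper name and the sample ranges are illustrative and are not part of the construct.

```python
import ipaddress

# CIDR range of the VPC the construct creates by default (from the text above).
DEFAULT_VPC_CIDR = ipaddress.ip_network("10.0.0.0/16")


def overlaps_default_vpc(cidr: str) -> bool:
    """Return True if `cidr` shares at least one address with the default range."""
    return ipaddress.ip_network(cidr).overlaps(DEFAULT_VPC_CIDR)


print(DEFAULT_VPC_CIDR.num_addresses)          # 65536
print(overlaps_default_vpc("10.0.128.0/20"))   # True
print(overlaps_default_vpc("172.31.0.0/16"))   # False
```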
Construct cluster setup
The construct sets up a dedicated security group for ZooKeeper, as advised in the AWS documentation. When authentication is set to TLS, the construct applies ACLs for the principal provided in the props as certificateDefinition. This principal is used by the custom resource to manage ACLs. Finally, the construct applies an MSK configuration that sets allow.everyone.if.no.acl.found to false. You can also provide your own MSK configuration; in that case the construct does not create one and applies the one you passed in the props.
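To make the effect of allow.everyone.if.no.acl.found more concrete, here is a simplified Python model of how Kafka's ACL authorizer reaches a decision. It is a sketch of the documented Kafka behavior, not code from the construct or the broker: the names are hypothetical, only exact resource names are matched, and implied operations are ignored.

```python
from typing import Iterable, NamedTuple


class Acl(NamedTuple):
    principal: str   # e.g. "User:CN=Admin"
    resource: str    # e.g. a topic name
    operation: str   # e.g. "WRITE"
    allow: bool      # True for ALLOW, False for DENY


def is_authorized(principal: str, resource: str, operation: str,
                  acls: Iterable[Acl],
                  allow_everyone_if_no_acl_found: bool = False,
                  super_users: Iterable[str] = ()) -> bool:
    # Super users bypass ACL evaluation entirely.
    if principal in super_users:
        return True
    resource_acls = [a for a in acls if a.resource == resource]
    # No ACL on the resource: the broker setting decides the outcome.
    if not resource_acls:
        return allow_everyone_if_no_acl_found
    matching = [a for a in resource_acls
                if a.principal == principal and a.operation == operation]
    # A matching DENY always wins over an ALLOW.
    if any(not a.allow for a in matching):
        return False
    return any(a.allow for a in matching)
```

With the setting at false, a principal without a matching ALLOW entry is rejected even on a topic that has no ACLs at all, which is why the construct needs an admin principal to manage ACLs.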
Interacting with cluster
The construct provides the following methods; you will find usage examples in the next sections:
- setTopic: Perform create, update, and delete operations on Topics
- setACL: Perform create, update, and delete operations on ACL
- grantProduce: Attach an IAM policy to a principal to write to a topic
- grantConsume: Attach an IAM policy to a principal to read from a topic
Below you can find an example of creating an MSK Provisioned configuration with the default options.
- TypeScript
const msk = new MskProvisioned(stack, 'cluster');
- Python
msk = MskProvisioned(stack, "cluster")
Usage
Bring Your Own VPC
The construct allows you to provide your own VPC that was created outside the CDK Stack. Below you will find an example usage.
- TypeScript
// Import a VPC that was created outside this CDK stack.
const vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
  vpcId: 'vpc-1111111111',
  vpcCidrBlock: '10.0.0.0/16',
  availabilityZones: ['eu-west-1a', 'eu-west-1b'],
  publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
  privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
});
// Deploy the cluster into the imported VPC.
const msk = new MskProvisioned(stack, 'cluster', {
  vpc: vpc,
  clusterName: 'my-cluster',
  subnets: vpc.selectSubnets(),
});
- Python
# Import a VPC that was created outside this CDK stack.
vpc = Vpc.from_vpc_attributes(stack, "vpc",
    vpc_id="vpc-1111111111",
    vpc_cidr_block="10.0.0.0/16",
    availability_zones=["eu-west-1a", "eu-west-1b"],
    public_subnet_ids=["subnet-111111111", "subnet-11111111"],
    private_subnet_ids=["subnet-11111111", "subnet-1111111"]
)
# Deploy the cluster into the imported VPC.
msk = MskProvisioned(stack, "cluster",
    vpc=vpc,
    cluster_name="my-cluster",
    subnets=vpc.select_subnets()
)
Create a cluster with mTLS authentication
The construct allows you to create a cluster with mTLS authentication; the code snippet below shows the configuration.
When using MSK with mTLS, the construct requires a principal that is assigned to the custom resources that manage ACLs and topics. The certificate and private key are expected to be in a secret managed by AWS Secrets Manager. The secret must follow the format defined below and be stored as JSON key/value pairs, not as plaintext. The construct grants the Lambda function that backs the custom resource read access to the secret through an identity-based policy.
{
  "key": "-----BEGIN RSA PRIVATE KEY----- XXXXXXXXXXXXXXXXX -----END RSA PRIVATE KEY-----",
  "cert": "-----BEGIN CERTIFICATE----- yyyyyyyyyyyyyyyy -----END CERTIFICATE-----"
}
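Before pointing the construct at a secret, you may want to confirm that its value has the expected shape. The following is a minimal sketch that checks a secret string locally using only the standard library; the function name and the loose PEM checks are assumptions made for illustration, and it does not call AWS Secrets Manager.

```python
import json
from typing import List

# Marker each field is expected to contain, based on the format shown above.
EXPECTED_MARKERS = {
    "key": "PRIVATE KEY-----",
    "cert": "-----BEGIN CERTIFICATE-----",
}


def validate_secret_string(secret_string: str) -> List[str]:
    """Return a list of problems; an empty list means the shape looks right."""
    try:
        payload = json.loads(secret_string)
    except json.JSONDecodeError:
        return ["secret is not valid JSON (was it stored as plaintext?)"]
    if not isinstance(payload, dict):
        return ["secret must be a JSON object"]
    problems = []
    for field, marker in EXPECTED_MARKERS.items():
        value = payload.get(field)
        if not isinstance(value, str):
            problems.append(f"missing field: {field}")
        elif marker not in value:
            problems.append(f"field {field} does not look like PEM")
    return problems
```

An empty result only says the JSON keys and PEM markers are present; it does not verify that the certificate was issued by the certificate authority attached to the cluster.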
- TypeScript
const certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
  stack, 'certificateAuthority',
  'arn:aws:acm-pca:eu-west-1:123456789012:certificate-authority/aaaaaaaa-bbbb-454a-cccc-b454877f0d1b');
const msk = new MskProvisioned(stack, 'cluster', {
  // Enable both IAM and mTLS authentication.
  clientAuthentication: ClientAuthentication.saslTls({
    iam: true,
    certificateAuthorities: [certificateAuthority],
  }),
  // Principals and certificate used by the custom resources.
  certificateDefinition: {
    adminPrincipal: 'User:CN=Admin',
    aclAdminPrincipal: 'User:CN=aclAdmin',
    secretCertificate: Secret.fromSecretCompleteArn(stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:123456789012:secret:dsf/mskCert-3UhUJJ'),
  },
  allowEveryoneIfNoAclFound: false,
});
- Python
certificate_authority = CertificateAuthority.from_certificate_authority_arn(
    stack, "certificateAuthority",
    "arn:aws:acm-pca:eu-west-1:123456789012:certificate-authority/aaaaaaaa-bbbb-454a-cccc-b454877f0d1b")
msk = MskProvisioned(stack, "cluster",
    # Enable both IAM and mTLS authentication.
    client_authentication=ClientAuthentication.sasl_tls(
        iam=True,
        certificate_authorities=[certificate_authority]
    ),
    # Principals and certificate used by the custom resources.
    certificate_definition=AclAdminProps(
        admin_principal="User:CN=Admin",
        acl_admin_principal="User:CN=aclAdmin",
        secret_certificate=Secret.from_secret_complete_arn(stack, "secret", "arn:aws:secretsmanager:eu-west-1:123456789012:secret:dsf/mskCert-3UhUJJ")
    ),
    allow_everyone_if_no_acl_found=False
)
setTopic
This method allows you to create, update, or delete a topic. Its backend uses kafkajs.
The topic is defined by the property type called MskTopic.
If your MSK cluster uses KRaft mode rather than ZooKeeper, we encourage you to set the parameters waitForLeaders = true and timeout = 10000.
Below you can see the definition of the topic as well as a usage example.
{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
replicaAssignment: <Array>, // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
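As an illustration of this shape, the sketch below assembles a topic definition as a plain Python dictionary, applying the defaults listed above and turning a simple mapping into the name/value pairs expected in configEntries. The helper is hypothetical and is not part of the construct; values are converted to strings to match the usage example in this section.

```python
from typing import Any, Dict, Optional


def build_topic_definition(topic: str,
                           num_partitions: int = -1,
                           replication_factor: int = -1,
                           config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a dict shaped like the topic definition above."""
    return {
        "topic": topic,
        # -1 defers to the broker's num.partitions setting.
        "numPartitions": num_partitions,
        # -1 defers to the broker's default.replication.factor setting.
        "replicationFactor": replication_factor,
        "replicaAssignment": [],
        "configEntries": [
            {"name": name, "value": str(value)}
            for name, value in (config or {}).items()
        ],
    }


definition = build_topic_definition("topic1", 3, 1, {"retention.ms": 90000})
print(definition["configEntries"])  # [{'name': 'retention.ms', 'value': '90000'}]
```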
Depending on the authentication type set in the cluster, you need to pass the right value for the authentication parameter: for mTLS use Authentication.MTLS, and for IAM use Authentication.IAM. The example below uses IAM as authentication.
- TypeScript
msk.setTopic('topic1',
  Authentication.IAM, {
    topic: 'topic1',
    numPartitions: 3,
    replicationFactor: 1,
    configEntries: [
      {
        name: 'retention.ms',
        value: '90000',
      },
      {
        name: 'retention.bytes',
        value: '90000',
      },
    ],
  },
  // Removal policy, then waitForLeaders and timeout.
  cdk.RemovalPolicy.DESTROY, false, 1500);
- Python
msk.set_topic("topic1", Authentication.IAM, MskTopic(
        topic="topic1",
        num_partitions=3,
        replication_factor=1,
        config_entries=[{
            "name": "retention.ms",
            "value": "90000"
        }, {
            "name": "retention.bytes",
            "value": "90000"
        }]
    ),
    # Removal policy, then wait_for_leaders and timeout.
    cdk.RemovalPolicy.DESTROY, False, 1500)
setACL
This method allows you to create, update, or delete an ACL. Its backend uses kafkajs.
The ACL is defined by the property type called MskACL. This method should be used only when the cluster authentication is set to mTLS. Below you can see the definition of the ACL as well as an example of use.
{
resourceType: <AclResourceTypes>,
resourceName: <String>,
resourcePatternType: <ResourcePatternTypes>,
principal: <String>,
host: <String>,
operation: <AclOperationTypes>,
permissionType: <AclPermissionTypes>,
}
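The resourcePatternType field controls how resourceName is compared with the resource named in a request. The sketch below models the two concrete pattern types from Kafka's ACL model, LITERAL and PREFIXED, in plain Python. Only LITERAL appears in this document's example; PREFIXED comes from Kafka itself, and the function name is hypothetical.

```python
def acl_matches_resource(acl_resource_name: str, pattern_type: str,
                         requested_name: str) -> bool:
    """Return True if an ACL on `acl_resource_name` covers `requested_name`."""
    if pattern_type == "LITERAL":
        # "*" is the wildcard resource name; otherwise the names must be equal.
        return acl_resource_name == "*" or acl_resource_name == requested_name
    if pattern_type == "PREFIXED":
        # The ACL covers every resource whose name starts with the prefix.
        return requested_name.startswith(acl_resource_name)
    raise ValueError(f"unsupported pattern type: {pattern_type}")


print(acl_matches_resource("topic-1", "LITERAL", "topic-1"))    # True
print(acl_matches_resource("topic-1", "LITERAL", "topic-10"))   # False
print(acl_matches_resource("topic-", "PREFIXED", "topic-10"))   # True
```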
You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs are used later by clients that authenticate to your cluster using mTLS. Depending on the authentication type you want to use to create the ACL, pass the right value for the clientAuthentication parameter: for mTLS use Authentication.MTLS, and for IAM use Authentication.IAM. The default value is Authentication.MTLS. The example below uses IAM as authentication.
- TypeScript
msk.setAcl('acl', {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-1',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:Cn=Bar',
    host: '*',
    operation: AclOperationTypes.CREATE,
    permissionType: AclPermissionTypes.ALLOW,
  },
  cdk.RemovalPolicy.DESTROY,
  // Authentication used by the custom resource to create the ACL.
  Authentication.IAM,
);
- Python
msk.set_acl("acl", Acl(
        resource_type=AclResourceTypes.TOPIC,
        resource_name="topic-1",
        resource_pattern_type=ResourcePatternTypes.LITERAL,
        principal="User:Cn=Bar",
        host="*",
        operation=AclOperationTypes.CREATE,
        permission_type=AclPermissionTypes.ALLOW
    ),
    cdk.RemovalPolicy.DESTROY,
    # Authentication used by the custom resource to create the ACL.
    Authentication.IAM)
grantProduce
This method grants a principal the rights to write to a Kafka topic.
With IAM authentication, the method attaches an IAM policy, as defined in the AWS documentation, scoped only to the provided topic. With mTLS authentication, the method applies an ACL for the provided Common Name that allows it to write to the topic.
- TypeScript
msk.grantProduce('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');
- Python
msk.grant_produce("consume", "foo", Authentication.MTLS, "User:Cn=MyUser")
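For the IAM case, it may help to see what a topic-scoped producer policy can look like. The sketch below builds one as a plain dictionary, using the actions the Amazon MSK IAM access control documentation lists for producing to a topic. The exact statements the construct attaches are not shown in this document, so treat the helper names and the output as an approximation.

```python
import json


def topic_arn(cluster_arn: str, topic: str) -> str:
    """Derive a topic ARN from arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>."""
    prefix, separator, cluster_path = cluster_arn.partition(":cluster/")
    if not separator:
        raise ValueError("not an MSK cluster ARN")
    return f"{prefix}:topic/{cluster_path}/{topic}"


def produce_policy(cluster_arn: str, topic: str) -> dict:
    """Policy document allowing a client to connect and write to one topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kafka-cluster:Connect"],
                "Resource": [cluster_arn],
            },
            {
                "Effect": "Allow",
                "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:WriteData"],
                "Resource": [topic_arn(cluster_arn, topic)],
            },
        ],
    }


# Placeholder cluster ARN, for illustration only.
cluster = "arn:aws:kafka:eu-west-1:123456789012:cluster/my-cluster/11111111-2222-3333-4444-555555555555-1"
print(json.dumps(produce_policy(cluster, "foo"), indent=2))
```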
grantConsume
This method grants a principal the rights to read from a Kafka topic.
With IAM authentication, the method attaches an IAM policy, as defined in the AWS documentation, scoped only to the provided topic. With mTLS authentication, the method applies an ACL for the provided Common Name that allows it to read from the topic.
- TypeScript
msk.grantConsume('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');
- Python
msk.grant_consume("consume", "foo", Authentication.MTLS, "User:Cn=MyUser")
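Reading with IAM authentication differs from writing in that a consumer usually also needs permissions on its consumer group. The sketch below lists the statements that the Amazon MSK IAM access control documentation describes for consuming from a topic. Whether the construct scopes the group resource the same way is an assumption, and the helper names and the group default are hypothetical.

```python
from typing import Dict, List


def scoped_arn(cluster_arn: str, kind: str, name: str) -> str:
    """Build a topic or group ARN from an MSK cluster ARN."""
    prefix, separator, cluster_path = cluster_arn.partition(":cluster/")
    if not separator:
        raise ValueError("not an MSK cluster ARN")
    return f"{prefix}:{kind}/{cluster_path}/{name}"


def consume_statements(cluster_arn: str, topic: str, group: str = "*") -> List[Dict]:
    """Statements allowing a client to connect, join a group, and read one topic."""
    return [
        {
            "Effect": "Allow",
            "Action": ["kafka-cluster:Connect"],
            "Resource": [cluster_arn],
        },
        {
            "Effect": "Allow",
            "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:ReadData"],
            "Resource": [scoped_arn(cluster_arn, "topic", topic)],
        },
        {
            # Group permissions let the consumer join a group and commit offsets.
            "Effect": "Allow",
            "Action": ["kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup"],
            "Resource": [scoped_arn(cluster_arn, "group", group)],
        },
    ]
```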
addClusterPolicy
This method allows you to add an IAM resource policy to your MSK cluster. For example, you can use it to set up cross-account access for your Amazon MSK cluster.
- TypeScript
const msk = new MskProvisioned(stack, 'cluster');
// Resource policy allowing the Firehose service principal to reach the cluster.
const clusterPolicy = new PolicyDocument({
  statements: [
    new PolicyStatement({
      actions: [
        'kafka:CreateVpcConnection',
        'kafka:GetBootstrapBrokers',
        'kafka:DescribeClusterV2',
      ],
      resources: [msk.cluster.attrArn],
      effect: Effect.ALLOW,
      principals: [new ServicePrincipal('firehose.amazonaws.com')],
    }),
  ],
});
msk.addClusterPolicy(clusterPolicy, 'clusterPolicy');
- Python
msk = MskProvisioned(stack, "cluster")
# Resource policy allowing the Firehose service principal to reach the cluster.
cluster_policy = PolicyDocument(
    statements=[
        PolicyStatement(
            actions=[
                "kafka:CreateVpcConnection",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeClusterV2"
            ],
            resources=[msk.cluster.attr_arn],
            effect=Effect.ALLOW,
            principals=[ServicePrincipal("firehose.amazonaws.com")]
        )
    ]
)
msk.add_cluster_policy(cluster_policy, "clusterPolicy")