Kafka API

Standalone access to the Kafka data plane API to create, update, and delete ACLs and topics. The construct supports both MSK Serverless and MSK Provisioned, and is intended for cases where you bring your own cluster.

Overview

The construct leverages the CDK Provider Framework to deploy a custom resource that manages topics. With mTLS authentication, it also deploys a custom resource that manages ACLs.

let certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
stack, 'certificateAuthority',
'arn:aws:acm-pca:eu-west-1:12345678912:certificate-authority/dummy-ca'
);

let secret = Secret.fromSecretCompleteArn(
stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:12345678912:secret:dsf/mskCert-dummy'
);

let vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
vpcId: 'vpc-1111111111',
vpcCidrBlock: '10.0.0.0/16',
availabilityZones: ['eu-west-1a', 'eu-west-1b'],
publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
});


const kafkaApi = new KafkaApi(stack, 'kafkaApi', {
vpc: vpc,
clusterArn: 'arn:aws:kafka:eu-west-1:12345678912:cluster/byo-msk/dummy-5cf3-42d5-aece-dummmy-2',
clusterType: MskClusterType.PROVISIONED,
brokerSecurityGroup: SecurityGroup.fromSecurityGroupId(stack, 'brokerSecurityGroup', 'sg-98237412hsa'),
certficateSecret: secret,
clientAuthentication: ClientAuthentication.saslTls({
iam: true,
certificateAuthorities: [certificateAuthority],
},),
kafkaClientLogLevel: KafkaClientLogLevel.DEBUG,
serviceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX',
});

When deploying multiple stacks with the Kafka API, you can reuse a service token that is already deployed for the custom resource. Reusing it reduces the number of resources created, such as the Lambda functions and ENIs used to create and manage the lifecycle of custom resources like ACLs and topics.

warning

The construct needs to be deployed in the same region as the MSK cluster.
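A region mismatch only surfaces at deployment time, so it can help to check it up front. The sketch below assumes the cluster ARN follows the standard `arn:partition:service:region:account-id:resource` layout. `regionFromArn` and `assertSameRegion` are hypothetical helpers written for illustration, not part of the construct.

```typescript
// Hypothetical helpers, not part of the construct's API.
// ARNs follow arn:partition:service:region:account-id:resource,
// so the region is the fourth colon-separated field.
function regionFromArn(arn: string): string {
  const parts = arn.split(':');
  const region = parts[3];
  if (parts.length < 6 || parts[0] !== 'arn' || !region) {
    throw new Error(`Not a valid regional ARN: ${arn}`);
  }
  return region;
}

// Throws if the stack targets a different region than the MSK cluster.
function assertSameRegion(clusterArn: string, stackRegion: string): void {
  const clusterRegion = regionFromArn(clusterArn);
  if (clusterRegion !== stackRegion) {
    throw new Error(
      `MSK cluster is in ${clusterRegion} but the stack targets ${stackRegion}`,
    );
  }
}

const exampleClusterArn =
  'arn:aws:kafka:eu-west-1:12345678912:cluster/byo-msk/dummy-5cf3-42d5-aece-dummmy-2';
assertSameRegion(exampleClusterArn, 'eu-west-1'); // same region, does not throw
```

In a CDK app the second argument would typically come from the stack. An environment-agnostic stack exposes its region as an unresolved token, so a check like this is only meaningful when the stack is given an explicit `env`.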

Using mTLS authentication

When using MSK with mTLS, the construct requires a principal that is assigned to the custom resources that manage ACLs and topics. The certificate and private key are expected to be in a secret managed by AWS Secrets Manager. The secret must use the format defined below and be stored as a JSON key/value pair, not as plaintext. The construct grants the Lambda function that backs the custom resource read access to the secret through an identity-based policy.

{
"key": "-----BEGIN RSA PRIVATE KEY----- XXXXXXXXXXXXXXXXX -----END RSA PRIVATE KEY-----",
"cert": "-----BEGIN CERTIFICATE----- yyyyyyyyyyyyyyyy -----END CERTIFICATE-----"
}
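As an illustration of the format above, the secret string can be assembled from the two PEM values before it is stored. This is a minimal sketch: `buildCertSecretString` is a hypothetical helper, and its marker checks are loose sanity checks rather than real PEM validation.

```typescript
// Hypothetical helper for illustration; it is not part of the construct.
function buildCertSecretString(privateKeyPem: string, certificatePem: string): string {
  // Loose sanity checks on the PEM markers shown in the format above.
  if (!privateKeyPem.includes('PRIVATE KEY-----')) {
    throw new Error('key does not look like a PEM private key');
  }
  if (!certificatePem.includes('-----BEGIN CERTIFICATE-----')) {
    throw new Error('cert does not look like a PEM certificate');
  }
  // JSON.stringify escapes any newlines in the PEM text, so the result is valid JSON
  // with exactly the two fields "key" and "cert".
  return JSON.stringify({ key: privateKeyPem, cert: certificatePem });
}

// Placeholder values only; real PEM content would come from your certificate files.
const secretString = buildCertSecretString(
  '-----BEGIN RSA PRIVATE KEY-----\nXXXXXXXXXXXXXXXXX\n-----END RSA PRIVATE KEY-----',
  '-----BEGIN CERTIFICATE-----\nyyyyyyyyyyyyyyyy\n-----END CERTIFICATE-----',
);
```

Building the string programmatically avoids hand-escaping quotes and line breaks in the JSON value.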

You can create the secret with the following AWS CLI command:

aws secretsmanager create-secret --name my-secret \
--secret-string '{"key": "PRIVATE-KEY", "cert": "CERTIFICATE"}'

danger

Do not create the secret as part of the CDK application. The secret contains the private key and the deployment is not secured.

You can use this utility to generate the certificates:

  1. Build the tool
  2. Run the following command to generate the certificates and print them
java -jar AuthMSK-1.0-SNAPSHOT.jar -caa <PCA_ARN> -ccf tmp/client_cert.pem -pem -pkf tmp/private_key.pem -ksp "XXXXXXXXXX" -ksl tmp/kafka.client.keystore.jks
cat tmp/client_cert.pem
cat tmp/private_key.pem
  3. Copy/paste the values of the client certificate and the private key into the secret

setTopic

This method allows you to create, update or delete a topic. Its backend uses kafkajs. The topic is defined by the property type called MskTopic. Below you can see the definition of the topic.

{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
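To make the defaults in the definition above concrete, the sketch below models the topic as a TypeScript interface and fills in the documented default values. `MskTopicSketch`, `TopicConfigEntrySketch` and `withTopicDefaults` are hypothetical names used only for illustration. Treating every field except `topic` as optional is an assumption inferred from the listed defaults.

```typescript
// Hypothetical stand-ins for illustration; not the construct's own types.
interface TopicConfigEntrySketch {
  name: string;
  value: string;
}

interface MskTopicSketch {
  topic: string;
  numPartitions?: number;
  replicationFactor?: number;
  configEntries?: TopicConfigEntrySketch[];
}

// Applies the defaults listed in the definition above:
// -1 defers to the broker configuration, [] means no topic-level overrides.
function withTopicDefaults(t: MskTopicSketch): Required<MskTopicSketch> {
  return {
    topic: t.topic,
    numPartitions: t.numPartitions ?? -1,
    replicationFactor: t.replicationFactor ?? -1,
    configEntries: t.configEntries ?? [],
  };
}

// A compacted topic that leaves partitions and replication to the broker.
const compactedTopic = withTopicDefaults({
  topic: 'topic1',
  configEntries: [{ name: 'cleanup.policy', value: 'compact' }],
});
```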

Depending on the authentication type used by the cluster, set the authentication parameter accordingly: Authentication.MTLS for mTLS or Authentication.IAM for IAM. The example below uses IAM authentication.

kafkaApi.setTopic('topic1',
Authentication.IAM,
{
topic: 'topic1',
numPartitions: 3,
replicationFactor: 1,
},
cdk.RemovalPolicy.DESTROY, false, 1500
);

warning

Only the number of partitions can be updated after the creation of the topic.
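One way to catch an unsupported change before deploying is to compare the deployed definition with the new one. The sketch below is an illustration only: `TopicDefinitionSketch` and `nonUpdatableChanges` are hypothetical names, and the check simply reports any changed field other than the number of partitions.

```typescript
// Hypothetical stand-in for a fully resolved topic definition.
interface TopicDefinitionSketch {
  topic: string;
  numPartitions: number;
  replicationFactor: number;
  configEntries: { name: string; value: string }[];
}

// Returns the names of changed fields that cannot be updated in place.
// numPartitions is deliberately not checked because it is the one updatable field.
function nonUpdatableChanges(
  before: TopicDefinitionSketch,
  after: TopicDefinitionSketch,
): string[] {
  const changed: string[] = [];
  if (before.topic !== after.topic) changed.push('topic');
  if (before.replicationFactor !== after.replicationFactor) changed.push('replicationFactor');
  // Order-sensitive comparison; sufficient for a sketch.
  if (JSON.stringify(before.configEntries) !== JSON.stringify(after.configEntries)) {
    changed.push('configEntries');
  }
  return changed;
}

const deployedTopic: TopicDefinitionSketch = {
  topic: 'topic1',
  numPartitions: 3,
  replicationFactor: 1,
  configEntries: [],
};

// Changing only the partition count is fine: the result is [].
const morePartitions = nonUpdatableChanges(deployedTopic, { ...deployedTopic, numPartitions: 6 });
// Changing the replication factor is reported: the result is ['replicationFactor'].
const newReplication = nonUpdatableChanges(deployedTopic, { ...deployedTopic, replicationFactor: 3 });
```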

setACL

This method allows you to create, update, or delete an ACL. Its backend uses kafkajs. The ACL is defined by the property type called MskACL. This method can be used when the cluster authentication is set to mTLS or IAM+mTLS. Below you can see the definition of the ACL as well as a usage example.

{
resourceType: <AclResourceTypes>,
resourceName: <String>,
resourcePatternType: <ResourcePatternTypes>,
principal: <String>,
host: <String>,
operation: <AclOperationTypes>,
permissionType: <AclPermissionTypes>,
}

You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs are then used by clients that authenticate to your cluster using mTLS. Depending on the authentication type you want to use to create the ACL, set the clientAuthentication parameter accordingly: Authentication.MTLS for mTLS or Authentication.IAM for IAM. The default value is Authentication.MTLS. The example below uses mTLS authentication.

kafkaApi.setAcl('acl',
{
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-1',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:Cn=MyUser',
host: '*',
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY,
Authentication.MTLS
);

grantProduce

This method grants a principal permission to write to a Kafka topic. With IAM authentication, the method attaches an IAM policy, as defined in the AWS documentation, scoped only to the provided topic. With mTLS authentication, the method applies an ACL for the provided Common Name that allows write operations on the topic.

kafkaApi.grantProduce('produce', 'foo', Authentication.MTLS, 'User:Cn=bar');

grantConsume

This method grants a principal permission to read from a Kafka topic. With IAM authentication, the method attaches an IAM policy, as defined in the AWS documentation, scoped only to the provided topic. With mTLS authentication, the method applies an ACL for the provided Common Name that allows read operations on the topic.

kafkaApi.grantConsume('consume', 'foo', Authentication.MTLS, 'User:Cn=bar');
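
With mTLS, the two grant methods differ in the operation they allow on the topic. The sketch below shows the kind of ACL entry each call might correspond to, reusing the field names from the ACL definition in the setACL section. The string literals stand in for the AclResourceTypes, ResourcePatternTypes, AclOperationTypes and AclPermissionTypes enums. The literal pattern type, the `'*'` host and the helper `topicAclFor` are assumptions for illustration, and the construct may create additional entries that are not shown here.

```typescript
// Hypothetical stand-ins for illustration; not the construct's own types.
type TopicOperationSketch = 'READ' | 'WRITE';

interface TopicAclSketch {
  resourceType: 'TOPIC';
  resourceName: string;
  resourcePatternType: 'LITERAL';
  principal: string;
  host: string;
  operation: TopicOperationSketch;
  permissionType: 'ALLOW';
}

// Builds an allow entry for one operation on one topic, scoped to a single principal.
function topicAclFor(
  topic: string,
  principal: string,
  operation: TopicOperationSketch,
): TopicAclSketch {
  return {
    resourceType: 'TOPIC',
    resourceName: topic,
    resourcePatternType: 'LITERAL', // assumption: exact topic name, no prefix match
    principal,
    host: '*', // assumption: mirrors the host used in the setAcl example
    operation,
    permissionType: 'ALLOW',
  };
}

// Same topic and principal as the examples above; only the operation differs.
const produceAcl = topicAclFor('foo', 'User:Cn=bar', 'WRITE');
const consumeAcl = topicAclFor('foo', 'User:Cn=bar', 'READ');
```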