Kafka API
Standalone access to Kafka data plane API to perform Create/Update/Delete operations for ACLs and Topics. The constructs support both MSK Serverless and MSK Provisioned, and is used when you need to bring your own cluster.
Overview
The construct leverages the CDK Provider Framework to deploy a custom resource to manage topics
, and in case of mTLS
authentication deploys also a custom resource to manage ACLs
.
- TypeScript
- Python
let certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
stack, 'certificateAuthority',
'arn:aws:acm-pca:eu-west-1:12345678912:certificate-authority/dummy-ca'
);
let secret = Secret.fromSecretCompleteArn(
stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:12345678912:secret:dsf/mskCert-dummy'
);
let vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
vpcId: 'vpc-1111111111',
vpcCidrBlock: '10.0.0.0/16',
availabilityZones: ['eu-west-1a', 'eu-west-1b'],
publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
});
const kafkaApi = new KafkaApi(stack, 'kafkaApi', {
vpc: vpc,
clusterArn: 'arn:aws:kafka:eu-west-1:12345678912:cluster/byo-msk/dummy-5cf3-42d5-aece-dummmy-2',
clusterType: MskClusterType.PROVISIONED,
brokerSecurityGroup: SecurityGroup.fromSecurityGroupId(stack, 'brokerSecurityGroup', 'sg-98237412hsa'),
certficateSecret: secret,
clientAuthentication: ClientAuthentication.saslTls({
iam: true,
certificateAuthorities: [certificateAuthority],
},),
kafkaClientLogLevel: KafkaClientLogLevel.DEBUG,
serviceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX',
});
certificate_authority = CertificateAuthority.from_certificate_authority_arn(stack, "certificateAuthority", "arn:aws:acm-pca:eu-west-1:12345678912:certificate-authority/dummy-ca")
secret = Secret.from_secret_complete_arn(stack, "secret", "arn:aws:secretsmanager:eu-west-1:12345678912:secret:dsf/mskCert-dummy")
vpc = Vpc.from_vpc_attributes(stack, "vpc",
vpc_id="vpc-1111111111",
vpc_cidr_block="10.0.0.0/16",
availability_zones=["eu-west-1a", "eu-west-1b"],
public_subnet_ids=["subnet-111111111", "subnet-11111111"],
private_subnet_ids=["subnet-11111111", "subnet-1111111"]
)
kafka_api = KafkaApi(stack, "kafkaApi",
vpc=vpc,
cluster_arn="arn:aws:kafka:eu-west-1:12345678912:cluster/byo-msk/dummy-5cf3-42d5-aece-dummmy-2",
cluster_type=MskClusterType.PROVISIONED,
broker_security_group=SecurityGroup.from_security_group_id(stack, "brokerSecurityGroup", "sg-98237412hsa"),
certficate_secret=secret,
client_authentication=ClientAuthentication.sasl_tls(
iam=True,
certificate_authorities=[certificate_authority]
),
kafka_client_log_level=KafkaClientLogLevel.DEBUG,
service_token="arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX"
)
When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resources created like lambdas and ENI that are used to create and manage the lifecycle the custom resources, like ACLs and Topics.
It's also possible to encrypt the environment variables of the Lambda functions used within this construct using a specific AWS KMS Key through the environmentEncryption
property. Note that you need to ensure the key policy does not block access for these Lambda roles, as this could prevent successful encryption and decryption operations.
The construct needs to be deployed in the same region as the MSK cluster.
Using mTLS authentication
When using MSK with mTLS the constructs requires a principal that is assigned to the custom resources that manage ACLs and Topics. The certificate and private key are expected to be in a secret managed by AWS Secrets Manager. The secret needs to be in the format defined below and stored a JSON Key/value
and not Plaintext
in the Secret. The construct grants the lambda that supports the Custom Resource read access to the secret as an Identity based policy
.
{
key : "-----BEGIN RSA PRIVATE KEY----- XXXXXXXXXXXXXXXXX -----END RSA PRIVATE KEY-----",
cert : "-----BEGIN CERTIFICATE----- yyyyyyyyyyyyyyyy -----END CERTIFICATE-----"
}
You can create the secret with the following AWS CLI command:
aws secretsmanager create-secret --name my-secret \
--secret-string '{"key": "PRIVATE-KEY", "cert": "CERTIFICATE"}'
Do not create the secret as part of the CDK application. The secret contains the private key and the deployment is not secured.
You can use this utility to generate the certificates:
- Build the tool
- Run the following command to generate the certificates and print them
java -jar AuthMSK-1.0-SNAPSHOT.jar -caa <PCA_ARN> -ccf tmp/client_cert.pem -pem -pkf tmp/private_key.pem -ksp "XXXXXXXXXX" -ksl tmp/kafka.client.keystore.jks
cat tmp/client_cert.pem
cat tmp/private_key.pem
- Copy/paste the value of the client certificate and the private key in the secret
setTopic
This method allows you to create, update or delete a topic. Its backend uses kafkajs.
The topic is defined by the property type called MskTopic
. Below you can see the definition of the topic.
{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
Dependeding on the authentication type used in the cluster, you need to put the right parameter in authentication, for mTLS use Authentitcation.MTLS
and for IAM use Authentitcation.IAM
. The example below uses IAM as authentication.
- TypeScript
- Python
kafkaApi.setTopic('topic1',
Authentication.IAM,
{
topic: 'topic1',
numPartitions: 3,
replicationFactor: 1,
},
cdk.RemovalPolicy.DESTROY, false, 1500
);
kafka_api.set_topic("topic1", Authentication.IAM, MskTopic(
topic="topic1",
num_partitions=3,
replication_factor=1
), cdk.RemovalPolicy.DESTROY, False, 1500)
Only the number of partitions can be updated after the creation of the topic.
setACL
This method allows you to create, update or delete an ACL. Its backend uses kafkajs.
The topic is defined by the property type called MskACL
. This method can be used when the cluster authentication is set to mTLS
or IAM
+mTLS
. Below you can see the definition of the ACL as well as an example of use.
{
resourceType: <AclResourceTypes>,
resourceName: <String>,
resourcePatternType: <ResourcePatternTypes>,
principal: <String>,
host: <String>,
operation: <AclOperationTypes>,
permissionType: <AclPermissionTypes>,
}
You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in clientAuthentication
: for mTLS use Authentitcation.MTLS
and for IAM use Authentitcation.IAM
. Default value is Authentitcation.MTLS
. The example below uses mTLS as authentication.
- TypeScript
- Python
kafkaApi.setAcl('acl',
{
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-1',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:Cn=MyUser',
host: '*',
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY,
Authentication.MTLS
);
kafka_api.set_acl("acl", Acl(
resource_type=AclResourceTypes.TOPIC,
resource_name="topic-1",
resource_pattern_type=ResourcePatternTypes.LITERAL,
principal="User:Cn=MyUser",
host="*",
operation=AclOperationTypes.CREATE,
permission_type=AclPermissionTypes.ALLOW
), cdk.RemovalPolicy.DESTROY, Authentication.MTLS)
grantProduce
This method allows to grant a Principal
the permissions to write to a kafka topic.
In case of IAM authentication the method attaches an IAM policy as defined in the AWS documentation scoped only to the topic provided. For mTLS authentication, the method applies an ACL for the provided Common Name
that allow write operations on the topic.
- TypeScript
- Python
kafkaApi.grantProduce('consume', 'foo', Authentication.MTLS, 'User:Cn=bar');
kafka_api.grant_produce("consume", "foo", Authentication.MTLS, "User:Cn=bar")
grantConsume
This method allows to grant a Principal
the permissions to read to a kafka topic.
In case of IAM authentication the method attachs an IAM policy as defined in the AWS documentation scoped only to the topic provided. For mTLS authentication, the method applies an ACL for the provided Common Name
that allow read operations on the topic.
- TypeScript
- Python
kafkaApi.grantConsume('consume', 'foo', Authentication.MTLS, 'User:Cn=bar');
kafka_api.grant_consume("consume", "foo", Authentication.MTLS, "User:Cn=bar")