
Spark CICD Pipeline

Self-mutable CI/CD pipeline for a Spark application based on Amazon EMR runtime.

Overview

The CI/CD pipeline uses CDK Pipeline and provisions all the resources needed to implement a CI/CD pipeline for a Spark application on Amazon EMR, including:

  • A CodePipeline pipeline, triggered from the repository branch defined in your CodeConnections connection, to process the CI/CD tasks
  • A CodeBuild stage to build the CDK assets and run the Spark unit tests
  • A Staging stage to deploy the application stack in the staging environment and run optional integration tests
  • A Production stage to deploy the application stack in the production environment

Spark CI/CD Pipeline

Cross-account deployment

You can use the same account or optionally use different accounts for CI/CD (where this construct is deployed), staging and production (where the application stack is deployed). If using different accounts, bootstrap staging and production accounts with CDK and add a trust relationship from the CI/CD account:

cdk bootstrap --profile staging \
  aws://<STAGING_ACCOUNT_ID>/<REGION> \
  --trust <CICD_ACCOUNT_ID> \
  --cloudformation-execution-policies "POLICY_ARN"

More information is available in the AWS CDK documentation on bootstrapping.

You also need to provide the account information in cdk.json, in the following form:

{
  "staging": {
    "account": "<STAGING_ACCOUNT_ID>",
    "region": "<REGION>"
  },
  "prod": {
    "account": "<PROD_ACCOUNT_ID>",
    "region": "<REGION>"
  }
}
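
As a rough illustration of the shape above, the following TypeScript sketch validates a parsed context object before its values are used to target the staging and production environments. The TargetEnv type and the readTargetEnv helper are hypothetical names introduced for this example; they are not part of the framework, and the way the construct itself reads these values is not shown here.

```typescript
// Hypothetical helper: validates the "staging" / "prod" entries shown above.
interface TargetEnv {
  account: string;
  region: string;
}

function readTargetEnv(
  context: Record<string, unknown>,
  key: "staging" | "prod",
): TargetEnv {
  const entry = context[key] as Partial<TargetEnv> | undefined;
  if (!entry || typeof entry.account !== "string" || typeof entry.region !== "string") {
    throw new Error(`cdk.json is missing "account" or "region" for "${key}"`);
  }
  // AWS account IDs are 12-digit strings.
  if (!/^\d{12}$/.test(entry.account)) {
    throw new Error(`"${key}.account" must be a 12-digit AWS account ID`);
  }
  return { account: entry.account, region: entry.region };
}

// Example with placeholder values.
const exampleContext = {
  staging: { account: "111111111111", region: "eu-west-1" },
  prod: { account: "222222222222", region: "eu-west-1" },
};
const stagingEnv = readTargetEnv(exampleContext, "staging");
```

In a CDK app, the input object would typically come from calls such as app.node.tryGetContext('staging'); failing early on a malformed entry avoids a confusing deployment error later in the pipeline.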

Defining a CDK Stack for the Spark application

The SparkEmrCICDPipeline construct deploys an application stack, which contains your business logic, into the staging and production environments. The application stack is a standard CDK stack that you provide and pass to the construct via a factory class.

To do this, extend ApplicationStackFactory and implement its createStack() method. The createStack() method must return a Stack instance created within the scope passed to the factory method, so that the application stack is created within the scope of the CDK Pipeline stage.

The CICDStage parameter is automatically passed by the CDK Pipeline via the factory method and lets you customize the behavior of the stack based on the stage. For example, the staging stage is used for integration tests, so a processing job should be triggered manually there, whereas in the production stage the job could be scheduled to run automatically on a regular basis.

Create your application stack using the factory pattern:

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as dsf from '@cdklabs/aws-data-solutions-framework';

export class EmrApplicationStackFactory extends dsf.utils.ApplicationStackFactory {
  createStack(scope: Construct, stage: dsf.utils.CICDStage): cdk.Stack {
    return new EmrApplicationStack(scope, 'EmrApplicationStack', stage);
  }
}

export class EmrApplicationStack extends cdk.Stack {
  constructor(scope: Construct, id: string, stage: dsf.utils.CICDStage) {
    super(scope, id);

    // DEFINE YOUR APPLICATION STACK HERE
    // USE STAGE PARAMETER TO CUSTOMIZE THE STACK BEHAVIOR

    if (stage === dsf.utils.CICDStage.PROD) {
      // prod only
    }
  }
}
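
One possible way to keep stage-specific settings in one place is a small lookup function that the stack constructor calls. The sketch below is only an illustration: Stage is a local stand-in for dsf.utils.CICDStage so that the example runs without the framework, and JobSettings, jobSettingsFor and the daily schedule value are hypothetical.

```typescript
// Local stand-in for dsf.utils.CICDStage (assumption, for a self-contained example).
enum Stage {
  STAGING = "staging",
  PROD = "prod",
}

interface JobSettings {
  // Schedule expression for automated runs; undefined means manual trigger only.
  schedule?: string;
}

function jobSettingsFor(stage: Stage): JobSettings {
  if (stage === Stage.PROD) {
    // Production: run the processing job automatically (placeholder daily schedule).
    return { schedule: "rate(1 day)" };
  }
  // Staging: no schedule, the integration tests trigger the job manually.
  return {};
}

const prodSettings = jobSettingsFor(Stage.PROD);
const stagingSettings = jobSettingsFor(Stage.STAGING);
```

Inside the application stack, the returned settings could decide whether a scheduling rule is created at all, which keeps the stage check out of the resource definitions themselves.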

Use the factory to pass your application stack to the SparkEmrCICDPipeline construct:

import { CodePipelineSource } from 'aws-cdk-lib/pipelines';

export class CICDPipelineStack extends cdk.Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);
    new dsf.processing.SparkEmrCICDPipeline(this, 'SparkCICDPipeline', {
      sparkApplicationName: 'SparkTest',
      applicationStackFactory: new EmrApplicationStackFactory(),
      // Repository and branch that trigger the pipeline
      source: CodePipelineSource.connection('owner/weekly-job', 'mainline', {
        connectionArn: 'arn:aws:codeconnections:eu-west-1:123456789012:connection/aEXAMPLE-8aad-4d5d-8878-dfcab0bc441f'
      }),
    });
  }
}

Unit tests

The construct triggers the unit tests as part of the CI/CD process using the EMR Docker image and a fail-fast approach. The unit tests run during the first build step, and the entire pipeline stops if they fail.

Unit tests are expected to run with the pytest command after pip install . has been run from the Spark root folder configured via sparkPath.

In your pytest script, use a Spark session with a local master and client deploy mode, because the unit tests run in a local EMR Docker container:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[1]")
    .appName("local-tests")
    .config("spark.submit.deployMode", "client")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

Integration tests

You can optionally run integration tests as part of the CI/CD process using the AWS CLI in a bash script that returns exit code 0 on success and 1 on failure. The integration tests are triggered after the application stack is deployed in the staging environment.

Enable them by setting integTestScript to the path of a bash script. For example:

root
|--spark
| |--integ.sh
|--cdk

integ.sh is a standard bash script that uses the AWS CLI to validate the application stack. In the following example, the script starts a Step Functions state machine from the application stack and succeeds only if the execution completes successfully:

#!/bin/bash
# Start the state machine and capture the execution ARN
EXECUTION_ARN=$(aws stepfunctions start-execution --state-machine-arn "$STEP_FUNCTION_ARN" | jq -r '.executionArn')
# Poll every 10 seconds until the execution reaches a terminal status
while true
do
  STATUS=$(aws stepfunctions describe-execution --execution-arn "$EXECUTION_ARN" | jq -r '.status')
  if [ "$STATUS" = "SUCCEEDED" ]; then
    exit 0
  elif [ "$STATUS" = "FAILED" ] || [ "$STATUS" = "TIMED_OUT" ] || [ "$STATUS" = "ABORTED" ]; then
    exit 1
  else
    sleep 10
  fi
done
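
The script above keeps polling for as long as the execution has not reached a terminal status. If you want the test to give up after a fixed number of polls, the decision logic could look like the following sketch. It is written in TypeScript purely to illustrate the logic (the construct still expects a bash script); nextAction, Decision and maxAttempts are hypothetical names, and the same check can be ported to bash with a counter.

```typescript
// What the polling loop should do after each status check.
type Decision = { action: "exit"; code: 0 | 1 } | { action: "wait" };

// Terminal failure statuses, as handled by the bash script above.
const FAILURE_STATUSES = ["FAILED", "TIMED_OUT", "ABORTED"];

// attempt is the 1-based number of polls performed so far.
function nextAction(status: string, attempt: number, maxAttempts: number): Decision {
  if (status === "SUCCEEDED") {
    return { action: "exit", code: 0 };
  }
  if (FAILURE_STATUSES.indexOf(status) !== -1) {
    return { action: "exit", code: 1 };
  }
  if (attempt >= maxAttempts) {
    // Still not finished after the last allowed poll: treat as a failure.
    return { action: "exit", code: 1 };
  }
  return { action: "wait" };
}

const whileRunning = nextAction("RUNNING", 1, 3);
const afterLastPoll = nextAction("RUNNING", 3, 3);
```

With a 10-second sleep between polls, maxAttempts of 60 would bound the test at roughly 10 minutes; the right bound depends on how long your processing job normally runs.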

To use resources deployed by the application stack in the script, such as the Step Functions state machine ARN in the previous example:

  1. Create a CfnOutput in your application stack with the value of your resource:
import { StateMachine } from 'aws-cdk-lib/aws-stepfunctions';

export class EmrApplicationStack extends cdk.Stack {
  constructor(scope: Construct, id: string, _stage: dsf.utils.CICDStage) {
    super(scope, id);

    const processingStateMachine = new StateMachine(this, 'ProcessingStateMachine', {
      // definition ...
    });

    // Expose the ARN so the integration test script can reference it
    new cdk.CfnOutput(this, 'ProcessingStateMachineArn', {
      value: processingStateMachine.stateMachineArn,
    });
  }
}
  2. Pass an environment variable to the SparkEmrCICDPipeline construct in the form of a key/value pair via integTestEnv:
     • Key is the name of the environment variable used in the script: STEP_FUNCTION_ARN in the script example above.
     • Value is the CloudFormation output name from the application stack: ProcessingStateMachineArn in the application stack example above.
  3. Add the permissions required to run the integration tests script via integTestPermissions. In this example, states:StartExecution and states:DescribeExecution.
import { PolicyStatement } from 'aws-cdk-lib/aws-iam';

new dsf.processing.SparkEmrCICDPipeline(this, 'SparkCICDPipeline', {
  sparkApplicationName: 'SparkTest',
  applicationStackFactory: new EmrApplicationStackFactory(),
  integTestScript: 'spark/integ.sh',
  // Environment variable name -> CloudFormation output name
  integTestEnv: {
    STEP_FUNCTION_ARN: 'ProcessingStateMachineArn',
  },
  integTestPermissions: [
    new PolicyStatement({
      actions: [
        'states:StartExecution',
        'states:DescribeExecution',
      ],
      resources: ['*'],
    }),
  ],
  source: CodePipelineSource.connection('owner/weekly-job', 'mainline', {
    connectionArn: 'arn:aws:codeconnections:eu-west-1:123456789012:connection/aEXAMPLE-8aad-4d5d-8878-dfcab0bc441f'
  })
});
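
To make the integTestEnv mapping concrete, the sketch below shows the lookup it describes: each environment variable name is paired with the value of the CloudFormation output it points to. This is only an illustration of the mapping, not the construct's implementation; resolveIntegTestEnv is a hypothetical helper and the ARN is a placeholder.

```typescript
// Hypothetical illustration: resolve env var names to stack output values.
function resolveIntegTestEnv(
  integTestEnv: Record<string, string>,
  stackOutputs: Record<string, string>,
): Record<string, string> {
  const resolved: Record<string, string> = {};
  for (const envName of Object.keys(integTestEnv)) {
    const outputName = integTestEnv[envName];
    const value = stackOutputs[outputName];
    if (value === undefined) {
      throw new Error(`No CloudFormation output named "${outputName}" for ${envName}`);
    }
    resolved[envName] = value;
  }
  return resolved;
}

// Placeholder outputs, as the staging application stack might expose them.
const exampleOutputs = {
  ProcessingStateMachineArn:
    "arn:aws:states:eu-west-1:123456789012:stateMachine:ProcessingStateMachine",
};
const scriptEnv = resolveIntegTestEnv(
  { STEP_FUNCTION_ARN: "ProcessingStateMachineArn" },
  exampleOutputs,
);
```

Here scriptEnv.STEP_FUNCTION_ARN holds the state machine ARN, which is the value integ.sh reads as $STEP_FUNCTION_ARN. A misspelled output name surfaces as an explicit error in this sketch, which is the kind of mismatch worth checking first if the script sees an empty variable.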