Skip to main content

EMR on EKS with Flink Streaming

info

Please note that we are working on adding more features to this blueprint such as Flink examples with multiple connectors, Ingress for WebUI, Grafana dashboards etc.

Apache Flink is an open-source, unified stream processing and batch processing framework that was designed to process large amounts of data. It provides fast, reliable, and scalable data processing with fault tolerance and exactly-once semantics. Some of the key features of Flink are:

  • Distributed Processing: Flink is designed to process large volumes of data in a distributed fashion, making it horizontally scalable and fault-tolerant.
  • Stream Processing and Batch Processing: Flink provides APIs for both stream processing and batch processing. This means you can process data in real-time, as it's being generated, or process data in batches.
  • Fault Tolerance: Flink has built-in mechanisms for handling node failures, network partitions, and other types of failures.
  • Exactly-once Semantics: Flink supports exactly-once processing, which ensures that each record is processed exactly once, even in the presence of failures.
  • Low Latency: Flink's streaming engine is optimized for low-latency processing, making it suitable for use cases that require real-time processing of data.
  • Extensibility: Flink provides a rich set of APIs and libraries, making it easy to extend and customize to fit your specific use case.

Architecture

Flink Architecture high level design with EKS.

Flink Design UI

Amazon EMR releases 6.13.0 and higher support Amazon EMR on EKS with Apache Flink, or the EMR Flink Kubernetes operator, as a job submission model for Amazon EMR on EKS. With Amazon EMR on EKS with Apache Flink, you can deploy and manage Flink applications with the Amazon EMR release runtime on your own Amazon EKS clusters. Once you deploy the Flink Kubernetes operator in your Amazon EKS cluster, you can directly submit Flink applications with the operator. The operator manages the lifecycle of Flink applications.

  1. Running, suspending and deleting applications
  2. Stateful and stateless application upgrades
  3. Triggering and managing savepoints
  4. Handling errors, rolling-back broken upgrades

In addition to the above features, EMR Flink Kubernetes operator provides the following additional capabilities:

  1. Launching Flink application using jars in Amazon S3
  2. Monitoring integration with Amazon S3 and Amazon CloudWatch and container log rotation.
  3. Automatically tunes Autoscaler configurations based on historical trends of observed metrics.
  4. Faster Flink Job Restart during scaling or Failure Recovery
  5. IRSA (IAM Roles for Service Accounts) Native Integration
  6. Pyflink support

Flink Operator defines two types of Custom Resources(CR) which are the extensions of the Kubernetes API.

FlinkDeployment

  • FlinkDeployment CR defines Flink Application and Session Cluster deployments.

  • Application deployments manage a single job deployment on a dedicated Flink cluster in Application mode.

  • Session clusters allows you to run multiple Flink Jobs on an existing Session cluster.

    FlinkDeployment in Application modes, Click to toggle content!
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
    namespace: default
    name: basic-example
    spec:
    image: flink:1.16
    flinkVersion: v1_16
    flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    serviceAccount: flink
    jobManager:
    resource:
    memory: "2048m"
    cpu: 1
    taskManager:
    resource:
    memory: "2048m"
    cpu: 1
    job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
    state: running
info

Session clusters use a similar spec to Application clusters with the only difference that job is not defined in the yaml spec.

info

According to the Flink documentation, it is recommended to use FlinkDeployment in Application mode for production environments.

On top of the deployment types the Flink Kubernetes Operator also supports two modes of deployments: Native and Standalone.

Native

  • Native cluster deployment is the default deployment mode and uses Flink’s built in integration with Kubernetes when deploying the cluster.
  • Flink cluster communicates directly with Kubernetes and allows it to manage Kubernetes resources, e.g. dynamically allocate and de-allocate TaskManager pods.
  • Flink Native can be useful for advanced users who want to build their own cluster management system or integrate with existing management systems.
  • Flink Native allows for more flexibility in terms of job scheduling and execution.
  • For standard Operator use, running your own Flink Jobs in Native mode is recommended.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
...
spec:
...
mode: native

To get the most out of Flink on Kubernetes, here are some best practices to follow:

  • Use the Kubernetes Operator: Install and use the Flink Kubernetes Operator to automate the deployment and management of Flink clusters on Kubernetes.
  • Deploy in dedicated namespaces: Create a separate namespace for the Flink Kubernetes Operator and another one for Flink jobs/workloads. This ensures that the Flink jobs are isolated and have their own resources.
  • Use high-quality storage: Store Flink checkpoints and savepoints in high-quality storage such as Amazon S3 or another durable external storage. These storage options are reliable, scalable, and offer durability for large volumes of data.
  • Optimize resource allocation: Allocate sufficient resources to Flink jobs to ensure optimal performance. This can be done by setting resource requests and limits for Flink containers.
  • Proper network isolation: Use Kubernetes Network Policies to isolate Flink jobs from other workloads running on the same Kubernetes cluster. This ensures that Flink jobs have the required network access without being impacted by other workloads.
  • Configure Flink optimally: Tune Flink settings according to your use case. For example, adjust Flink's parallelism settings to ensure that Flink jobs are scaled appropriately based on the size of the input data.
  • Use checkpoints and savepoints: Use checkpoints for periodic snapshots of Flink application state and savepoints for more advanced use cases such as upgrading or downgrading the application.
  • Store checkpoints and savepoints in the right places: Store checkpoints in distributed file systems or key-value stores like Amazon S3 or another durable external storage. Store savepoints in a durable external storage like Amazon S3.

Flink Operator provides three upgrade modes for Flink jobs. Checkout the Flink upgrade docs for up-to-date information.

  1. stateless: Stateless application upgrades from empty state
  2. last-state: Quick upgrades in any application state (even for failing jobs), does not require a healthy job as it always uses the latest checkpoint information. Manual recovery may be necessary if HA metadata is lost.
  3. savepoint: Use savepoint for upgrade, providing maximal safety and possibility to serve as backup/fork point. The savepoint will be created during the upgrade process. Note that the Flink job needs to be running to allow the savepoint to get created. If the job is in an unhealthy state, the last checkpoint will be used (unless kubernetes.operator.job.upgrade.last-state-fallback.enabled is set to false). If the last checkpoint is not available, the job upgrade will fail.
info

last-state or savepoint are recommended modes for production

Deploying the Solution

👈

Execute Sample Flink job with Karpenter

👈

Cleanup

👈
caution

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment