Reducer

Unstable API 0.10.0 @project-lakechain/reducer TypeScript

The Reducer middleware is an essential flow control mechanism of Project Lakechain, allowing developers to reduce a collection of documents into a single entity. It is a structural element of the framework making it possible to express map-reduce pipelines.

At its core, this middleware allows pipeline builders to group relevant documents into a single semantic envelope and perform combined operations on them. In combination with other middlewares, this unlocks many use-cases, such as concatenating multiple audio files, zipping a collection of documents on-the-fly, or inserting a set of subtitles into a video.

The Reducer middleware can aggregate multiple documents based on specific strategies which we document below.



📘 Strategies

Strategies define the conditions under which documents are reduced together. This middleware implements three strategies: time windows, static counters, and conditional expressions. Before diving into them, let's first understand the concept of chainId.

Chain Ids

A chainId is a unique identifier that is part of CloudEvents. It identifies a specific execution of a pipeline, sometimes referred to as a chain. This identifier remains stable for the entire lifecycle of a pipeline execution, across all middleware steps.

To illustrate this, let's picture a simple pipeline where we monitor videos uploaded to an S3 bucket. This pipeline creates subtitles for that video and then merges the subtitles back into the video. It might look like the following.

Example

We say that a new pipeline execution is triggered when a new video is uploaded to the S3 bucket. This specific execution is identified by a chainId which remains the same across the Transcribe, Reduce, FFMPEG and S3 steps. In a nutshell, it's a great way to follow the lifecycle of a document processing execution within a pipeline.
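To make the idea concrete, here is a minimal sketch of grouping events by chain identifier. The `PipelineEvent` shape and the `groupByChainId` helper are hypothetical and exist only for illustration; real Lakechain events are CloudEvents with a richer structure.

```typescript
// Hypothetical, simplified event shape used only for this illustration.
interface PipelineEvent {
  chainId: string;
  step: string;
}

// Groups events by chain identifier, preserving arrival order within a group.
function groupByChainId(events: PipelineEvent[]): Map<string, PipelineEvent[]> {
  const groups = new Map<string, PipelineEvent[]>();
  for (const item of events) {
    const group = groups.get(item.chainId) ?? [];
    group.push(item);
    groups.set(item.chainId, group);
  }
  return groups;
}

// Two pipeline executions interleaved in time.
const chainGroups = groupByChainId([
  { chainId: 'chain-a', step: 'Transcribe' },
  { chainId: 'chain-b', step: 'Transcribe' },
  { chainId: 'chain-a', step: 'Reduce' }
]);
```

Here `chainGroups` holds two entries: `chain-a` with two events and `chain-b` with one, mirroring how events from distinct executions never mix.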



Time Windows

The time window strategy reduces events belonging to the same chainId within a user-defined time window. The time window can range from 1 second to 48 hours. When the time window reaches its end, the aggregated events are reduced into a single composite event and forwarded to the next middlewares in the pipeline.

This strategy is a good fit for scenarios where you don't know in advance how many documents will be produced by the middlewares preceding the Reducer step.

It starts aggregating documents belonging to the same chainId when the first document with that identifier is received. For example, if your time window is set to 10 minutes, and the first document is received at 12:00, all documents received until 12:10 having the same chainId will be aggregated together and forwarded to the next middlewares.
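The window arithmetic described above can be sketched as follows. The `isWithinWindow` helper is hypothetical, and treating the end of the window as inclusive is an assumption made for this example, not documented middleware behavior.

```typescript
// Returns true when an event arriving at `arrival` falls inside the window
// opened by the first event of its chain. Times are epoch milliseconds.
// Assumption: both window bounds are treated as inclusive.
function isWithinWindow(firstArrival: number, arrival: number, windowMs: number): boolean {
  return arrival >= firstArrival && arrival <= firstArrival + windowMs;
}

const windowStart = Date.UTC(2024, 0, 1, 12, 0, 0); // first document at 12:00
const tenMinutesMs = 10 * 60 * 1000;

// A document received at 12:09:59 is aggregated with the others.
const insideWindow = isWithinWindow(windowStart, Date.UTC(2024, 0, 1, 12, 9, 59), tenMinutesMs);

// A document received at 12:10:01 arrives after the window has closed.
const afterWindow = isWithinWindow(windowStart, Date.UTC(2024, 0, 1, 12, 10, 1), tenMinutesMs);
```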

Timeline

Jitter

The TimeWindowStrategy allows you to optionally specify a jitter, which is a random duration between zero and a maximum value of your choice. Using a jitter can help spread reduce operations over time when many chainId groups are being aggregated at once.

For example, if your time window is 10 minutes and you add a jitter of 30 seconds, each reduce operation will occur after 10 minutes plus a random value between zero and 30 seconds.
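As a rough illustration of that arithmetic, the sketch below computes the delay before a reduce operation. The `reduceDelaySeconds` function is hypothetical, and the uniform distribution is an assumption; the middleware's actual implementation may differ.

```typescript
// Computes the delay, in seconds, before a reduce operation is triggered.
// `random` is injectable so that the function can be tested deterministically;
// by default it draws a uniform value in [0, 1) from Math.random.
function reduceDelaySeconds(
  windowSeconds: number,
  jitterSeconds: number,
  random: () => number = Math.random
): number {
  return windowSeconds + random() * jitterSeconds;
}

// A 10 minute window with up to 30 seconds of jitter yields a delay
// between 600 (inclusive) and 630 (exclusive) seconds.
const jitteredDelay = reduceDelaySeconds(600, 30);
```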

Unmatched Events

The time window strategy waits for a time condition to be met. When the time window elapses, all aggregated documents are reduced. If documents with the same chainId are received after a reduce operation has occurred, they will be dismissed.

Usage

To reduce events using the TimeWindowStrategy, you must import and instantiate the Reducer middleware as part of your pipeline.

πŸ’ Below is an example showcasing how to instantiate the reducer using the TimeWindowStrategy with a time window of 15 seconds and a jitter of 5 seconds.

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { Reducer, TimeWindowStrategy } from '@project-lakechain/reducer';

class Stack extends cdk.Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    // `cache`, `M1`, `M2` and `M3` are assumed to be created elsewhere in this stack.
    const reducer = new Reducer.Builder()
      .withScope(this)
      .withIdentifier('Reducer')
      .withCacheStorage(cache)
      .withSources([M1, M2, M3]) // 👈 Specifies data sources.
      .withReducerStrategy(new TimeWindowStrategy.Builder()
        .withTimeWindow(cdk.Duration.seconds(15))
        .withJitter(cdk.Duration.seconds(5))
        .build()
      )
      .build();
  }
}


Static Counter

The static counter strategy reduces all events belonging to the same chainId, based on a static counter. It allows you to specify the number of documents to aggregate together before reducing them into a single event.

This strategy is a good fit when you know the exact number of documents that you expect to be reduced.

For example, let's say that you want to translate a document into French, English, and Spanish using the Translate Text Processor, and then reduce the translated documents back together to zip them. In this case, you know to expect exactly 3 documents, one per translated language.

Unmatched Events

As the reducer waits for the static count condition to be met, it will aggregate documents for a period of up to 48 hours. If the counter is not reached after this period, the aggregated documents will be dismissed.

Similarly, if a reduce operation already occurred for a given chain identifier, any subsequent document that may arrive after the count condition has been met will be dismissed.
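The counting and dismissal behavior described in this section can be modeled with a small in-memory sketch. The `StaticCounterSketch` class is hypothetical and is not how the middleware is implemented (the middleware aggregates documents in DynamoDB); the 48-hour expiry is deliberately left out to keep the example short.

```typescript
type Outcome = 'aggregated' | 'reduced' | 'dismissed';

// In-memory model of a static counter, for illustration only.
class StaticCounterSketch {
  private readonly eventCount: number;
  private readonly pending = new Map<string, string[]>();
  private readonly reducedChains = new Set<string>();

  constructor(eventCount: number) {
    this.eventCount = eventCount;
  }

  // Records a document for a chain and reports what happened to it.
  receive(chainId: string, doc: string): Outcome {
    // Documents arriving after the reduce operation are dismissed.
    if (this.reducedChains.has(chainId)) {
      return 'dismissed';
    }
    const docs = this.pending.get(chainId) ?? [];
    docs.push(doc);
    if (docs.length >= this.eventCount) {
      // The count condition is met: reduce and close the chain.
      this.pending.delete(chainId);
      this.reducedChains.add(chainId);
      return 'reduced';
    }
    this.pending.set(chainId, docs);
    return 'aggregated';
  }
}

// Three translations are expected; a fourth document arrives too late.
const counter = new StaticCounterSketch(3);
const outcomes = ['fr', 'en', 'es', 'de'].map((lang) => counter.receive('chain-a', lang));
// outcomes: ['aggregated', 'aggregated', 'reduced', 'dismissed']
```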

Usage

To reduce events using the StaticCounterStrategy, you must import and instantiate the Reducer middleware as part of your pipeline.

πŸ’ Below is an example showcasing how to instantiate the reducer using the StaticCounterStrategy with a counter of 3.

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { Reducer, StaticCounterStrategy } from '@project-lakechain/reducer';

class Stack extends cdk.Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    // `cache`, `M1`, `M2` and `M3` are assumed to be created elsewhere in this stack.
    const reducer = new Reducer.Builder()
      .withScope(this)
      .withIdentifier('Reducer')
      .withCacheStorage(cache)
      .withSources([M1, M2, M3]) // 👈 Specifies the sources.
      .withReducerStrategy(new StaticCounterStrategy.Builder()
        .withEventCount(3)
        .build()
      )
      .build();
  }
}


Conditional Strategy

The conditional strategy reduces events based on a custom user-provided condition. It allows you to define a funclet or a lambda function that gets called back when a new document belonging to a given chainId is being aggregated. This conditional expression defines when the aggregated events should be reduced.

This strategy is a good fit when you want to control the reduce process based on a specific condition. For example, you can use the conditional strategy to reduce a collection of events based on the metadata of the documents, or even based on the response of a third-party API.

Unmatched Events

This strategy allows you to evaluate each aggregated document for a duration of up to 48 hours. If the condition is unmet after this period, the aggregated documents will be dismissed.

If a reduce operation has already occurred for a given chain identifier, any subsequent document with the same chain identifier that arrives after the condition has been met will be dismissed.

Usage

To reduce events using the ConditionalStrategy, you must import and instantiate the Reducer middleware as part of your pipeline.

πŸ’ Below is an example showcasing how to instantiate the reducer using the ConditionalStrategy with a custom condition.

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { CloudEvent, TextMetadata } from '@project-lakechain/sdk';
import { Reducer, ConditionalStrategy } from '@project-lakechain/reducer';

/**
 * This conditional expression is called by the reducer middleware
 * for every new received event. In this example, we want to reduce
 * the events based on the total number of chunks produced by the
 * previous middlewares.
 * @param event the event to process.
 * @param storedEvents the list of events stored in the table.
 * @returns a promise resolving to a boolean value.
 */
export const conditional = async (event: CloudEvent, storedEvents: CloudEvent[]) => {
  const metadata = event.data().metadata().properties?.attrs as TextMetadata;

  // Reduce once as many events are stored as there are chunks in total.
  return (storedEvents.length === metadata.chunk?.total);
};

class Stack extends cdk.Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    // `cache`, `M1`, `M2` and `M3` are assumed to be created elsewhere in this stack.
    const reducer = new Reducer.Builder()
      .withScope(this)
      .withIdentifier('Reducer')
      .withCacheStorage(cache)
      .withSources([M1, M2, M3]) // 👈 Specifies the sources.
      .withReducerStrategy(new ConditionalStrategy.Builder()
        .withConditional(conditional)
        .build()
      )
      .build();
  }
}

Funclet Signature

Funclet expressions use the power of a full programming language to express complex reduce conditional expressions. They are asynchronous and can be defined as TypeScript named functions, anonymous functions, or arrow functions.

A reduce conditional funclet takes 2 arguments: a CloudEvent describing the document being handled by the reducer, and the collection of events stored so far, including the received event. It must return a promise resolving to a boolean value representing the result of the evaluation: true if the reduce operation should occur, false otherwise.

type ConditionalExpression = (event: CloudEvent, storedEvents: CloudEvent[]) => Promise<boolean>;
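As a further illustration of this signature, the sketch below reduces once every expected language has been seen. `SketchEvent` and `SketchConditional` are simplified, hypothetical stand-ins for the SDK's `CloudEvent` and the `ConditionalExpression` type, used so the example runs without the Lakechain SDK. The funclet merges the received event with the stored ones before checking, so it behaves the same whether or not the stored collection already contains the received event.

```typescript
// Simplified stand-in for the SDK's CloudEvent type, for illustration only.
interface SketchEvent {
  language: string;
}

// Same shape as the conditional expression signature, using the stand-in type.
type SketchConditional = (received: SketchEvent, storedEvents: SketchEvent[]) => Promise<boolean>;

// Hypothetical list of languages the pipeline is expected to produce.
const expectedLanguages = ['fr', 'en', 'es'];

// Resolves to true once every expected language has been observed.
const allLanguagesReceived: SketchConditional = async (received, storedEvents) => {
  const seen = new Set([...storedEvents, received].map((e) => e.language));
  return expectedLanguages.every((language) => seen.has(language));
};
```

Because the funclet is asynchronous, it could equally await a database lookup or a third-party API call before resolving.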


πŸ—οΈ Architecture

The architecture implemented by this middleware depends on the selected strategy. Below is a description of the architecture implemented by each strategy.

TimeWindowStrategy

This strategy implements a serverless aggregation architecture that uses DynamoDB to aggregate document events, and the EventBridge Scheduler service to schedule the execution of the reducer for each chainId group of events once the time window elapses.

Time Window Architecture

StaticCounterStrategy

This strategy also implements a serverless aggregation architecture based on DynamoDB as the document aggregator, and leverages an event-driven approach to count the number of documents received for a specific chainId using DynamoDB streams.

Static Counter Architecture

ConditionalStrategy

The conditional strategy implements a serverless aggregation architecture based on DynamoDB as the document aggregator, and leverages an event-driven approach to evaluate a conditional expression for each received document belonging to the same chainId.

Conditional Strategy Architecture



🏷️ Properties


Supported Inputs

| Mime Type | Description |
| --------- | ----------- |
| */* | The reducer middleware can consume any type of document. |

Supported Outputs

| Mime Type | Description |
| --------- | ----------- |
| application/cloudevents+json | Composite event. |

Supported Compute Types

| Type | Description |
| ---- | ----------- |
| CPU | This middleware only supports CPU compute. |


📖 Examples