Reducer
The Reducer middleware is an essential flow control mechanism of Project Lakechain, allowing developers to reduce a collection of documents into a single entity. It is a structural element of the framework that makes it possible to express map-reduce pipelines.
At its core, this middleware allows pipeline builders to group relevant documents into a single semantic envelope, and perform combined operations on them. In combination with other middlewares this unlocks many use-cases, for example, concatenating multiple audio files, zipping a collection of documents on-the-fly, or inserting a set of subtitles into a video.
The Reducer middleware can aggregate multiple documents based on specific strategies, which are documented below.
Strategies
Strategies define the conditions under which documents are reduced together. This middleware implements three strategies: time windows, static counters, and conditional expressions. Before diving into them, let's first look at the concept of chainId.
Chain Ids
A chainId is a unique identifier that is part of CloudEvents. It identifies a specific execution of a pipeline, sometimes referred to as a chain. This identifier remains stable for the entire lifecycle of a pipeline execution, across all middleware steps.
To illustrate this, let's picture a simple pipeline that monitors videos uploaded to an S3 bucket. This pipeline creates subtitles for each video and then merges the subtitles back into the video.
We say that a new pipeline execution is triggered when a new video is uploaded to the S3 bucket. This specific execution is identified by a chainId which remains the same across the Transcribe, Reduce, FFMPEG and S3 steps. In a nutshell, it's a great way to follow the lifecycle of a document processing execution within a pipeline.
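The grouping role of a chainId can be illustrated with a minimal sketch. The `PipelineEvent` shape and the `groupByChainId` helper are hypothetical simplifications introduced for illustration; real Lakechain CloudEvents carry more information and are not modeled here.

```typescript
// Hypothetical, simplified event shape (an assumption, not the Lakechain CloudEvent type).
interface PipelineEvent {
  chainId: string; // Stable for one pipeline execution.
  step: string;    // Name of the middleware that emitted the event.
}

// Groups events by chain identifier, preserving arrival order within each group.
function groupByChainId(events: PipelineEvent[]): Map<string, PipelineEvent[]> {
  const groups = new Map<string, PipelineEvent[]>();
  for (const event of events) {
    const group = groups.get(event.chainId) ?? [];
    group.push(event);
    groups.set(event.chainId, group);
  }
  return groups;
}

// Two video uploads produce two independent executions, hence two chain identifiers.
const sampleEvents: PipelineEvent[] = [
  { chainId: 'chain-a', step: 'Transcribe' },
  { chainId: 'chain-b', step: 'Transcribe' },
  { chainId: 'chain-a', step: 'Reduce' },
];
const groupedEvents = groupByChainId(sampleEvents);
```

Here `groupedEvents` holds one entry per execution, which is the unit the reducer operates on.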
Time Windows
The time window strategy reduces events belonging to the same chainId within a user-defined time window. The time window can range from 1 second to 48 hours. When the time window reaches its end, the aggregated events are reduced into a single composite event, and forwarded to the next middlewares in the pipeline.
This strategy is a good fit for scenarios where you don't necessarily know how many documents will be produced by the middlewares preceding the Reducer step.
It starts aggregating documents belonging to the same chainId when the first document with that identifier is received. For example, if your time window is set to 10 minutes, and the first document is received at 12:00, all documents received until 12:10 having the same chainId will be aggregated together and forwarded to the next middlewares.
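The 12:00 to 12:10 example can be expressed as a small check. This is only a sketch of the semantics: `isWithinWindow` is a hypothetical helper, not part of the middleware, and treating the end of the window as inclusive is an assumption.

```typescript
// Hypothetical helper illustrating the window semantics described above.
// Assumption: the window opens when the first document of a chain is received
// and includes its end instant.
function isWithinWindow(firstReceivedAt: Date, receivedAt: Date, windowMs: number): boolean {
  const elapsedMs = receivedAt.getTime() - firstReceivedAt.getTime();
  return elapsedMs >= 0 && elapsedMs <= windowMs;
}

const TEN_MINUTES_MS = 10 * 60 * 1000;
const firstDocumentAt = new Date('2024-01-01T12:00:00Z');

// Received at 12:07:30, inside the window, so it is aggregated.
const aggregated = isWithinWindow(firstDocumentAt, new Date('2024-01-01T12:07:30Z'), TEN_MINUTES_MS);

// Received at 12:10:01, after the window has closed.
const afterWindow = isWithinWindow(firstDocumentAt, new Date('2024-01-01T12:10:01Z'), TEN_MINUTES_MS);
```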
Jitter
The TimeWindowStrategy
allows you to optionally specify a jitter which consists of a random number between zero and an arbitrary value. Using a jitter can be useful to smoothen the aggregation process across multiple chainId
.
For example, if your time window is 10 minutes, and you add a jitter of 30 seconds, each reduce operation will occur after 10 minutes plus a random value between zero and 30 seconds.
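As a rough sketch of that arithmetic, the delay before a reduce operation can be computed as follows. `reduceDelayMs` is a hypothetical helper, and the use of a uniform random distribution is an assumption; the middleware's actual jitter computation is not described here.

```typescript
// Hypothetical helper: delay before the reduce operation, in milliseconds.
// `random` is injectable so the result can be made deterministic.
function reduceDelayMs(
  windowMs: number,
  jitterMs: number,
  random: () => number = Math.random
): number {
  // Math.random() returns a value in [0, 1), so the jitter lies in [0, jitterMs).
  return windowMs + random() * jitterMs;
}

// A 10 minute window with up to 30 seconds of jitter.
const jitteredDelayMs = reduceDelayMs(10 * 60 * 1000, 30 * 1000);

// With a fixed random source of 0.5, the jitter is exactly 15 seconds.
const fixedDelayMs = reduceDelayMs(10 * 60 * 1000, 30 * 1000, () => 0.5);
```

Spreading reduce operations this way avoids having many chains that started at the same moment all reduce at the same instant.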
Unmatched Events
The time window strategy waits for a time condition to be met. When the time window is reached, all aggregated documents are reduced. If documents with the same chainId are received after a reduce operation occurred, they will be dismissed.
Usage
To reduce events using the TimeWindowStrategy, you must import and instantiate the Reducer middleware as part of your pipeline.
For example, the reducer can be instantiated using the TimeWindowStrategy with a time window of 15 seconds and a jitter of 5 seconds.
Static Counter
The static counter strategy reduces all events belonging to the same chainId, based on a static counter. It allows you to specify the number of documents to aggregate together before reducing them into a single event.
This strategy is a good fit when you know the exact number of documents that you expect to be reduced.
For example, let's say that you want to translate a document into French, English, and Spanish using the Translate Text Processor, and reduce the translated documents back together to zip them. In this case, you know to expect exactly 3 documents, one per translated language.
Unmatched Events
As the reducer waits for the static count condition to be met, it will aggregate documents for a period of up to 48 hours. If the counter is not reached after this period, the aggregated documents will be dismissed.
Similarly, if a reduce operation already occurred for a given chain identifier, any subsequent document that may arrive after the count condition has been met will be dismissed.
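The counting and dismissal behavior can be modeled with a small in-memory sketch. `StaticCounterSketch` and its types are hypothetical names introduced for illustration. The sketch does not model the 48 hour expiry and does not reflect how the middleware is implemented.

```typescript
// Hypothetical, simplified event shape (an assumption for this sketch).
interface DocumentEvent {
  chainId: string;
  document: string;
}

type AddResult =
  | { status: 'aggregated' }
  | { status: 'reduced'; documents: string[] }
  | { status: 'dismissed' };

// In-memory model of the static counter semantics.
class StaticCounterSketch {
  private readonly eventCount: number;
  private readonly pending = new Map<string, string[]>();
  private readonly reduced = new Set<string>();

  constructor(eventCount: number) {
    this.eventCount = eventCount;
  }

  add(event: DocumentEvent): AddResult {
    // Documents arriving after the reduce operation are dismissed.
    if (this.reduced.has(event.chainId)) {
      return { status: 'dismissed' };
    }
    const documents = this.pending.get(event.chainId) ?? [];
    documents.push(event.document);
    if (documents.length < this.eventCount) {
      this.pending.set(event.chainId, documents);
      return { status: 'aggregated' };
    }
    // The counter is reached: reduce, then remember that this chain is done.
    this.pending.delete(event.chainId);
    this.reduced.add(event.chainId);
    return { status: 'reduced', documents };
  }
}

// Three translations are expected; a fourth, late document is dismissed.
const counter = new StaticCounterSketch(3);
const statuses = ['fr.txt', 'en.txt', 'es.txt', 'late.txt'].map(
  (document) => counter.add({ chainId: 'chain-a', document }).status
);
// statuses: ['aggregated', 'aggregated', 'reduced', 'dismissed']
```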
Usage
To reduce events using the StaticCounterStrategy, you must import and instantiate the Reducer middleware as part of your pipeline.
For example, the reducer can be instantiated using the StaticCounterStrategy with a counter of 3.
Conditional Strategy
The conditional strategy reduces events based on a custom user-provided condition. It allows you to define a funclet or a lambda function that gets called back when a new document belonging to a given chainId is being aggregated. This conditional expression defines when the aggregated events should be reduced.
This strategy is a good fit when you want to control the reduce process based on a specific condition. For example, you can use it to reduce a collection of events based on the metadata of the documents, or even based on the response of a third-party API.
Unmatched Events
This strategy allows you to evaluate each aggregated document for a duration of up to 48 hours. If the condition is unmet after this period, the aggregated documents will be dismissed.
If a reduce operation already occurred for a given chain identifier, any subsequent document that may arrive after the condition has been met, and having the same chain identifier, will be dismissed.
Usage
To reduce events using the ConditionalStrategy, you must import and instantiate the Reducer middleware as part of your pipeline.
For example, the reducer can be instantiated using the ConditionalStrategy with a custom condition.
Funclet Signature
Funclet expressions use the power of a full programming language to express complex reduce conditional expressions. They are asynchronous and can be defined as TypeScript named functions, anonymous functions, or arrow functions.
A reduce conditional funclet takes two arguments: a CloudEvent describing the document being handled by the reducer, and a collection of the events stored so far, excluding the received event. It must return a promise resolving to a boolean value representing the result of the evaluation: true if the reduce operation should occur, false otherwise.
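The signature described above can be sketched with simplified types. `SketchEvent`, `ReduceConditional`, `allLanguagesReceived` and `simulateReducer` are hypothetical names introduced for illustration. A real funclet receives CloudEvent instances rather than this simplified shape, and the simulation loop only approximates how the reducer invokes the funclet.

```typescript
// Hypothetical, simplified stand-in for a CloudEvent (an assumption for this sketch).
interface SketchEvent {
  chainId: string;
  language: string;
}

// Two arguments (the received event, the events stored so far) and a promised boolean.
type ReduceConditional = (
  event: SketchEvent,
  storedEvents: SketchEvent[]
) => Promise<boolean>;

// Reduce once the received event plus the stored ones cover all expected languages.
const allLanguagesReceived: ReduceConditional = async (event, storedEvents) => {
  const expected = ['fr', 'en', 'es'];
  const seen = new Set([...storedEvents, event].map((e) => e.language));
  return expected.every((language) => seen.has(language));
};

// Simulates the funclet being called for each received event.
// Resolves to the index of the event that triggers the reduce, or -1 if none does.
async function simulateReducer(
  events: SketchEvent[],
  conditional: ReduceConditional
): Promise<number> {
  const stored: SketchEvent[] = [];
  for (let index = 0; index < events.length; index++) {
    // The stored collection excludes the event currently being evaluated.
    if (await conditional(events[index], stored)) {
      return index;
    }
    stored.push(events[index]);
  }
  return -1;
}
```

Because the funclet is asynchronous, it could just as well await a call to an external service before resolving, which is what makes conditions based on third-party APIs possible.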
Architecture
The architecture implemented by this middleware depends on the selected strategy. Below is a description of the architecture implemented by each strategy.
TimeWindowStrategy
This strategy implements a serverless aggregation architecture based on DynamoDB for document event aggregation, and the EventBridge Scheduler service for scheduling the execution of the reducer for each chainId group of events when the time window is reached.
StaticCounterStrategy
This strategy also implements a serverless aggregation architecture based on DynamoDB as the document aggregator, and leverages an event-driven approach to count the number of documents received for a specific chainId using DynamoDB streams.
ConditionalStrategy
The conditional strategy implements a serverless aggregation architecture based on DynamoDB as the document aggregator, and leverages an event-driven approach to evaluate a conditional expression for each received document belonging to the same chainId.
Properties
Supported Inputs
Mime Type | Description |
---|---|
*/* | The reducer middleware can consume any type of document. |
Supported Outputs
Mime Type | Description |
---|---|
application/cloudevents+json | Composite event. |
Supported Compute Types
Type | Description |
---|---|
CPU | This middleware only supports CPU compute. |
Examples
- Building a Generative Podcast - Builds a pipeline for creating a generative weekly AWS news podcast.
- Building a Video Chaptering Service - Builds a pipeline for automatic video chaptering generation.
- Bedrock Translation Pipeline - Translates documents using a large-language model hosted on Amazon Bedrock.