Skip to content

OpenSearch

Unstable API

0.8.0

@project-lakechain/opensearch-storage-connector

TypeScript Icon

The OpenSearch storage connector enables developers to index CloudEvents in an Amazon OpenSearch domain at scale within their pipelines. This connector uses AWS Firehose to buffer events and store them in batch to OpenSearch using a serverless architecture.


🗄ī¸ Indexing Documents

To use the OpenSearch storage connector, you import it in your CDK stack, and connect it to a data source providing documents.

import { OpenSearchStorageConnector } from '@project-lakechain/opensearch-storage-connector';
import { CacheStorage } from '@project-lakechain/core';
class Stack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string) {
const cache = new CacheStorage(this, 'Cache');
// Sample VPC.
const vpc = new ec2.Vpc(this, 'Vpc');
// The OpenSearch domain.
const domain = // ...
// Create the OpenSearch storage connector.
const connector = new OpenSearchStorageConnector.Builder()
.withScope(this)
.withIdentifier('OpenSearchStorage')
.withCacheStorage(cache)
.withSource(source)
.withDomain(domain)
.withVpc(vpc)
.withIndexName('document-index')
.build();
}
}


Buffering Hints

This connector creates an AWS Firehose delivery stream to buffer events before sending them to OpenSearch. You can customize the way that the connector will buffer events by specifying optional buffering hints.

ℹī¸ The buffering hints are set to 10MB or 60s by default.

const connector = new OpenSearchStorageConnector.Builder()
.withScope(this)
.withIdentifier('OpenSearchStorage')
.withCacheStorage(cache)
.withSource(source)
.withDomain(domain)
.withVpc(vpc)
.withIndexName('document-index')
.withBufferingHints({
intervalInSeconds: 60,
sizeInMBs: 50
})
.build();


Index Rotation

You can also configure the rotation period of the OpenSearch index that is applied by AWS Firehose.

ℹī¸ The index rotation is set to NoRotation by default.

import {
OpenSearchStorageConnector,
IndexRotationPeriod
} from '@project-lakechain/opensearch-storage-connector';
const connector = new OpenSearchStorageConnector.Builder()
.withScope(this)
.withIdentifier('OpenSearchStorage')
.withCacheStorage(cache)
.withSource(source)
.withDomain(domain)
.withVpc(vpc)
.withIndexName('document-index')
.withIndexRotationPeriod(IndexRotationPeriod.OneDay)
.build();

Possible values for the index rotation period are NoRotation, OneHour, OneDay, OneWeek, and OneMonth.



ℹī¸ Limits

This middleware forwards each discrete document events to OpenSearch using the Default Document Id Format, which means that Firehose will generate a new unique document ID for each record based on a unique internal identifier.

This identifier remains stable across delivery attempts. However if you resubmit the same document in the pipeline (having the same URI), Firehose will generate a new unique document identifier, resulting in the duplication of the document in the OpenSearch index.

Another limitation of this middleware is that it currently only supports OpenSearch domains, and not OpenSearch Serverless collections.



🏗ī¸ Architecture

This middleware uses AWS Firehose to buffer incoming document events from other middlewares in a pipeline, and uses the AWS Firehose native integration with OpenSearch to index documents.

OpenSearch Storage Connector Architecture



🏷ī¸ Properties


Supported Inputs
Mime TypeDescription
*/*This middleware supports any type of documents.
Supported Outputs

This middleware does not produce any output.

Supported Compute Types
TypeDescription
CPUThis middleware only supports CPU compute.


📖 Examples