Class KafkaMetadataHandler
- java.lang.Object
-
- com.amazonaws.athena.connector.lambda.handlers.MetadataHandler
-
- com.amazonaws.athena.connectors.kafka.KafkaMetadataHandler
-
- All Implemented Interfaces:
com.amazonaws.services.lambda.runtime.RequestStreamHandler
public class KafkaMetadataHandler extends MetadataHandler
-
-
Field Summary
-
Fields inherited from class com.amazonaws.athena.connector.lambda.handlers.MetadataHandler
configOptions, DISABLE_SPILL_ENCRYPTION, KMS_KEY_ID_ENV, SPILL_BUCKET_ENV, SPILL_PREFIX_ENV
-
-
Constructor Summary
Constructors:
KafkaMetadataHandler(Map<String,String> configOptions)
KafkaMetadataHandler(org.apache.kafka.clients.consumer.Consumer<String,String> kafkaConsumer, Map<String,String> configOptions)
-
Method Summary
GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest request)
  Creates splits for each partition.
GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableRequest getTableRequest)
  Creates a new GetTableResponse object.
ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, ListSchemasRequest listSchemasRequest)
  Lists the schema name, which is set to default.
ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTablesRequest federationListTablesRequest)
  Lists all the tables.
void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker)
  Since the Kafka partition is not part of the topic schema or the topic message data, this method is intentionally not implemented.
List<TopicPartitionPiece> pieceTopicPartition(long startOffset, long endOffset)
  Splits a topic partition into smaller pieces and calculates the start and end offsets of each piece.
-
Methods inherited from class com.amazonaws.athena.connector.lambda.handlers.MetadataHandler
doGetDataSourceCapabilities, doGetQueryPassthroughSchema, doGetTableLayout, doHandleRequest, doPing, enhancePartitionSchema, getSecret, handleRequest, makeEncryptionKey, makeSpillLocation, onPing, resolveSecrets
-
-
-
-
Method Detail
-
doListSchemaNames
public ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, ListSchemasRequest listSchemasRequest)
Lists the schema name, which is set to default.
- Specified by:
doListSchemaNames in class MetadataHandler
- Parameters:
blockAllocator - instance of BlockAllocator
listSchemasRequest - instance of ListSchemasRequest
- Returns:
ListSchemasResponse
-
doListTables
public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTablesRequest federationListTablesRequest)
Lists all the tables. It pulls all the schema names from the Glue registry.
- Specified by:
doListTables in class MetadataHandler
- Parameters:
blockAllocator - instance of BlockAllocator
federationListTablesRequest - instance of ListTablesRequest
- Returns:
ListTablesResponse
-
doGetTable
public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableRequest getTableRequest) throws Exception
Creates a new GetTableResponse object. It pulls the topic schema from the Glue registry and converts it into an Arrow schema.
- Specified by:
doGetTable in class MetadataHandler
- Parameters:
blockAllocator - instance of BlockAllocator
getTableRequest - instance of GetTableRequest
- Returns:
GetTableResponse
- Throws:
Exception
-
getPartitions
public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker)
Since the Kafka partition is not part of the topic schema, nor part of the topic message data, this method is intentionally not implemented. No physical schema field was used to create the topic partitions, therefore we cannot add any partition information to the GetTableResponse (in the previous lifecycle method, doGetTable). Because the topic schema carries no partition information, the getPartitions method will not be invoked. Note that even if we added fields for the topic partitions, those fields would have to be added to the table schema as well, forcing the spiller to write meaningless data for the partition column: each record received from the Kafka topic would carry no such column, while the schema would contain the extra partition field.
- Specified by:
getPartitions in class MetadataHandler
- Parameters:
blockWriter - instance of BlockWriter
request - instance of GetTableLayoutRequest
queryStatusChecker - instance of QueryStatusChecker
-
doGetSplits
public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest request) throws Exception
Creates splits for each partition. Each topic partition is further divided into smaller pieces to increase the number of splits and gain more parallelism. The split metadata carries the topic name and partition key, as well as the start and end offsets of each partition piece. This information is used in the RecordHandler to initiate the Kafka consumer.
- Specified by:
doGetSplits in class MetadataHandler
- Parameters:
allocator - instance of BlockAllocator
request - instance of GetSplitsRequest
- Returns:
GetSplitsResponse
- Throws:
Exception
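As a rough sketch of the idea described above, the loop below turns one topic partition's offset range into per-piece property maps. This is not the connector's implementation: the property key names, the piece size, and the use of plain maps instead of Athena Split objects are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaSplitSketch {
    // Assumed number of records per piece; the real connector derives its piece size internally.
    static final long PIECE_SIZE = 10_000L;

    // For one topic partition, emit one property map per offset piece.
    // The real connector packs equivalent metadata into Athena Split objects;
    // the key names below are hypothetical stand-ins.
    static List<Map<String, String>> splitsFor(String topic, int partition,
                                               long startOffset, long endOffset) {
        List<Map<String, String>> splits = new ArrayList<>();
        for (long start = startOffset; start < endOffset; start += PIECE_SIZE) {
            long end = Math.min(start + PIECE_SIZE, endOffset);
            Map<String, String> props = new HashMap<>();
            props.put("topic", topic);
            props.put("partition", String.valueOf(partition));
            props.put("start_offset", String.valueOf(start));
            props.put("end_offset", String.valueOf(end));
            splits.add(props);
        }
        return splits;
    }
}
```

Each map holds exactly what a RecordHandler would need to seek a consumer to the right offset range and stop at the piece boundary.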
-
pieceTopicPartition
public List<TopicPartitionPiece> pieceTopicPartition(long startOffset, long endOffset)
Splits a topic partition into smaller pieces and calculates the start and end offsets of each piece.
- Parameters:
startOffset - the first offset of the topic partition
endOffset - the last offset of the topic partition
- Returns:
List<TopicPartitionPiece>
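The offset arithmetic behind such a method can be sketched as below. The `Piece` class is a hypothetical stand-in for TopicPartitionPiece, and the fixed maximum piece size is an assumed constant; the real connector defines its own types and sizing.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicPartitionPiecer {
    // Hypothetical stand-in for TopicPartitionPiece: start and end offsets of one piece.
    static final class Piece {
        final long startOffset;
        final long endOffset;
        Piece(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Assumed maximum records per piece; the real connector chooses its own size.
    static final long MAX_RECORDS_PER_PIECE = 10_000L;

    // Walk the partition's offset range and cut it into contiguous,
    // non-overlapping pieces of at most MAX_RECORDS_PER_PIECE records;
    // the final piece absorbs whatever remainder is left.
    static List<Piece> pieceTopicPartition(long startOffset, long endOffset) {
        List<Piece> pieces = new ArrayList<>();
        for (long start = startOffset; start < endOffset; start += MAX_RECORDS_PER_PIECE) {
            pieces.add(new Piece(start, Math.min(start + MAX_RECORDS_PER_PIECE, endOffset)));
        }
        return pieces;
    }
}
```

For example, an offset range of 500 to 21,000 yields three pieces, the last one shorter than the others.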
-
-