Class KafkaUtils
- java.lang.Object
-
- com.amazonaws.athena.connectors.kafka.KafkaUtils
-
public class KafkaUtils extends Object
-
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description protected static Path
copyCertificatesFromS3ToTempFolder(Map<String,String> configOptions)
Downloads the truststore and keystore certificates from S3 to temp directory.static SplitParameters
createSplitParam(Map<String,String> params)
Translates Split parameters as readable pojo format.static org.apache.kafka.clients.consumer.Consumer<String,org.apache.avro.generic.GenericRecord>
getAvroKafkaConsumer(Map<String,String> configOptions)
static org.apache.kafka.clients.consumer.Consumer<String,String>
getKafkaConsumer(Map<String,String> configOptions)
Creates Kafka consumer instance.static org.apache.kafka.clients.consumer.Consumer<String,TopicResultSet>
getKafkaConsumer(org.apache.arrow.vector.types.pojo.Schema schema, Map<String,String> configOptions)
Creates instance of Kafka consumer.static Properties
getKafkaProperties(Map<String,String> configOptions)
Creates the required settings for kafka consumer.static org.apache.kafka.clients.consumer.Consumer<String,com.google.protobuf.DynamicMessage>
getProtobufKafkaConsumer(Map<String,String> configOptions)
protected static Properties
setSaslPlainAuthKafkaProperties(Properties properties, Map<String,String> configOptions)
Creates the required SASL based settings for kafka consumer.protected static Properties
setSaslSslAuthKafkaProperties(Properties properties, Map<String,String> configOptions)
Creates the required SASL based settings for kafka consumer.protected static Properties
setScramPlainTextAuthKafkaProperties(Properties properties, Map<String,String> configOptions)
protected static Properties
setScramSSLAuthKafkaProperties(Properties properties, Map<String,String> configOptions)
Creates the required SASL based settings for kafka consumer.protected static Properties
setSSLAuthKafkaProperties(Properties properties, Map<String,String> configOptions)
Creates the required SSL based settings for kafka consumer.static org.apache.arrow.vector.types.pojo.ArrowType
toArrowType(String dataType)
Converts string data type name to ArrowType.
-
-
-
Method Detail
-
getKafkaConsumer
public static org.apache.kafka.clients.consumer.Consumer<String,String> getKafkaConsumer(Map<String,String> configOptions) throws Exception
Creates Kafka consumer instance.- Returns:
- Throws:
Exception
-
getKafkaConsumer
public static org.apache.kafka.clients.consumer.Consumer<String,TopicResultSet> getKafkaConsumer(org.apache.arrow.vector.types.pojo.Schema schema, Map<String,String> configOptions) throws Exception
Creates instance of Kafka consumer. In the properties we have to specify the Deserializer type, we are supporting only JSON or CSV data from kafka topic. Therefor, to transform topic data into pojo we need to specify Deserializer type to KafkaConsumer. Schema metadata can tell use about the topic data type i.e. dataFormat = json | csv.
-
getAvroKafkaConsumer
public static org.apache.kafka.clients.consumer.Consumer<String,org.apache.avro.generic.GenericRecord> getAvroKafkaConsumer(Map<String,String> configOptions) throws Exception
- Throws:
Exception
-
getProtobufKafkaConsumer
public static org.apache.kafka.clients.consumer.Consumer<String,com.google.protobuf.DynamicMessage> getProtobufKafkaConsumer(Map<String,String> configOptions) throws Exception
- Throws:
Exception
-
getKafkaProperties
public static Properties getKafkaProperties(Map<String,String> configOptions) throws Exception
Creates the required settings for kafka consumer.- Returns:
Properties
- Throws:
Exception
- -Exception
-
setSSLAuthKafkaProperties
protected static Properties setSSLAuthKafkaProperties(Properties properties, Map<String,String> configOptions) throws Exception
Creates the required SSL based settings for kafka consumer.- Parameters:
properties
- - common properties for kafka consumer- Returns:
Properties
- Throws:
Exception
- -Exception
-
setScramSSLAuthKafkaProperties
protected static Properties setScramSSLAuthKafkaProperties(Properties properties, Map<String,String> configOptions) throws Exception
Creates the required SASL based settings for kafka consumer.- Parameters:
properties
- - SASL/SCRAM properties for kafka consumer- Returns:
Properties
- Throws:
Exception
- -Exception
-
setScramPlainTextAuthKafkaProperties
protected static Properties setScramPlainTextAuthKafkaProperties(Properties properties, Map<String,String> configOptions) throws Exception
- Throws:
Exception
-
setSaslPlainAuthKafkaProperties
protected static Properties setSaslPlainAuthKafkaProperties(Properties properties, Map<String,String> configOptions) throws Exception
Creates the required SASL based settings for kafka consumer.- Parameters:
properties
- - SASL/PLAINTEXT properties for kafka consumer- Returns:
Properties
- Throws:
Exception
- -Exception
-
setSaslSslAuthKafkaProperties
protected static Properties setSaslSslAuthKafkaProperties(Properties properties, Map<String,String> configOptions) throws Exception
Creates the required SASL based settings for kafka consumer.- Parameters:
properties
- - SASL/SSL properties for kafka consumer- Returns:
Properties
- Throws:
Exception
- -Exception
-
copyCertificatesFromS3ToTempFolder
protected static Path copyCertificatesFromS3ToTempFolder(Map<String,String> configOptions) throws Exception
Downloads the truststore and keystore certificates from S3 to temp directory.
-
createSplitParam
public static SplitParameters createSplitParam(Map<String,String> params)
Translates Split parameters as readable pojo format.- Parameters:
params
- - the properties for split object- Returns:
SplitParameters
-
toArrowType
public static org.apache.arrow.vector.types.pojo.ArrowType toArrowType(String dataType)
Converts string data type name to ArrowType. After pulling schema from glue schema registry we use this method to convert the data type in glue schema to ArrowType.- Parameters:
dataType
- - the column name of schema- Returns:
ArrowType
-
-