Initial commit

Amrut Prabhu
2019-02-13 15:59:43 +01:00
commit 9cf949e537
10 changed files with 522 additions and 0 deletions

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
.idea/*
*.iml

30
pom.xml Normal file

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amrut.prabhu</groupId>
    <artifactId>kafka-workouts</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>

55
src/main/java/com/amrut/kafka/consumer/Consumer.java Normal file

@@ -0,0 +1,55 @@
package com.amrut.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    public static Logger logger = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) {
        String MY_GROUP_ID = "My_App";
        String BOOT_STRAP_SERVER = "127.0.0.1:9092";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_SERVER);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, MY_GROUP_ID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
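        // "earliest" makes a new consumer group start from the beginning of the topic
        // when it has no committed offsets yet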
        //create kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //subscribe to the topic
        consumer.subscribe(Collections.singleton("amrut"));

        //read records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                logger.info("Record ---------------------------------------------------");
                logger.info("Partition:-" + record.partition());
                logger.info("Offset:-" + record.offset());
                logger.info("Key:-" + record.key());
                logger.info("Value:-" + record.value());
            }
        }
    }
}

62
src/main/java/com/amrut/kafka/consumer/ConsumerAssignAndSeek.java Normal file

@@ -0,0 +1,62 @@
package com.amrut.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerAssignAndSeek {

    public static Logger logger = LoggerFactory.getLogger(ConsumerAssignAndSeek.class);

    public static void main(String[] args) {
        String BOOT_STRAP_SERVER = "127.0.0.1:9092";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_SERVER);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //create kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //assign a specific partition of the topic
        TopicPartition topicPartition = new TopicPartition("amrut", 0);
        consumer.assign(Arrays.asList(topicPartition));

        //seek to an offset
        long offset = 5L;
        consumer.seek(topicPartition, offset);
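        // seek() only sets the fetch position; records are actually fetched by the next poll()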
        //poll a few times, reading records from offset 5 onwards
        for (int recordsRead = 0; recordsRead < 4; recordsRead++) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                logger.info("Record ---------------------------------------------------");
                logger.info("Partition:-" + record.partition());
                logger.info("Offset:-" + record.offset());
                logger.info("Key:-" + record.key());
                logger.info("Value:-" + record.value());
            }
        }
    }
}

115
src/main/java/com/amrut/kafka/consumer/ConsumerGroupWithThreads.java Normal file

@@ -0,0 +1,115 @@
package com.amrut.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerGroupWithThreads {

    public static Logger logger = LoggerFactory.getLogger(ConsumerGroupWithThreads.class);

    public static void main(String[] args) {
        new ConsumerGroupWithThreads().run();
    }

    public void run() {
        String MY_GROUP_ID = "My_App_2";
        String BOOT_STRAP_SERVER = "127.0.0.1:9092";

        ConsumerThread consumerThread = new ConsumerThread(BOOT_STRAP_SERVER, MY_GROUP_ID);
        Thread thread = new Thread(consumerThread);
        thread.start();
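        // on JVM shutdown (e.g. Ctrl+C), wake the consumer up so its poll() aborts,
        // then wait for the consumer thread to close cleanly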
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumerThread.shutdown();
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            logger.error("thread is interrupted");
        }
        logger.info("Main is exiting");
    }

    class ConsumerThread implements Runnable {

        private Logger logger = LoggerFactory.getLogger(ConsumerThread.class);
        private final String MY_GROUP_ID;
        private final String BOOT_STRAP_SERVER;
        private KafkaConsumer<String, String> consumer;

        public ConsumerThread(String BOOT_STRAP_SERVER,
                              String MY_GROUP_ID) {
            this.BOOT_STRAP_SERVER = BOOT_STRAP_SERVER;
            this.MY_GROUP_ID = MY_GROUP_ID;
        }

        @Override
        public void run() {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_SERVER);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, MY_GROUP_ID);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            //create kafka consumer
            consumer = new KafkaConsumer<String, String>(properties);

            //subscribe to the topic
            consumer.subscribe(Collections.singleton("amrut"));

            try {
                //read records
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        logger.info("Record ---------------------------------------------------");
                        logger.info("Partition:-" + record.partition());
                        logger.info("Offset:-" + record.offset());
                        logger.info("Key:-" + record.key());
                        logger.info("Value:-" + record.value());
                    }
                }
            } catch (WakeupException e) {
                logger.error("Wakeup exception for shutdown");
            } finally {
                consumer.close();
                logger.info("consumer closed");
            }
        }

        public void shutdown() {
            // causes the poll function to throw a wakeup exception.
            consumer.wakeup();
        }
    }
}

65
src/main/java/com/amrut/kafka/consumer/ConsumerWithManualCommit.java Normal file

@@ -0,0 +1,65 @@
package com.amrut.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerWithManualCommit {

    public static Logger logger = LoggerFactory.getLogger(ConsumerWithManualCommit.class);

    public static void main(String[] args) {
        String MY_GROUP_ID = "My_App";
        String BOOT_STRAP_SERVER = "127.0.0.1:9092";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_SERVER);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, MY_GROUP_ID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // properties for manual commit of offsets
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
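        // with at most 5 records per poll, each commitSync() below marks a small,
        // fully processed batch as consumed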
        //create kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //subscribe to the topic
        consumer.subscribe(Collections.singleton("amrut"));

        //read records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                logger.info("Record ---------------------------------------------------");
                logger.info("Partition:-" + record.partition());
                logger.info("Offset:-" + record.offset());
                logger.info("Key:-" + record.key());
                logger.info("Value:-" + record.value());
            }

            //committing offsets
            logger.info("Committing offsets synchronously");
            consumer.commitSync();
        }
    }
}

34
src/main/java/com/amrut/kafka/producer/Producer.java Normal file

@@ -0,0 +1,34 @@
package com.amrut.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {

    public static void main(String[] args) {
        //Kafka properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // kafka record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("amrut", "message");

        // send record -- async
        producer.send(record);
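        // send() only hands the record to the producer's internal buffer; without the
        // flush()/close() below, the JVM could exit before the record is actually delivered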
        //flush data
        producer.flush();

        //close producer
        producer.close();
    }
}

47
src/main/java/com/amrut/kafka/producer/ProducerWithCallbackAndKeys.java Normal file

@@ -0,0 +1,47 @@
package com.amrut.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerWithCallbackAndKeys {

    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(ProducerWithCallbackAndKeys.class);

        //Kafka properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // kafka record with a key
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("amrut", "key", "value");
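        // with the default partitioner, records carrying the same key always land on the same partition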
        // send record -- async
        producer.send(record, (rcdMetaData, exp) -> {
            // called on every successful send, or with an exception when the send fails
            if (exp == null) {
                log.info("Offset:- " + rcdMetaData.offset());
                log.info("Partition:- " + rcdMetaData.partition());
                log.info("Timestamp:- " + rcdMetaData.timestamp());
            } else {
                log.error("error", exp);
            }
        });

        //flush data
        producer.flush();

        //close producer
        producer.close();
    }
}

58
src/main/java/com/amrut/kafka/producer/ProducerWithHighThroughput.java Normal file

@@ -0,0 +1,58 @@
package com.amrut.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerWithHighThroughput {

    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(ProducerWithHighThroughput.class);

        //Kafka properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Safe producer properties
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // safe for Kafka >= 2.0; use 1 on older brokers
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

        // high throughput properties
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10"); // 10 ms
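        // records wait up to linger.ms so larger, snappy-compressed batches are sent,
        // trading a few milliseconds of latency for higher throughput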
        // kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // kafka record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("amrut", "key", "value");

        // send record -- async
        producer.send(record, (rcdMetaData, exp) -> {
            // called on every successful send, or with an exception when the send fails
            if (exp == null) {
                log.info("Offset:- " + rcdMetaData.offset());
                log.info("Partition:- " + rcdMetaData.partition());
                log.info("Timestamp:- " + rcdMetaData.timestamp());
            } else {
                log.error("error", exp);
            }
        });

        //flush data
        producer.flush();

        //close producer
        producer.close();
    }
}

54
src/main/java/com/amrut/kafka/producer/ProducerWithSafety.java Normal file

@@ -0,0 +1,54 @@
package com.amrut.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerWithSafety {

    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(ProducerWithSafety.class);

        //Kafka properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Safe producer properties
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // safe for Kafka >= 2.0; use 1 on older brokers
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
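        // idempotence ensures retries cannot write duplicates, and acks=all waits for
        // all in-sync replicas before a send is considered successful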
        // kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // kafka record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("amrut", "key", "value");

        // send record -- async
        producer.send(record, (rcdMetaData, exp) -> {
            // called on every successful send, or with an exception when the send fails
            if (exp == null) {
                log.info("Offset:- " + rcdMetaData.offset());
                log.info("Partition:- " + rcdMetaData.partition());
                log.info("Timestamp:- " + rcdMetaData.timestamp());
            } else {
                log.error("error", exp);
            }
        });

        //flush data
        producer.flush();

        //close producer
        producer.close();
    }
}