diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/pom.xml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/pom.xml new file mode 100644 index 0000000000..13ad18810e --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/pom.xml @@ -0,0 +1,109 @@ + + 4.0.0 + + + spring-cloud-stream-kafka + spring-cloud-stream-kafka + Simple Spring Cloud Stream + jar + + + org.springframework.boot + spring-boot-starter-parent + 2.1.5.RELEASE + + + + 1.8 + Greenwich.SR1 + + + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + + org.springframework.cloud + spring-cloud-stream-schema + + + + org.springframework.cloud + spring-cloud-stream-test-support + test + + + + io.confluent + kafka-avro-serializer + 4.0.0 + + + + org.apache.avro + avro-compiler + 1.8.2 + + + + org.apache.avro + avro-maven-plugin + 1.8.2 + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + 1.8.2 + + + schemas + generate-sources + + schema + protocol + idl-protocol + + + ${project.basedir}/src/main/resources/ + ${project.basedir}/src/main/java/ + + + + + + + + + + confluent + https://packages.confluent.io/maven/ + + + + diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/AvroKafkaApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/AvroKafkaApplication.java new file mode 100644 index 0000000000..47c060c143 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/AvroKafkaApplication.java @@ -0,0 +1,18 @@ +package com.baeldung; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient; + +@SpringBootApplication +@EnableBinding(Processor.class) +@EnableSchemaRegistryClient +public class AvroKafkaApplication { + + public static void main(String[] args) { + SpringApplication.run(AvroKafkaApplication.class, args); + } + +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/config/SchemRegistryConfig.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/config/SchemRegistryConfig.java new file mode 100644 index 0000000000..38ac94d952 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/config/SchemRegistryConfig.java @@ -0,0 +1,18 @@ +package com.baeldung.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient; +import org.springframework.cloud.stream.schema.client.SchemaRegistryClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SchemRegistryConfig { + + @Bean + public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}") String endPoint) { + ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient(); + client.setEndpoint(endPoint); + return client; + } +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/consumer/AvroConsumer.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/consumer/AvroConsumer.java new file mode 100644 index 0000000000..477946dd73 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/consumer/AvroConsumer.java @@ -0,0 +1,21 @@ +package com.baeldung.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.stereotype.Service; + +import com.baeldung.schema.Employee; + +@Service +public class AvroConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(AvroConsumer.class); + + @StreamListener(Processor.INPUT) + public void consumeEmployeeDetails(Employee employeeDetails) { + LOGGER.info("Let's process employee details: {}", employeeDetails); + } + +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/controller/AvroController.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/controller/AvroController.java new file mode 100644 index 0000000000..b98b27c9fe --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/controller/AvroController.java @@ -0,0 +1,22 @@ +package com.baeldung.controller; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.baeldung.producer.AvroProducer; + +@RestController +public class AvroController { + + @Autowired + private AvroProducer avroProducer; + + @PostMapping("/employees/{id}/{firstName}/{lastName}") + public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, @PathVariable String lastName) { + avroProducer.produceEmployeeDetails(id, firstName, lastName); + return "Sent employee details to consumer"; + } + +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/producer/AvroProducer.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/producer/AvroProducer.java new file mode 100644 index 0000000000..ff7729dd8c --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/java/com/baeldung/producer/AvroProducer.java @@ -0,0 +1,42 @@ +package com.baeldung.producer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import com.baeldung.schema.Employee; +import com.baeldung.schema.EmployeeKey; + +@Service +public class AvroProducer { + + @Autowired + private Processor processor; + + public void produceEmployeeDetails(int empId, String firstName, String lastName) { + + // creating employee details + Employee employee = new Employee(); + employee.setId(empId); + employee.setFirstName(firstName); + employee.setLastName(lastName); + employee.setDepartment("IT"); + employee.setDesignation("Engineer"); + + // creating partition key for kafka topic + EmployeeKey employeeKey = new EmployeeKey(); + employeeKey.setId(empId); + employeeKey.setDepartmentName("IT"); + + Message message = MessageBuilder.withPayload(employee) + .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey) + .build(); + + processor.output() + .send(message); + } + +} diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/application.yaml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/application.yaml new file mode 100644 index 0000000000..2e30c07374 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/application.yaml @@ -0,0 +1,29 @@ +spring: + cloud: + stream: + default: + producer: + useNativeEncoding: true + consumer: + useNativeEncoding: true + bindings: + input: + destination: employee-details + content-type: application/*+avro + group: group-1 + concurrency: 3 + output: + destination: employee-details + content-type: application/*+avro + kafka: + binder: + producer-properties: + key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + schema.registry.url: http://localhost:8081 + consumer-properties: + key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + schema.registry.url: http://localhost:8081 + specific.avro.reader: true + \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/employee-key-schema.avsc b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/employee-key-schema.avsc new file mode 100644 index 0000000000..d18d657e99 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/employee-key-schema.avsc @@ -0,0 +1,14 @@ +{ + "type": "record", + "name": "EmployeeKey", + "namespace": "com.baeldung.schema", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "departmentName", + "type": "string" + }] +} \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/employee-schema.avsc b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/employee-schema.avsc new file mode 100644 index 0000000000..2abf57e4a2 --- /dev/null +++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka/src/main/resources/employee-schema.avsc @@ -0,0 +1,29 @@ +{ + "type": "record", + "name": "Employee", + "namespace": "com.baeldung.schema", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "firstName", + "type": "string" + }, + { + "name": "lastName", + "type": "string" + }, + { + "name": "department", + "type": "string", + "default": "IT " + }, + { + "name": "designation", + "type": "string", + "default": "Software Engineer" + } + ] +} \ No newline at end of file