diff --git a/pom.xml b/pom.xml index b1fdef2..9c69c77 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,7 @@ kafka-native-serialization function-based-stream-app-samples kafka-security-samples + routing-samples diff --git a/routing-samples/message-routing-callback/README.adoc b/routing-samples/message-routing-callback/README.adoc new file mode 100644 index 0000000..0c02262 --- /dev/null +++ b/routing-samples/message-routing-callback/README.adoc @@ -0,0 +1,24 @@ +## Spring Cloud Stream Message Routing Sample + +Demo for MessageRoutingCallback. + +### Running the app + +Make sure that you have Kafka running. + +Run the main class - `MessageRoutingApplication`. You should see the following in the console output. + +``` +Menu(id=null, name=null) +``` + +and + +``` +Order(id=null, price=null) +``` + +The `CustoMessageRoutingCallback` examines each method sent to the routing function (`functionRouter`) and then route to the appropriate function. +The main class sends two messages - one for `Menu` and another for `Order` - to the `functionRouter-in-0` topic which is intercepted by the routing callback for invoking the correct function. + + diff --git a/routing-samples/message-routing-callback/pom.xml b/routing-samples/message-routing-callback/pom.xml new file mode 100644 index 0000000..1d44691 --- /dev/null +++ b/routing-samples/message-routing-callback/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.5.2 + + + message-routing-callback + 0.0.1-SNAPSHOT + message-routing-callback + Demo for spring cloud stream app that have many consumers pointing to to the same topic + + 2020.0.3 + + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-stream + test + test-binder + test-jar + + + org.springframework.kafka + spring-kafka-test + test + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/routing-samples/message-routing-callback/src/main/java/demo/CustomMessageRoutingCallback.java b/routing-samples/message-routing-callback/src/main/java/demo/CustomMessageRoutingCallback.java new file mode 100644 index 0000000..b76b4d3 --- /dev/null +++ b/routing-samples/message-routing-callback/src/main/java/demo/CustomMessageRoutingCallback.java @@ -0,0 +1,28 @@ +package demo; + +import org.springframework.cloud.function.context.MessageRoutingCallback; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; + +import java.util.Arrays; +import java.util.Optional; + +/** + * A custom implementation of {@link MessageRoutingCallback} that will route each message to its corresponding binding channel. + * This is the equivalence of {@link StreamListener#condition()} + */ +public class CustomMessageRoutingCallback implements MessageRoutingCallback { + + public static final String EVENT_TYPE = "event_type"; + + @Override + public String functionDefinition(Message message) { + return Optional.of(message.getHeaders()) + .filter(headers -> headers.containsKey(EVENT_TYPE)) + .map(messageHeaders -> messageHeaders.get(EVENT_TYPE)) + .map(eventType -> EventTypeToBinding.valueOf((String)eventType)) + .map(EventTypeToBinding::getBinding) + .orElseThrow(() -> new IllegalStateException("event_type was not recognized !! supported values are " + Arrays.toString(EventTypeToBinding.values()))); + } +} + diff --git a/routing-samples/message-routing-callback/src/main/java/demo/EventTypeToBinding.java b/routing-samples/message-routing-callback/src/main/java/demo/EventTypeToBinding.java new file mode 100644 index 0000000..4e2b5c8 --- /dev/null +++ b/routing-samples/message-routing-callback/src/main/java/demo/EventTypeToBinding.java @@ -0,0 +1,13 @@ +package demo; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public enum EventTypeToBinding { + ORDER_EVENT("orderConsumer"), + MENU_EVENT("menuConsumer"); + + private final String binding; +} diff --git a/routing-samples/message-routing-callback/src/main/java/demo/MessageRoutingApplication.java b/routing-samples/message-routing-callback/src/main/java/demo/MessageRoutingApplication.java new file mode 100644 index 0000000..4ba9460 --- /dev/null +++ b/routing-samples/message-routing-callback/src/main/java/demo/MessageRoutingApplication.java @@ -0,0 +1,100 @@ +package demo; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.function.context.MessageRoutingCallback; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +@Slf4j +@SpringBootApplication +public class MessageRoutingApplication { + + public static void main(String[] args) { + final ConfigurableApplicationContext run = SpringApplication.run(MessageRoutingApplication.class, args); + final KafkaTemplate kafkaTemplate = run.getBean(KafkaTemplate.class); + kafkaTemplate.setDefaultTopic("functionRouter-in-0"); + + Message menuMessage = + MessageBuilder.withPayload(new Menu()) + .setHeader("event_type", "MENU_EVENT").build(); + + kafkaTemplate.send(menuMessage); + + Message orderMessage = + MessageBuilder.withPayload(new Order()) + .setHeader("event_type", "ORDER_EVENT").build(); + + kafkaTemplate.send(orderMessage); + + System.out.println(); + } + + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public MessageRoutingCallback messageRoutingCallback() { + return new CustomMessageRoutingCallback(); + } + + @Bean + public Consumer orderConsumer(){ + return order -> log.info(order.toString()); + } + + @Bean + public Consumer menuConsumer(){ + return menu -> log.info(menu.toString()); + } + + @Bean + public Supplier> supply() { + return () -> MessageBuilder.withPayload(new Menu()).setHeader("event_type", "MENU_EVENT").build(); + } +} + +@Data +class Order { + private String id; + private Double price; +} + +@Data +class Menu { + private String id; + private String name; +} diff --git a/routing-samples/message-routing-callback/src/main/resources/application.yml b/routing-samples/message-routing-callback/src/main/resources/application.yml new file mode 100644 index 0000000..163be55 --- /dev/null +++ b/routing-samples/message-routing-callback/src/main/resources/application.yml @@ -0,0 +1,6 @@ +spring: + cloud: + stream: + function: + routing: + enabled: true diff --git a/routing-samples/pom.xml b/routing-samples/pom.xml new file mode 100644 index 0000000..395ca01 --- /dev/null +++ b/routing-samples/pom.xml @@ -0,0 +1,15 @@ + + + 4.0.0 + io.spring.cloud.stream.sample + routing-samples + 0.0.1-SNAPSHOT + pom + routing-samples + Collection of Spring Cloud Stream Routing Samples + + + message-routing-callback + + +