New sample for message routing callback

This commit is contained in:
Soby Chacko
2021-07-26 16:30:56 -04:00
parent 786bb72248
commit 1aeaba8e9a
8 changed files with 267 additions and 0 deletions

View File

@@ -34,6 +34,7 @@
<module>kafka-native-serialization</module>
<module>function-based-stream-app-samples</module>
<module>kafka-security-samples</module>
<module>routing-samples</module>
</modules>
<properties>

View File

@@ -0,0 +1,24 @@
## Spring Cloud Stream Message Routing Sample
Demo for MessageRoutingCallback.
### Running the app
Make sure that you have Kafka running.
Run the main class - `MessageRoutingApplication`. You should see the following in the console output.
```
Menu(id=null, name=null)
```
and
```
Order(id=null, price=null)
```
The `CustoMessageRoutingCallback` examines each method sent to the routing function (`functionRouter`) and then route to the appropriate function.
The main class sends two messages - one for `Menu` and another for `Order` - to the `functionRouter-in-0` topic which is intercepted by the routing callback for invoking the correct function.

View File

@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<artifactId>message-routing-callback</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>message-routing-callback</name>
<description>Demo for spring cloud stream app that have many consumers pointing to to the same topic</description>
<properties>
<spring-cloud.version>2020.0.3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,28 @@
package demo;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import java.util.Arrays;
import java.util.Optional;
/**
* A custom implementation of {@link MessageRoutingCallback} that will route each message to its corresponding binding channel.
* This is the equivalence of {@link StreamListener#condition()}
*/
public class CustomMessageRoutingCallback implements MessageRoutingCallback {
public static final String EVENT_TYPE = "event_type";
@Override
public String functionDefinition(Message<?> message) {
return Optional.of(message.getHeaders())
.filter(headers -> headers.containsKey(EVENT_TYPE))
.map(messageHeaders -> messageHeaders.get(EVENT_TYPE))
.map(eventType -> EventTypeToBinding.valueOf((String)eventType))
.map(EventTypeToBinding::getBinding)
.orElseThrow(() -> new IllegalStateException("event_type was not recognized !! supported values are " + Arrays.toString(EventTypeToBinding.values())));
}
}

View File

@@ -0,0 +1,13 @@
package demo;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@Getter
@RequiredArgsConstructor
public enum EventTypeToBinding {
ORDER_EVENT("orderConsumer"),
MENU_EVENT("menuConsumer");
private final String binding;
}

View File

@@ -0,0 +1,100 @@
package demo;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@SpringBootApplication
public class MessageRoutingApplication {
public static void main(String[] args) {
final ConfigurableApplicationContext run = SpringApplication.run(MessageRoutingApplication.class, args);
final KafkaTemplate kafkaTemplate = run.getBean(KafkaTemplate.class);
kafkaTemplate.setDefaultTopic("functionRouter-in-0");
Message<Menu> menuMessage =
MessageBuilder.withPayload(new Menu())
.setHeader("event_type", "MENU_EVENT").build();
kafkaTemplate.send(menuMessage);
Message<Order> orderMessage =
MessageBuilder.withPayload(new Order())
.setHeader("event_type", "ORDER_EVENT").build();
kafkaTemplate.send(orderMessage);
System.out.println();
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public MessageRoutingCallback messageRoutingCallback() {
return new CustomMessageRoutingCallback();
}
@Bean
public Consumer<Order> orderConsumer(){
return order -> log.info(order.toString());
}
@Bean
public Consumer<Menu> menuConsumer(){
return menu -> log.info(menu.toString());
}
@Bean
public Supplier<Message<Menu>> supply() {
return () -> MessageBuilder.withPayload(new Menu()).setHeader("event_type", "MENU_EVENT").build();
}
}
@Data
class Order {
private String id;
private Double price;
}
@Data
class Menu {
private String id;
private String name;
}

View File

@@ -0,0 +1,6 @@
spring:
cloud:
stream:
function:
routing:
enabled: true

15
routing-samples/pom.xml Normal file
View File

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>routing-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>routing-samples</name>
<description>Collection of Spring Cloud Stream Routing Samples</description>
<modules>
<module>message-routing-callback</module>
</modules>
</project>