BAEL-5636 Move the files from patterns/ into patterns-modules/ (#12742)

This commit is contained in:
bipster
2022-09-20 00:49:57 -04:00
committed by GitHub
parent ae78511925
commit 7c63f5cc17
7 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,48 @@
package com.baeldung.seda.apachecamel;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
public class WordCountRoute extends RouteBuilder {
public static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
public static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
public static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
public static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
public static final String returnResponse = "mock:result";
@Override
public void configure() throws Exception {
from(receiveTextUri).to(splitWordsUri);
from(splitWordsUri).transform(ExpressionBuilder.bodyExpression(s -> s.toString()
.split(" ")))
.to(toLowerCaseUri);
from(toLowerCaseUri).split(body(), new StringListAggregationStrategy())
.transform(ExpressionBuilder.bodyExpression(body -> body.toString()
.toLowerCase()))
.end()
.to(countWordsUri);
from(countWordsUri).transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
.to(returnResponse);
}
}
class StringListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
return exchange.getIn()
.getBody(String.class);
}
}

View File

@@ -0,0 +1,56 @@
package com.baeldung.seda.springintegration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
@Configuration
public class ChannelConfiguration {
private final TaskExecutor receiveTextChannelThreadPool;
private final TaskExecutor splitWordsChannelThreadPool;
private final TaskExecutor toLowerCaseChannelThreadPool;
private final TaskExecutor countWordsChannelThreadPool;
private final TaskExecutor returnResponseChannelThreadPool;
public ChannelConfiguration(TaskExecutor receiveTextChannelThreadPool, TaskExecutor splitWordsChannelThreadPool, TaskExecutor toLowerCaseChannelThreadPool, TaskExecutor countWordsChannelThreadPool, TaskExecutor returnResponseChannelThreadPool) {
this.receiveTextChannelThreadPool = receiveTextChannelThreadPool;
this.splitWordsChannelThreadPool = splitWordsChannelThreadPool;
this.toLowerCaseChannelThreadPool = toLowerCaseChannelThreadPool;
this.countWordsChannelThreadPool = countWordsChannelThreadPool;
this.returnResponseChannelThreadPool = returnResponseChannelThreadPool;
}
@Bean(name = "receiveTextChannel")
public MessageChannel getReceiveTextChannel() {
return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
.get();
}
@Bean(name = "splitWordsChannel")
public MessageChannel getSplitWordsChannel() {
return MessageChannels.executor("split-words", splitWordsChannelThreadPool)
.get();
}
@Bean(name = "toLowerCaseChannel")
public MessageChannel getToLowerCaseChannel() {
return MessageChannels.executor("to-lower-case", toLowerCaseChannelThreadPool)
.get();
}
@Bean(name = "countWordsChannel")
public MessageChannel getCountWordsChannel() {
return MessageChannels.executor("count-words", countWordsChannelThreadPool)
.get();
}
@Bean(name = "returnResponseChannel")
public MessageChannel getReturnResponseChannel() {
return MessageChannels.executor("return-response", returnResponseChannelThreadPool)
.get();
}
}

View File

@@ -0,0 +1,81 @@
package com.baeldung.seda.springintegration;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
private final MessageChannel receiveTextChannel;
private final MessageChannel splitWordsChannel;
private final MessageChannel toLowerCaseChannel;
private final MessageChannel countWordsChannel;
private final MessageChannel returnResponseChannel;
private final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
private final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
private final Function<String, String> toLowerCase = String::toLowerCase;
private final MessageGroupProcessor buildMessageWithListPayload = messageGroup -> MessageBuilder.withPayload(messageGroup.streamMessages()
.map(Message::getPayload)
.collect(Collectors.toList()))
.build();
private final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();
public IntegrationConfiguration(MessageChannel receiveTextChannel, MessageChannel splitWordsChannel, MessageChannel toLowerCaseChannel, MessageChannel countWordsChannel, MessageChannel returnResponseChannel) {
this.receiveTextChannel = receiveTextChannel;
this.splitWordsChannel = splitWordsChannel;
this.toLowerCaseChannel = toLowerCaseChannel;
this.countWordsChannel = countWordsChannel;
this.returnResponseChannel = returnResponseChannel;
}
@Bean
public IntegrationFlow receiveText() {
return IntegrationFlows.from(receiveTextChannel)
.channel(splitWordsChannel)
.get();
}
@Bean
public IntegrationFlow splitWords() {
return IntegrationFlows.from(splitWordsChannel)
.transform(splitWordsFunction)
.channel(toLowerCaseChannel)
.get();
}
@Bean
public IntegrationFlow toLowerCase() {
return IntegrationFlows.from(toLowerCaseChannel)
.split()
.transform(toLowerCase)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
.outputProcessor(buildMessageWithListPayload))
.channel(countWordsChannel)
.get();
}
@Bean
public IntegrationFlow countWords() {
return IntegrationFlows.from(countWordsChannel)
.transform(convertArrayListToCountMap)
.channel(returnResponseChannel)
.get();
}
}

View File

@@ -0,0 +1,61 @@
package com.baeldung.seda.springintegration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class TaskExecutorConfiguration {
@Bean("receiveTextChannelThreadPool")
public TaskExecutor receiveTextChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("receive-text-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("splitWordsChannelThreadPool")
public TaskExecutor splitWordsChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("split-words-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("toLowerCaseChannelThreadPool")
public TaskExecutor toLowerCaseChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("tto-lower-case-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("countWordsChannelThreadPool")
public TaskExecutor countWordsChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("count-words-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("returnResponseChannelThreadPool")
public TaskExecutor returnResponseChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("return-response-channel-thread-pool");
executor.initialize();
return executor;
}
}

View File

@@ -0,0 +1,12 @@
package com.baeldung.seda.springintegration;
import java.util.Map;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
public Map<String, Long> countWords(String test);
}

View File

@@ -0,0 +1,48 @@
package com.baeldung.seda.apachecamel;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
import com.baeldung.seda.apachecamel.WordCountRoute;
public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
@Test
public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap() throws InterruptedException {
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");
assertMockEndpointsSatisfied();
}
@Test
public void givenTextWithDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap() throws InterruptedException {
Map<String, Long> expected = new HashMap<>();
expected.put("the", 3L);
expected.put("dog", 1L);
expected.put("chased", 1L);
expected.put("rabbit", 1L);
expected.put("into", 1L);
expected.put("jungle", 1L);
getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
template.sendBody(WordCountRoute.receiveTextUri, "the dog chased the rabbit into the jungle");
assertMockEndpointsSatisfied();
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
RoutesBuilder wordCountRoute = new WordCountRoute();
return wordCountRoute;
}
}

View File

@@ -0,0 +1,50 @@
package com.baeldung.seda.springintegration;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.support.AnnotationConfigContextLoader;
@ExtendWith(SpringExtension.class)
@ContextConfiguration(loader = AnnotationConfigContextLoader.class, classes = { TaskExecutorConfiguration.class, ChannelConfiguration.class, IntegrationConfiguration.class })
@EnableIntegration
@IntegrationComponentScan(basePackages = { "com.baeldung.seda.springintegration" })
public class SpringIntegrationSedaIntegrationTest {
@Autowired
TestGateway testGateway;
@Test
void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
Map<String, Long> actual = testGateway.countWords("My name is Hesam");
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
org.junit.Assert.assertEquals(expected, actual);
}
@Test
void givenTextWithDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
Map<String, Long> actual = testGateway.countWords("the dog chased the rabbit into the jungle");
Map<String, Long> expected = new HashMap<>();
expected.put("the", 3L);
expected.put("dog", 1L);
expected.put("chased", 1L);
expected.put("rabbit", 1L);
expected.put("into", 1L);
expected.put("jungle", 1L);
org.junit.Assert.assertEquals(expected, actual);
}
}