From 62fa3aa11bc695eb23c5e718c4e3a80f24f4ea23 Mon Sep 17 00:00:00 2001 From: Dhawal Kapil Date: Wed, 22 Aug 2018 21:43:17 +0530 Subject: [PATCH] BAEL-1790 Spring Integration Java DSL Tutorlal --- .../baeldung/dsl/JavaDSLFileCopyConfig.java | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 spring-integration/src/main/java/com/baeldung/dsl/JavaDSLFileCopyConfig.java diff --git a/spring-integration/src/main/java/com/baeldung/dsl/JavaDSLFileCopyConfig.java b/spring-integration/src/main/java/com/baeldung/dsl/JavaDSLFileCopyConfig.java new file mode 100644 index 0000000000..7e91345f04 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/dsl/JavaDSLFileCopyConfig.java @@ -0,0 +1,146 @@ +package com.baeldung.dsl; + +import java.io.File; +import java.util.Scanner; +import java.util.concurrent.TimeUnit; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.channel.PriorityChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.core.GenericSelector; +import org.springframework.integration.core.MessageSource; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.file.FileReadingMessageSource; +import org.springframework.integration.file.FileWritingMessageHandler; +import org.springframework.messaging.MessageHandler; + +/** + * JavaDSLFileCopyConfig contains various Integration Flows created from various spring integration components. + * Activate only one flow at a time by un-commenting @Bean annotation from IntegrationFlow beans. + *

+ * Different flows are :
+ * - {@link #fileMover()} - default app - activated
+ * - {@link #fileMoverWithLambda()} - app with file writing expressions as lambda
+ * - {@link #fileMoverWithPriorityChannel()} - app with priority channel
+ * - {@link #fileReader()}, {@link #fileWriter()}, {@link #anotherFileWriter()} - app with bridge + */ +@Configuration +@EnableIntegration +@IntegrationComponentScan +public class JavaDSLFileCopyConfig { + + public static final String INPUT_DIR = "source"; + public static final String OUTPUT_DIR = "target"; + public static final String OUTPUT_DIR2 = "target2"; + + @Bean + public MessageSource sourceDirectory() { + FileReadingMessageSource messageSource = new FileReadingMessageSource(); + messageSource.setDirectory(new File(INPUT_DIR)); + return messageSource; + } + + @Bean + public GenericSelector onlyJpgs() { + return new GenericSelector() { + + @Override + public boolean accept(File source) { + return source.getName() + .endsWith(".jpg"); + } + }; + } + + @Bean + public MessageHandler targetDirectory() { + FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR)); + handler.setExpectReply(false); // end of pipeline, reply not needed + return handler; + } + + @Bean + public IntegrationFlow fileMover() { + return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000))) + .filter(onlyJpgs()) + .handle(targetDirectory()) + .get(); + } + + // @Bean + public IntegrationFlow fileMoverWithLambda() { + return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000))) + .filter(message -> ((File) message).getName() + .endsWith(".jpg")) + .handle(targetDirectory()) + .get(); + } + + @Bean + public PriorityChannel alphabetically() { + return new PriorityChannel(1000, (left, right) -> ((File) left.getPayload()).getName() + .compareTo(((File) right.getPayload()).getName())); + } + + // @Bean + public IntegrationFlow fileMoverWithPriorityChannel() { + return IntegrationFlows.from(sourceDirectory()) + .filter(onlyJpgs()) + .channel("alphabetically") + .handle(targetDirectory()) + .get(); + } + + @Bean + public MessageHandler anotherTargetDirectory() { + FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR2)); + handler.setExpectReply(false); // end of pipeline, reply not needed + return handler; + } + + // @Bean + public IntegrationFlow fileReader() { + return IntegrationFlows.from(sourceDirectory()) + .filter(onlyJpgs()) + .channel("holdingTank") + .get(); + } + + // @Bean + public IntegrationFlow fileWriter() { + return IntegrationFlows.from("holdingTank") + .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20))) + .handle(targetDirectory()) + .get(); + } + + // @Bean + public IntegrationFlow anotherFileWriter() { + return IntegrationFlows.from("holdingTank") + .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10))) + .handle(anotherTargetDirectory()) + .get(); + } + + public static void main(final String... args) { + final AbstractApplicationContext context = new AnnotationConfigApplicationContext(JavaDSLFileCopyConfig.class); + context.registerShutdownHook(); + final Scanner scanner = new Scanner(System.in); + System.out.print("Please enter a string and press : "); + while (true) { + final String input = scanner.nextLine(); + if ("q".equals(input.trim())) { + context.close(); + scanner.close(); + break; + } + } + System.exit(0); + } +}