diff --git a/spring-integration/pom.xml b/spring-integration/pom.xml index 367d25de98..f06cb91ef6 100644 --- a/spring-integration/pom.xml +++ b/spring-integration/pom.xml @@ -68,6 +68,11 @@ spring-integration-security ${spring.version} + + org.springframework.integration + spring-integration-jdbc + ${spring.version} + org.springframework.security @@ -75,6 +80,12 @@ ${spring.version} test + + + com.h2database + h2 + 1.4.197 + @@ -106,7 +117,7 @@ UTF-8 - 5.0.3.RELEASE + 5.0.13.RELEASE 1.1.4.RELEASE 1.4.7 1.1.1 diff --git a/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java b/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java new file mode 100755 index 0000000000..56399c653f --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java @@ -0,0 +1,27 @@ +package com.baeldung.tx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; + +public class ServiceActivator { + + @Autowired + private JdbcTemplate jdbcTemplate; + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + public void checkTestResults(String payload) { + + this.jdbcTemplate.update("insert into STUDENT values(?)", payload); + + if (payload.toLowerCase().startsWith("fail")) { + log.error("Service failure. Test result: {}", payload); + throw new RuntimeException("Service failure."); + } + + log.info("Service success. Test result: {}", payload); + } + +} diff --git a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java new file mode 100644 index 0000000000..7a58cb3125 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java @@ -0,0 +1,155 @@ +package com.baeldung.tx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.core.MessageSource; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.file.FileReadingMessageSource; +import org.springframework.integration.file.filters.SimplePatternFileListFilter; +import org.springframework.integration.file.transformer.FileToStringTransformer; +import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.transaction.*; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.messaging.MessageChannel; +import org.springframework.transaction.interceptor.TransactionInterceptor; + +import javax.sql.DataSource; +import java.io.File; +import java.util.Scanner; + +@Configuration +@EnableIntegration +public class TxIntegrationConfig { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + public final String INPUT_DIR = System.getProperty("java.io.tmpdir") + "/tx/"; + public final String FILE_PATTERN = "*.txt"; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private DataSource dataSource; + + @Autowired + private TransactionSynchronizationFactory transactionSynchronizationFactory; + + @Autowired + DataSourceTransactionManager txManager; + + @Bean + public MessageChannel inputChannel() { + return new DirectChannel(); + } + + @Bean + @InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata")) + public MessageSource fileReadingMessageSource() { + FileReadingMessageSource sourceReader = new FileReadingMessageSource(); + sourceReader.setDirectory(new File(INPUT_DIR)); + sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN)); + return sourceReader; + } + + @Bean + public PollerMetadata pollerMetadata() { + return Pollers.fixedDelay(5000) + .advice(transactionInterceptor()) + .transactionSynchronizationFactory(transactionSynchronizationFactory) + .get(); + } + + private TransactionInterceptor transactionInterceptor() { + return new TransactionInterceptorBuilder().transactionManager(txManager).build(); + } + + @Bean + public TransactionSynchronizationFactory transactionSynchronizationFactory() { + ExpressionEvaluatingTransactionSynchronizationProcessor processor = + new ExpressionEvaluatingTransactionSynchronizationProcessor(); + + SpelExpressionParser spelParser = new SpelExpressionParser(); + processor.setAfterCommitExpression( + spelParser.parseExpression( + "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); + processor.setAfterRollbackExpression( + spelParser.parseExpression( + "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); + + return new DefaultTransactionSynchronizationFactory(processor); + } + + @Bean + @Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel") + public FileToStringTransformer fileToStringTransformer() { + return new FileToStringTransformer(); + } + + @ServiceActivator(inputChannel = "toServiceChannel") + public void serviceActivator(String payload) { + + jdbcTemplate.update("insert into STUDENT values(?)", payload); + + if (payload.toLowerCase().startsWith("fail")) { + log.error("Service failure. Test result: {} ", payload); + throw new RuntimeException("Service failure."); + } + + log.info("Service success. Test result: {}", payload); + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.H2) + .addScript("classpath:table.sql") + .build(); + } + + @Bean + public DataSourceTransactionManager txManager() { + return new DataSourceTransactionManager(dataSource); + } + + public static void main(final String... args) { + final AbstractApplicationContext context = new AnnotationConfigApplicationContext(TxIntegrationConfig.class); + context.registerShutdownHook(); + + final Scanner scanner = new Scanner(System.in); + System.out.print("Integration flow is running. Type q + to quit "); + while (true) { + final String input = scanner.nextLine(); + if ("q".equals(input.trim())) { + context.close(); + scanner.close(); + break; + } + } + System.exit(0); + } + +} + + + diff --git a/spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml b/spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml new file mode 100755 index 0000000000..2861826521 --- /dev/null +++ b/spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration/src/main/resources/table.sql b/spring-integration/src/main/resources/table.sql new file mode 100644 index 0000000000..9ca58f6f27 --- /dev/null +++ b/spring-integration/src/main/resources/table.sql @@ -0,0 +1,4 @@ + +CREATE TABLE IF NOT EXISTS STUDENT ( + TEST VARCHAR(256) +); \ No newline at end of file diff --git a/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java new file mode 100644 index 0000000000..1bbd16aa4b --- /dev/null +++ b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java @@ -0,0 +1,56 @@ +package com.baeldung.tx; + +import org.junit.Assert; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +public final class TxIntegrationTest { + + private static final String CONTEXT_CONFIG = "classpath:META-INF/spring/integration/spring-integration-tx-context.xml"; + + @Test + public void whenFileDoesntStartWithFail_thenTxSuccessful() throws InterruptedException, IOException { + final AbstractApplicationContext context = + new ClassPathXmlApplicationContext(CONTEXT_CONFIG); + + String fileName = System.getProperty("java.io.tmpdir") + "/tx/test1.txt"; + FileWriter fw = new FileWriter(fileName); + fw.write("PASSED!"); + fw.close(); + + context.registerShutdownHook(); + Thread.sleep(5000); + + File file = new File(fileName + ".PASSED"); + Assert.assertTrue(file.exists()); + } + + @Test + public void whenFileStartsWithFail_thenTxFailed() { + + String fileName = System.getProperty("java.io.tmpdir") + "/tx/test2.txt"; + + try { + final AbstractApplicationContext context = + new ClassPathXmlApplicationContext(CONTEXT_CONFIG); + + FileWriter fw = new FileWriter(fileName); + fw.write("FAILED!"); + fw.close(); + + context.registerShutdownHook(); + Thread.sleep(5000); + } catch (Exception e) { + // Exception is expected, do nothing + } + + File file = new File(fileName + ".FAILED"); + Assert.assertTrue(file.exists()); + } + +}