From 16b7d2178086dfccd033a970ed967fdf4d60ac00 Mon Sep 17 00:00:00 2001 From: enpy303 Date: Wed, 18 Sep 2019 08:27:59 +0200 Subject: [PATCH 1/8] BAEL 1383 --- spring-integration/pom.xml | 13 +++- .../com/baeldung/tx/ServiceActivator.java | 27 ++++++++ .../spring-integration-tx-context.xml | 67 +++++++++++++++++++ .../src/main/resources/table.sql | 4 ++ .../com/baeldung/tx/TxIntegrationTest.java | 56 ++++++++++++++++ 5 files changed, 166 insertions(+), 1 deletion(-) create mode 100755 spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java create mode 100755 spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml create mode 100644 spring-integration/src/main/resources/table.sql create mode 100644 spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java diff --git a/spring-integration/pom.xml b/spring-integration/pom.xml index 367d25de98..f06cb91ef6 100644 --- a/spring-integration/pom.xml +++ b/spring-integration/pom.xml @@ -68,6 +68,11 @@ spring-integration-security ${spring.version} + + org.springframework.integration + spring-integration-jdbc + ${spring.version} + org.springframework.security @@ -75,6 +80,12 @@ ${spring.version} test + + + com.h2database + h2 + 1.4.197 + @@ -106,7 +117,7 @@ UTF-8 - 5.0.3.RELEASE + 5.0.13.RELEASE 1.1.4.RELEASE 1.4.7 1.1.1 diff --git a/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java b/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java new file mode 100755 index 0000000000..874d5d6957 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java @@ -0,0 +1,27 @@ +package com.baeldung.tx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; + +public class ServiceActivator { + + @Autowired + private JdbcTemplate jdbcTemplate; + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + public void checkTestResults(String payload) { + + this.jdbcTemplate.update("insert into STUDENT values(?)", payload); + + if (payload.toLowerCase().startsWith("fail")) { + log.error("Service failure. Test result: {} ", payload); + throw new RuntimeException("Service failure."); + } + + log.info("Service success. Test result: {}", payload); + } + +} diff --git a/spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml b/spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml new file mode 100755 index 0000000000..2861826521 --- /dev/null +++ b/spring-integration/src/main/resources/META-INF/spring/integration/spring-integration-tx-context.xml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration/src/main/resources/table.sql b/spring-integration/src/main/resources/table.sql new file mode 100644 index 0000000000..9ca58f6f27 --- /dev/null +++ b/spring-integration/src/main/resources/table.sql @@ -0,0 +1,4 @@ + +CREATE TABLE IF NOT EXISTS STUDENT ( + TEST VARCHAR(256) +); \ No newline at end of file diff --git a/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java new file mode 100644 index 0000000000..dd9f4ab286 --- /dev/null +++ b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java @@ -0,0 +1,56 @@ +package com.baeldung.tx; + +import org.junit.Assert; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +public final class TxIntegrationTest { + + private static final String CONTEXT_CONFIG = "classpath:META-INF/spring/integration/spring-integration-tx-context.xml"; + + @Test + public void whenFileDoesntStartWithFail_thanTxSuccessful() throws InterruptedException, IOException { + final AbstractApplicationContext context = + new ClassPathXmlApplicationContext(CONTEXT_CONFIG); + + String fileName = System.getProperty("java.io.tmpdir") + "/tx/test1.txt"; + FileWriter fw = new FileWriter(fileName); + fw.write("PASSED!"); + fw.close(); + + context.registerShutdownHook(); + Thread.sleep(5000); + + File file = new File(fileName + ".PASSED"); + Assert.assertTrue(file.exists()); + } + + @Test + public void whenFileStartsWithFail_thanTxFailed() { + + String fileName = System.getProperty("java.io.tmpdir") + "/tx/test2.txt"; + + try { + final AbstractApplicationContext context = + new ClassPathXmlApplicationContext(CONTEXT_CONFIG); + + FileWriter fw = new FileWriter(fileName); + fw.write("FAILED!"); + fw.close(); + + context.registerShutdownHook(); + Thread.sleep(5000); + } catch (Exception e) { + // Exception is expected, do nothing + } + + File file = new File(fileName + ".FAILED"); + Assert.assertTrue(file.exists()); + } + +} From e94f7c56c2d1507a21fe447afee26efeb3ad0527 Mon Sep 17 00:00:00 2001 From: enpy303 Date: Tue, 8 Oct 2019 13:01:50 +0200 Subject: [PATCH 2/8] java config added --- .../com/baeldung/tx/TxIntegrationConfig.java | 156 ++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java diff --git a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java new file mode 100644 index 0000000000..308b00d8e9 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java @@ -0,0 +1,156 @@ +package com.baeldung.tx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.core.MessageSource; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.file.FileReadingMessageSource; +import org.springframework.integration.file.filters.SimplePatternFileListFilter; +import org.springframework.integration.file.transformer.FileToStringTransformer; +import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory; +import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor; +import org.springframework.integration.transaction.TransactionInterceptorBuilder; +import org.springframework.integration.transaction.TransactionSynchronizationFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.messaging.MessageChannel; +import org.springframework.transaction.interceptor.TransactionInterceptor; + +import javax.sql.DataSource; +import java.io.File; +import java.util.Scanner; + +@Configuration +@EnableIntegration +public class TxIntegrationConfig { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + public final String INPUT_DIR = "/tmp/tx/"; + public final String FILE_PATTERN = "*.txt"; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private DataSource dataSource; + + @Autowired + private TransactionSynchronizationFactory transactionSynchronizationFactory; + + @Autowired + DataSourceTransactionManager txManager; + + @Bean + public MessageChannel inputChannel() { + return new DirectChannel(); + } + + @Bean + @InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata")) + public MessageSource fileReadingMessageSource() { + FileReadingMessageSource sourceReader = new FileReadingMessageSource(); + sourceReader.setDirectory(new File(INPUT_DIR)); + sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN)); + return sourceReader; + } + + @Bean + public PollerMetadata pollerMetadata() { + return Pollers + .fixedDelay(5000) + .advice(transactionInterceptor()) + .transactionSynchronizationFactory(transactionSynchronizationFactory) + .get(); + } + + private TransactionInterceptor transactionInterceptor() { + return new TransactionInterceptorBuilder() + .transactionManager(txManager) + .build(); + } + + @Bean + public TransactionSynchronizationFactory transactionSynchronizationFactory(){ + ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = + new ExpressionEvaluatingTransactionSynchronizationProcessor(); + transactionSynchronizationProcessor.setAfterCommitExpression( + new LiteralExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); + transactionSynchronizationProcessor.setAfterRollbackExpression( + new LiteralExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); + return new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor); + } + + @Bean + @Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel") + public FileToStringTransformer fileToStringTransformer() { + return new FileToStringTransformer(); + } + + @ServiceActivator(inputChannel = "toServiceChannel") + public void serviceActivator(String payload) { + + jdbcTemplate.update("insert into STUDENT values(?)", payload); + + if (payload.toLowerCase().startsWith("fail")) { + log.error("Service failure. Test result: {} ", payload); + throw new RuntimeException("Service failure."); + } + + log.info("Service success. Test result: {}", payload); + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.H2) + .addScript("classpath:table.sql") + .build(); + } + + @Bean + public DataSourceTransactionManager txManager() { + return new DataSourceTransactionManager(dataSource); + } + + public static void main(final String... args) { + final AbstractApplicationContext context = new AnnotationConfigApplicationContext(TxIntegrationConfig.class); + context.registerShutdownHook(); + + final Scanner scanner = new Scanner(System.in); + System.out.print("Integration flow is running. Type q + to quit "); + while (true) { + final String input = scanner.nextLine(); + if ("q".equals(input.trim())) { + context.close(); + scanner.close(); + break; + } + } + System.exit(0); + } + +} + + + From e07705f3278efc25fc175f7c756244198dab6859 Mon Sep 17 00:00:00 2001 From: enpy Date: Tue, 8 Oct 2019 14:56:28 +0200 Subject: [PATCH 3/8] Update spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java Co-Authored-By: KevinGilmore --- .../src/main/java/com/baeldung/tx/ServiceActivator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java b/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java index 874d5d6957..56399c653f 100755 --- a/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java +++ b/spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java @@ -17,7 +17,7 @@ public class ServiceActivator { this.jdbcTemplate.update("insert into STUDENT values(?)", payload); if (payload.toLowerCase().startsWith("fail")) { - log.error("Service failure. Test result: {} ", payload); + log.error("Service failure. Test result: {}", payload); throw new RuntimeException("Service failure."); } From e2f6729c5e7b44f496c7a0c935a1223702e1dc14 Mon Sep 17 00:00:00 2001 From: enpy Date: Tue, 8 Oct 2019 14:56:38 +0200 Subject: [PATCH 4/8] Update spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java Co-Authored-By: KevinGilmore --- .../src/test/java/com/baeldung/tx/TxIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java index dd9f4ab286..20ed93b631 100644 --- a/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java +++ b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java @@ -14,7 +14,7 @@ public final class TxIntegrationTest { private static final String CONTEXT_CONFIG = "classpath:META-INF/spring/integration/spring-integration-tx-context.xml"; @Test - public void whenFileDoesntStartWithFail_thanTxSuccessful() throws InterruptedException, IOException { + public void whenFileDoesntStartWithFail_thenTxSuccessful() throws InterruptedException, IOException { final AbstractApplicationContext context = new ClassPathXmlApplicationContext(CONTEXT_CONFIG); From 0cb1f8f4cb6d20e59ed99471c5c6067001e2b85c Mon Sep 17 00:00:00 2001 From: enpy Date: Tue, 8 Oct 2019 14:56:44 +0200 Subject: [PATCH 5/8] Update spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java Co-Authored-By: KevinGilmore --- .../src/test/java/com/baeldung/tx/TxIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java index 20ed93b631..1bbd16aa4b 100644 --- a/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java +++ b/spring-integration/src/test/java/com/baeldung/tx/TxIntegrationTest.java @@ -31,7 +31,7 @@ public final class TxIntegrationTest { } @Test - public void whenFileStartsWithFail_thanTxFailed() { + public void whenFileStartsWithFail_thenTxFailed() { String fileName = System.getProperty("java.io.tmpdir") + "/tx/test2.txt"; From 576929f990a59afd8411f865b880067850752483 Mon Sep 17 00:00:00 2001 From: enpy303 Date: Fri, 11 Oct 2019 00:24:35 +0200 Subject: [PATCH 6/8] java config spel fix --- .../java/com/baeldung/tx/TxIntegrationConfig.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java index 308b00d8e9..48fbfe9322 100644 --- a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java +++ b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java @@ -7,7 +7,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.AbstractApplicationContext; -import org.springframework.expression.common.LiteralExpression; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.ServiceActivator; @@ -86,13 +86,14 @@ public class TxIntegrationConfig { } @Bean - public TransactionSynchronizationFactory transactionSynchronizationFactory(){ + public TransactionSynchronizationFactory transactionSynchronizationFactory() { ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = - new ExpressionEvaluatingTransactionSynchronizationProcessor(); - transactionSynchronizationProcessor.setAfterCommitExpression( - new LiteralExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); - transactionSynchronizationProcessor.setAfterRollbackExpression( - new LiteralExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); + new ExpressionEvaluatingTransactionSynchronizationProcessor(); + SpelExpressionParser spelExpressionParser = new SpelExpressionParser(); + transactionSynchronizationProcessor.setAfterCommitExpression(spelExpressionParser.parseExpression( + "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); + transactionSynchronizationProcessor.setAfterRollbackExpression(spelExpressionParser.parseExpression( + "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); return new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor); } From c653a270c06c224a3dc4e2a3ad7b35fdcbbc7bd5 Mon Sep 17 00:00:00 2001 From: enpy303 Date: Fri, 11 Oct 2019 11:50:28 +0200 Subject: [PATCH 7/8] formatting --- .../com/baeldung/tx/TxIntegrationConfig.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java index 48fbfe9322..a5dedeafed 100644 --- a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java +++ b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java @@ -41,7 +41,7 @@ public class TxIntegrationConfig { private final Logger log = LoggerFactory.getLogger(this.getClass()); - public final String INPUT_DIR = "/tmp/tx/"; + public final String INPUT_DIR = System.getProperty("java.io.tmpdir") + "/tx/"; public final String FILE_PATTERN = "*.txt"; @Autowired @@ -72,28 +72,25 @@ public class TxIntegrationConfig { @Bean public PollerMetadata pollerMetadata() { - return Pollers - .fixedDelay(5000) - .advice(transactionInterceptor()) - .transactionSynchronizationFactory(transactionSynchronizationFactory) - .get(); + return Pollers.fixedDelay(5000) + .advice(transactionInterceptor()) + .transactionSynchronizationFactory(transactionSynchronizationFactory) + .get(); } private TransactionInterceptor transactionInterceptor() { - return new TransactionInterceptorBuilder() - .transactionManager(txManager) - .build(); + return new TransactionInterceptorBuilder().transactionManager(txManager).build(); } @Bean public TransactionSynchronizationFactory transactionSynchronizationFactory() { ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = - new ExpressionEvaluatingTransactionSynchronizationProcessor(); + new ExpressionEvaluatingTransactionSynchronizationProcessor(); SpelExpressionParser spelExpressionParser = new SpelExpressionParser(); transactionSynchronizationProcessor.setAfterCommitExpression(spelExpressionParser.parseExpression( - "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); + "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); transactionSynchronizationProcessor.setAfterRollbackExpression(spelExpressionParser.parseExpression( - "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); + "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); return new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor); } From d937267ed1cf48b9684af9cbf79a37f50b1584f4 Mon Sep 17 00:00:00 2001 From: enpy303 Date: Sat, 12 Oct 2019 15:39:04 +0200 Subject: [PATCH 8/8] formatting enhancements --- .../com/baeldung/tx/TxIntegrationConfig.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java index a5dedeafed..7a58cb3125 100644 --- a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java +++ b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java @@ -20,10 +20,7 @@ import org.springframework.integration.file.FileReadingMessageSource; import org.springframework.integration.file.filters.SimplePatternFileListFilter; import org.springframework.integration.file.transformer.FileToStringTransformer; import org.springframework.integration.scheduling.PollerMetadata; -import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory; -import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor; -import org.springframework.integration.transaction.TransactionInterceptorBuilder; -import org.springframework.integration.transaction.TransactionSynchronizationFactory; +import org.springframework.integration.transaction.*; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; @@ -84,15 +81,19 @@ public class TxIntegrationConfig { @Bean public TransactionSynchronizationFactory transactionSynchronizationFactory() { - ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = - new ExpressionEvaluatingTransactionSynchronizationProcessor(); - SpelExpressionParser spelExpressionParser = new SpelExpressionParser(); - transactionSynchronizationProcessor.setAfterCommitExpression(spelExpressionParser.parseExpression( - "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); - transactionSynchronizationProcessor.setAfterRollbackExpression(spelExpressionParser.parseExpression( - "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); - return new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor); - } + ExpressionEvaluatingTransactionSynchronizationProcessor processor = + new ExpressionEvaluatingTransactionSynchronizationProcessor(); + + SpelExpressionParser spelParser = new SpelExpressionParser(); + processor.setAfterCommitExpression( + spelParser.parseExpression( + "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); + processor.setAfterRollbackExpression( + spelParser.parseExpression( + "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); + + return new DefaultTransactionSynchronizationFactory(processor); + } @Bean @Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")