diff --git a/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java new file mode 100644 index 0000000000..308b00d8e9 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/tx/TxIntegrationConfig.java @@ -0,0 +1,156 @@ +package com.baeldung.tx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.core.MessageSource; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.file.FileReadingMessageSource; +import org.springframework.integration.file.filters.SimplePatternFileListFilter; +import org.springframework.integration.file.transformer.FileToStringTransformer; +import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory; +import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor; +import org.springframework.integration.transaction.TransactionInterceptorBuilder; +import org.springframework.integration.transaction.TransactionSynchronizationFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.messaging.MessageChannel; +import org.springframework.transaction.interceptor.TransactionInterceptor; + +import javax.sql.DataSource; +import java.io.File; +import java.util.Scanner; + +@Configuration +@EnableIntegration +public class TxIntegrationConfig { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + public final String INPUT_DIR = "/tmp/tx/"; + public final String FILE_PATTERN = "*.txt"; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private DataSource dataSource; + + @Autowired + private TransactionSynchronizationFactory transactionSynchronizationFactory; + + @Autowired + DataSourceTransactionManager txManager; + + @Bean + public MessageChannel inputChannel() { + return new DirectChannel(); + } + + @Bean + @InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata")) + public MessageSource fileReadingMessageSource() { + FileReadingMessageSource sourceReader = new FileReadingMessageSource(); + sourceReader.setDirectory(new File(INPUT_DIR)); + sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN)); + return sourceReader; + } + + @Bean + public PollerMetadata pollerMetadata() { + return Pollers + .fixedDelay(5000) + .advice(transactionInterceptor()) + .transactionSynchronizationFactory(transactionSynchronizationFactory) + .get(); + } + + private TransactionInterceptor transactionInterceptor() { + return new TransactionInterceptorBuilder() + .transactionManager(txManager) + .build(); + } + + @Bean + public TransactionSynchronizationFactory transactionSynchronizationFactory(){ + ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = + new ExpressionEvaluatingTransactionSynchronizationProcessor(); + transactionSynchronizationProcessor.setAfterCommitExpression( + new LiteralExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); + transactionSynchronizationProcessor.setAfterRollbackExpression( + new LiteralExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); + return new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor); + } + + @Bean + @Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel") + public FileToStringTransformer fileToStringTransformer() { + return new FileToStringTransformer(); + } + + @ServiceActivator(inputChannel = "toServiceChannel") + public void serviceActivator(String payload) { + + jdbcTemplate.update("insert into STUDENT values(?)", payload); + + if (payload.toLowerCase().startsWith("fail")) { + log.error("Service failure. Test result: {} ", payload); + throw new RuntimeException("Service failure."); + } + + log.info("Service success. Test result: {}", payload); + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.H2) + .addScript("classpath:table.sql") + .build(); + } + + @Bean + public DataSourceTransactionManager txManager() { + return new DataSourceTransactionManager(dataSource); + } + + public static void main(final String... args) { + final AbstractApplicationContext context = new AnnotationConfigApplicationContext(TxIntegrationConfig.class); + context.registerShutdownHook(); + + final Scanner scanner = new Scanner(System.in); + System.out.print("Integration flow is running. Type q + to quit "); + while (true) { + final String input = scanner.nextLine(); + if ("q".equals(input.trim())) { + context.close(); + scanner.close(); + break; + } + } + System.exit(0); + } + +} + + +