@@ -68,6 +68,11 @@
|
||||
<artifactId>spring-integration-security</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-jdbc</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.security</groupId>
|
||||
@@ -75,6 +80,12 @@
|
||||
<version>${spring.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>1.4.197</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
@@ -106,7 +117,7 @@
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<spring.version>5.0.3.RELEASE</spring.version>
|
||||
<spring.version>5.0.13.RELEASE</spring.version>
|
||||
<spring-social.version>1.1.4.RELEASE</spring-social.version>
|
||||
<javax-mail.version>1.4.7</javax-mail.version>
|
||||
<javax-activation.version>1.1.1</javax-activation.version>
|
||||
|
||||
27
spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java
Executable file
27
spring-integration/src/main/java/com/baeldung/tx/ServiceActivator.java
Executable file
@@ -0,0 +1,27 @@
|
||||
package com.baeldung.tx;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
public class ServiceActivator {
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
public void checkTestResults(String payload) {
|
||||
|
||||
this.jdbcTemplate.update("insert into STUDENT values(?)", payload);
|
||||
|
||||
if (payload.toLowerCase().startsWith("fail")) {
|
||||
log.error("Service failure. Test result: {}", payload);
|
||||
throw new RuntimeException("Service failure.");
|
||||
}
|
||||
|
||||
log.info("Service success. Test result: {}", payload);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
package com.baeldung.tx;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.support.AbstractApplicationContext;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.integration.annotation.InboundChannelAdapter;
|
||||
import org.springframework.integration.annotation.Poller;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.annotation.Transformer;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.config.EnableIntegration;
|
||||
import org.springframework.integration.core.MessageSource;
|
||||
import org.springframework.integration.dsl.Pollers;
|
||||
import org.springframework.integration.file.FileReadingMessageSource;
|
||||
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
|
||||
import org.springframework.integration.file.transformer.FileToStringTransformer;
|
||||
import org.springframework.integration.scheduling.PollerMetadata;
|
||||
import org.springframework.integration.transaction.*;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
|
||||
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.transaction.interceptor.TransactionInterceptor;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.io.File;
|
||||
import java.util.Scanner;
|
||||
|
||||
@Configuration
|
||||
@EnableIntegration
|
||||
public class TxIntegrationConfig {
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
public final String INPUT_DIR = System.getProperty("java.io.tmpdir") + "/tx/";
|
||||
public final String FILE_PATTERN = "*.txt";
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Autowired
|
||||
private DataSource dataSource;
|
||||
|
||||
@Autowired
|
||||
private TransactionSynchronizationFactory transactionSynchronizationFactory;
|
||||
|
||||
@Autowired
|
||||
DataSourceTransactionManager txManager;
|
||||
|
||||
@Bean
|
||||
public MessageChannel inputChannel() {
|
||||
return new DirectChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
|
||||
public MessageSource<File> fileReadingMessageSource() {
|
||||
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
|
||||
sourceReader.setDirectory(new File(INPUT_DIR));
|
||||
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
|
||||
return sourceReader;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PollerMetadata pollerMetadata() {
|
||||
return Pollers.fixedDelay(5000)
|
||||
.advice(transactionInterceptor())
|
||||
.transactionSynchronizationFactory(transactionSynchronizationFactory)
|
||||
.get();
|
||||
}
|
||||
|
||||
private TransactionInterceptor transactionInterceptor() {
|
||||
return new TransactionInterceptorBuilder().transactionManager(txManager).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
|
||||
ExpressionEvaluatingTransactionSynchronizationProcessor processor =
|
||||
new ExpressionEvaluatingTransactionSynchronizationProcessor();
|
||||
|
||||
SpelExpressionParser spelParser = new SpelExpressionParser();
|
||||
processor.setAfterCommitExpression(
|
||||
spelParser.parseExpression(
|
||||
"payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
|
||||
processor.setAfterRollbackExpression(
|
||||
spelParser.parseExpression(
|
||||
"payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));
|
||||
|
||||
return new DefaultTransactionSynchronizationFactory(processor);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
|
||||
public FileToStringTransformer fileToStringTransformer() {
|
||||
return new FileToStringTransformer();
|
||||
}
|
||||
|
||||
@ServiceActivator(inputChannel = "toServiceChannel")
|
||||
public void serviceActivator(String payload) {
|
||||
|
||||
jdbcTemplate.update("insert into STUDENT values(?)", payload);
|
||||
|
||||
if (payload.toLowerCase().startsWith("fail")) {
|
||||
log.error("Service failure. Test result: {} ", payload);
|
||||
throw new RuntimeException("Service failure.");
|
||||
}
|
||||
|
||||
log.info("Service success. Test result: {}", payload);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
|
||||
return new JdbcTemplate(dataSource);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DataSource dataSource() {
|
||||
return new EmbeddedDatabaseBuilder()
|
||||
.setType(EmbeddedDatabaseType.H2)
|
||||
.addScript("classpath:table.sql")
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DataSourceTransactionManager txManager() {
|
||||
return new DataSourceTransactionManager(dataSource);
|
||||
}
|
||||
|
||||
public static void main(final String... args) {
|
||||
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(TxIntegrationConfig.class);
|
||||
context.registerShutdownHook();
|
||||
|
||||
final Scanner scanner = new Scanner(System.in);
|
||||
System.out.print("Integration flow is running. Type q + <enter> to quit ");
|
||||
while (true) {
|
||||
final String input = scanner.nextLine();
|
||||
if ("q".equals(input.trim())) {
|
||||
context.close();
|
||||
scanner.close();
|
||||
break;
|
||||
}
|
||||
}
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:int="http://www.springframework.org/schema/integration"
|
||||
xmlns:context="http://www.springframework.org/schema/context"
|
||||
xmlns:int-file="http://www.springframework.org/schema/integration/file"
|
||||
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
|
||||
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/jdbc https://www.springframework.org/schema/jdbc/spring-jdbc.xsd
|
||||
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
|
||||
http://www.springframework.org/schema/integration/file https://www.springframework.org/schema/integration/file/spring-integration-file.xsd
|
||||
http://www.springframework.org/schema/integration/jdbc https://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
|
||||
|
||||
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
|
||||
|
||||
<context:annotation-config/>
|
||||
|
||||
<context:property-placeholder/>
|
||||
|
||||
<int-file:inbound-channel-adapter
|
||||
channel="inputChannel"
|
||||
auto-create-directory="true"
|
||||
filename-pattern="*.txt"
|
||||
directory="${java.io.tmpdir}/tx/">
|
||||
<int:poller fixed-delay="500">
|
||||
<int:transactional transaction-manager="txManager" synchronization-factory="syncFactory"/>
|
||||
</int:poller>
|
||||
</int-file:inbound-channel-adapter>
|
||||
|
||||
<int:transaction-synchronization-factory id="syncFactory">
|
||||
<int:after-commit expression="payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"
|
||||
channel="infoLogger"/>
|
||||
<int:after-rollback expression="payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"
|
||||
channel="errorLogger"/>
|
||||
</int:transaction-synchronization-factory>
|
||||
|
||||
<int:channel id="inputChannel"/>
|
||||
|
||||
<int-file:file-to-string-transformer input-channel="inputChannel" output-channel="toServiceChannel"/>
|
||||
|
||||
<int:service-activator input-channel="toServiceChannel"
|
||||
ref="serviceActivator"
|
||||
method="checkTestResults"/>
|
||||
|
||||
<int:logging-channel-adapter id="infoLogger" level="INFO"/>
|
||||
<int:logging-channel-adapter id="errorLogger" level="ERROR"/>
|
||||
|
||||
<bean id="serviceActivator" class="com.baeldung.tx.ServiceActivator"/>
|
||||
|
||||
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
|
||||
<property name="dataSource" ref="dataSource"/>
|
||||
</bean>
|
||||
|
||||
<jdbc:embedded-database id="dataSource" type="H2">
|
||||
<jdbc:script location="classpath*:table.sql"/>
|
||||
</jdbc:embedded-database>
|
||||
|
||||
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
|
||||
<property name="dataSource" ref="dataSource"/>
|
||||
</bean>
|
||||
|
||||
<int-jdbc:inbound-channel-adapter channel="infoLogger"
|
||||
query="select TEST from STUDENT" data-source="dataSource">
|
||||
<int:poller fixed-delay="5000"/>
|
||||
</int-jdbc:inbound-channel-adapter>
|
||||
</beans>
|
||||
4
spring-integration/src/main/resources/table.sql
Normal file
4
spring-integration/src/main/resources/table.sql
Normal file
@@ -0,0 +1,4 @@
|
||||
|
||||
CREATE TABLE IF NOT EXISTS STUDENT (
|
||||
TEST VARCHAR(256)
|
||||
);
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.baeldung.tx;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.support.AbstractApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
|
||||
public final class TxIntegrationTest {
|
||||
|
||||
private static final String CONTEXT_CONFIG = "classpath:META-INF/spring/integration/spring-integration-tx-context.xml";
|
||||
|
||||
@Test
|
||||
public void whenFileDoesntStartWithFail_thenTxSuccessful() throws InterruptedException, IOException {
|
||||
final AbstractApplicationContext context =
|
||||
new ClassPathXmlApplicationContext(CONTEXT_CONFIG);
|
||||
|
||||
String fileName = System.getProperty("java.io.tmpdir") + "/tx/test1.txt";
|
||||
FileWriter fw = new FileWriter(fileName);
|
||||
fw.write("PASSED!");
|
||||
fw.close();
|
||||
|
||||
context.registerShutdownHook();
|
||||
Thread.sleep(5000);
|
||||
|
||||
File file = new File(fileName + ".PASSED");
|
||||
Assert.assertTrue(file.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFileStartsWithFail_thenTxFailed() {
|
||||
|
||||
String fileName = System.getProperty("java.io.tmpdir") + "/tx/test2.txt";
|
||||
|
||||
try {
|
||||
final AbstractApplicationContext context =
|
||||
new ClassPathXmlApplicationContext(CONTEXT_CONFIG);
|
||||
|
||||
FileWriter fw = new FileWriter(fileName);
|
||||
fw.write("FAILED!");
|
||||
fw.close();
|
||||
|
||||
context.registerShutdownHook();
|
||||
Thread.sleep(5000);
|
||||
} catch (Exception e) {
|
||||
// Exception is expected, do nothing
|
||||
}
|
||||
|
||||
File file = new File(fileName + ".FAILED");
|
||||
Assert.assertTrue(file.exists());
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user