Transaction sample changes

Add @Transactional on the runtime code in the lambda expression,
not on the functional bean. If it is set on the function bean, then
that will only be invoked at application initialization, thus losing
the transactional semantics on the actual runtme code.

Resolves https://github.com/spring-cloud/spring-cloud-stream-samples/issues/212
This commit is contained in:
Soby Chacko
2021-10-01 16:21:12 -04:00
parent becc034169
commit 1998a60a5f

View File

@@ -30,6 +30,7 @@ import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.util.backoff.FixedBackOff;
@@ -40,83 +41,89 @@ import org.springframework.util.backoff.FixedBackOff;
@EnableTransactionManagement
public class ProcessorApplication {
private Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private AtomicBoolean shouldFail= new AtomicBoolean(false);
private PersonRepository repository;
private AtomicBoolean shouldFail = new AtomicBoolean(false);
private PersonRepository repository;
public ProcessorApplication(PersonRepository repository) {
public ProcessorApplication(PersonRepository repository) {
this.repository = repository;
}
this.repository = repository;
}
public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
return pe -> txCode.run(pe);
}
@Transactional
@Bean
public Function<PersonEvent, PersonEvent> process() {
return pe -> {
logger.info("Received event={}", pe);
Person person = new Person();
person.setName(pe.getName());
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
// Disable retry in the AfterRollbackProcessor
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 0)));
}
if (shouldFail.get()) {
shouldFail.set(false);
throw new RuntimeException("Simulated network error");
} else {
//We fail every other request as a test
shouldFail.set(true);
}
logger.info("Saving person={}", person);
static class PersonEvent {
Person savedPerson = repository.save(person);
String name;
String type;
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
};
}
public String getName() {
return name;
}
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
// Disable retry in the AfterRollbackProcessor
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 0)));
}
public void setName(String name) {
this.name = name;
}
static class PersonEvent {
public String getType() {
return type;
}
String name;
String type;
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
@Override
public String toString() {
return new StringJoiner(", ", PersonEvent.class.getSimpleName() + "[", "]")
.add("name='" + name + "'")
.add("type='" + type + "'")
.toString();
}
}
public void setName(String name) {
this.name = name;
}
@Component
class TxCode {
public String getType() {
return type;
}
@Transactional
PersonEvent run(PersonEvent pe) {
logger.info("Received event={}", pe);
Person person = new Person();
person.setName(pe.getName());
public void setType(String type) {
this.type = type;
}
if (shouldFail.get()) {
shouldFail.set(false);
throw new RuntimeException("Simulated network error");
}
else {
//We fail every other request as a test
shouldFail.set(true);
}
logger.info("Saving person={}", person);
@Override
public String toString() {
return new StringJoiner(", ", PersonEvent.class.getSimpleName() + "[", "]")
.add("name='" + name + "'")
.add("type='" + type + "'")
.toString();
}
}
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
}
}
}