diff --git a/spring-core-4/pom.xml b/spring-core-4/pom.xml index 53f7ca6912..fbec5ea9eb 100644 --- a/spring-core-4/pom.xml +++ b/spring-core-4/pom.xml @@ -24,6 +24,16 @@ spring-core ${spring.version} + + org.springframework + spring-expression + ${spring.version} + + + com.google.guava + guava + 28.2-jre + org.springframework spring-test @@ -42,6 +52,18 @@ ${junit-jupiter.version} test + + org.awaitility + awaitility + 4.0.2 + test + + + org.assertj + assertj-core + 2.9.1 + test + @@ -60,4 +82,4 @@ 2.2.2.RELEASE - \ No newline at end of file + diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/GlobalEventBus.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/GlobalEventBus.java new file mode 100644 index 0000000000..8b95ea7c6f --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/GlobalEventBus.java @@ -0,0 +1,39 @@ +package com.baeldung.postprocessor; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; + +import java.util.concurrent.Executors; + +@SuppressWarnings("ALL") +public final class GlobalEventBus { + + public static final String GLOBAL_EVENT_BUS_EXPRESSION = "T(com.baeldung.postprocessor.GlobalEventBus).getEventBus()"; + + private static final String IDENTIFIER = "global-event-bus"; + + private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus(); + + private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool()); + + private GlobalEventBus() { + } + + public static GlobalEventBus getInstance() { + return GlobalEventBus.GLOBAL_EVENT_BUS; + } + + public static EventBus getEventBus() { + return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus; + } + + public static void subscribe(Object obj) { + getEventBus().register(obj); + } + public static void unsubscribe(Object obj) { + getEventBus().unregister(obj); + } + public static void post(Object event) { + getEventBus().post(event); + } +} diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/GuavaEventBusBeanFactoryPostProcessor.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/GuavaEventBusBeanFactoryPostProcessor.java new file mode 100644 index 0000000000..fba31fde6a --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/GuavaEventBusBeanFactoryPostProcessor.java @@ -0,0 +1,63 @@ +package com.baeldung.postprocessor; + +import com.google.common.eventbus.EventBus; + +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.framework.Advised; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.FatalBeanException; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionException; +import org.springframework.expression.spel.standard.SpelExpressionParser; + +@SuppressWarnings("ALL") +public class GuavaEventBusBeanFactoryPostProcessor implements BeanFactoryPostProcessor { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final SpelExpressionParser expressionParser = new SpelExpressionParser(); + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { + for (Iterator names = beanFactory.getBeanNamesIterator(); names.hasNext(); ) { + Object proxy = this.getTargetObject(beanFactory.getBean(names.next())); + final Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class); + if (annotation == null) + continue; + this.logger.info("{}: processing bean of type {} during initialization", this.getClass().getSimpleName(), + proxy.getClass().getName()); + final String annotationValue = annotation.value(); + try { + final Expression expression = this.expressionParser.parseExpression(annotationValue); + final Object value = expression.getValue(); + if (!(value instanceof EventBus)) { + this.logger.error("{}: expression {} did not evaluate to an instance of EventBus for bean of type {}", + this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName()); + return; + } + final EventBus eventBus = (EventBus)value; + eventBus.register(proxy); + } catch (ExpressionException ex) { + this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}", this.getClass().getSimpleName(), + annotationValue, proxy.getClass().getName()); + } + } + } + + private Object getTargetObject(Object proxy) throws BeansException { + if (AopUtils.isJdkDynamicProxy(proxy)) { + try { + return ((Advised)proxy).getTargetSource().getTarget(); + } catch (Exception e) { + throw new FatalBeanException("Error getting target of JDK proxy", e); + } + } + return proxy; + } +} diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/GuavaEventBusBeanPostProcessor.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/GuavaEventBusBeanPostProcessor.java new file mode 100644 index 0000000000..677c839444 --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/GuavaEventBusBeanPostProcessor.java @@ -0,0 +1,87 @@ +package com.baeldung.postprocessor; + +import com.google.common.eventbus.EventBus; + +import java.util.function.BiConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.framework.Advised; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.FatalBeanException; +import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionException; +import org.springframework.expression.spel.standard.SpelExpressionParser; + +/** + * A {@link DestructionAwareBeanPostProcessor} which registers/un-registers subscribers to a Guava {@link EventBus}. The class must + * be annotated with {@link Subscriber} and each subscribing method must be annotated with + * {@link com.google.common.eventbus.Subscribe}. + */ +@SuppressWarnings("ALL") +public class GuavaEventBusBeanPostProcessor implements DestructionAwareBeanPostProcessor { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final SpelExpressionParser expressionParser = new SpelExpressionParser(); + + @Override + public void postProcessBeforeDestruction(final Object bean, final String beanName) throws BeansException { + this.process(bean, EventBus::unregister, "destruction"); + } + + @Override + public boolean requiresDestruction(Object bean) { + return true; + } + + @Override + public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException { + return bean; + } + + @Override + public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { + this.process(bean, EventBus::register, "initialization"); + return bean; + } + + private void process(final Object bean, final BiConsumer consumer, final String action) { + Object proxy = this.getTargetObject(bean); + final Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class); + if (annotation == null) + return; + this.logger.info("{}: processing bean of type {} during {}", this.getClass().getSimpleName(), proxy.getClass().getName(), + action); + final String annotationValue = annotation.value(); + try { + final Expression expression = this.expressionParser.parseExpression(annotationValue); + final Object value = expression.getValue(); + if (!(value instanceof EventBus)) { + this.logger.error("{}: expression {} did not evaluate to an instance of EventBus for bean of type {}", + this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName()); + return; + } + final EventBus eventBus = (EventBus)value; + consumer.accept(eventBus, proxy); + } catch (ExpressionException ex) { + this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}", this.getClass().getSimpleName(), + annotationValue, proxy.getClass().getName()); + } + } + + private Object getTargetObject(Object proxy) throws BeansException { + if (AopUtils.isJdkDynamicProxy(proxy)) { + try { + return ((Advised)proxy).getTargetSource().getTarget(); + } catch (Exception e) { + throw new FatalBeanException("Error getting target of JDK proxy", e); + } + } + return proxy; + } +} + + diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTrade.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTrade.java new file mode 100644 index 0000000000..7711cf7101 --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTrade.java @@ -0,0 +1,34 @@ +package com.baeldung.postprocessor; + +import java.util.Date; + +public class StockTrade { + + private final String symbol; + private final int quantity; + private final double price; + private final Date tradeDate; + + public StockTrade(String symbol, int quantity, double price, Date tradeDate) { + this.symbol = symbol; + this.quantity = quantity; + this.price = price; + this.tradeDate = tradeDate; + } + + public String getSymbol() { + return this.symbol; + } + + public int getQuantity() { + return this.quantity; + } + + public double getPrice() { + return this.price; + } + + public Date getTradeDate() { + return this.tradeDate; + } +} diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTradeListener.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTradeListener.java new file mode 100644 index 0000000000..bf34d66f24 --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTradeListener.java @@ -0,0 +1,7 @@ +package com.baeldung.postprocessor; + +@FunctionalInterface +public interface StockTradeListener { + + void stockTradePublished(StockTrade trade); +} diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTradePublisher.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTradePublisher.java new file mode 100644 index 0000000000..bf339872d9 --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/StockTradePublisher.java @@ -0,0 +1,36 @@ +package com.baeldung.postprocessor; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; + +import java.util.HashSet; +import java.util.Set; + +@Subscriber +public class StockTradePublisher { + + private final Set stockTradeListeners = new HashSet<>(); + + public void addStockTradeListener(StockTradeListener listener) { + synchronized (this.stockTradeListeners) { + this.stockTradeListeners.add(listener); + } + } + + public void removeStockTradeListener(StockTradeListener listener) { + synchronized (this.stockTradeListeners) { + this.stockTradeListeners.remove(listener); + } + } + + @Subscribe + @AllowConcurrentEvents + private void handleNewStockTradeEvent(StockTrade trade) { + // publish to DB, send to PubNub, whatever you want here + final Set listeners; + synchronized (this.stockTradeListeners) { + listeners = new HashSet<>(this.stockTradeListeners); + } + listeners.forEach(li -> li.stockTradePublished(trade)); + } +} diff --git a/spring-core-4/src/main/java/com/baeldung/postprocessor/Subscriber.java b/spring-core-4/src/main/java/com/baeldung/postprocessor/Subscriber.java new file mode 100644 index 0000000000..bef38333d6 --- /dev/null +++ b/spring-core-4/src/main/java/com/baeldung/postprocessor/Subscriber.java @@ -0,0 +1,21 @@ +package com.baeldung.postprocessor; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * An annotation which indicates which Guava {@link com.google.common.eventbus.EventBus} a Spring bean wishes to subscribe to. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Inherited +public @interface Subscriber { + + /** + * A SpEL expression which selects the {@link com.google.common.eventbus.EventBus}. + */ + String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION; +} diff --git a/spring-core-4/src/test/java/com/baeldung/postprocessor/PostProcessorConfiguration.java b/spring-core-4/src/test/java/com/baeldung/postprocessor/PostProcessorConfiguration.java new file mode 100644 index 0000000000..b28e36663a --- /dev/null +++ b/spring-core-4/src/test/java/com/baeldung/postprocessor/PostProcessorConfiguration.java @@ -0,0 +1,23 @@ +package com.baeldung.postprocessor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class PostProcessorConfiguration { + + @Bean + public GlobalEventBus eventBus() { + return GlobalEventBus.getInstance(); + } + + @Bean + public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() { + return new GuavaEventBusBeanPostProcessor(); + } + + @Bean + public StockTradePublisher stockTradePublisher() { + return new StockTradePublisher(); + } +} diff --git a/spring-core-4/src/test/java/com/baeldung/postprocessor/StockTradeIntegrationTest.java b/spring-core-4/src/test/java/com/baeldung/postprocessor/StockTradeIntegrationTest.java new file mode 100644 index 0000000000..ae3cd968dc --- /dev/null +++ b/spring-core-4/src/test/java/com/baeldung/postprocessor/StockTradeIntegrationTest.java @@ -0,0 +1,46 @@ +package com.baeldung.postprocessor; + +import java.time.Duration; +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {PostProcessorConfiguration.class}) +public class StockTradeIntegrationTest { + + @Autowired + private StockTradePublisher stockTradePublisher; + + @Test + public void givenValidConfig_whenTradePublished_thenTradeReceived() { + Date tradeDate = new Date(); + StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate); + AtomicBoolean assertionsPassed = new AtomicBoolean(false); + StockTradeListener listener = trade -> assertionsPassed.set(this.verifyExact(stockTrade, trade)); + this.stockTradePublisher.addStockTradeListener(listener); + try { + GlobalEventBus.post(stockTrade); + await().atMost(Duration.ofSeconds(2L)) + .untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue()); + } finally { + this.stockTradePublisher.removeStockTradeListener(listener); + } + } + + private boolean verifyExact(StockTrade stockTrade, StockTrade trade) { + return Objects.equals(stockTrade.getSymbol(), trade.getSymbol()) + && Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate()) + && stockTrade.getQuantity() == trade.getQuantity() + && stockTrade.getPrice() == trade.getPrice(); + } +}