diff --git a/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/BasicEventService.java b/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/BasicEventService.java index b653a9d..3e95d37 100755 --- a/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/BasicEventService.java +++ b/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/BasicEventService.java @@ -4,7 +4,6 @@ import demo.domain.Aggregate; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.client.loadbalancer.LoadBalanced; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.data.domain.PageRequest; import org.springframework.hateoas.Link; import org.springframework.hateoas.MediaTypes; @@ -37,13 +36,13 @@ public class BasicEventService impleme private String eventsWorker; private final EventRepository eventRepository; - private final Source eventStream; + private final EventSource eventSource; private final RestTemplate restTemplate; - public BasicEventService(EventRepository eventRepository, Source eventStream, @LoadBalanced RestTemplate + public BasicEventService(EventRepository eventRepository, EventSource eventSource, @LoadBalanced RestTemplate restTemplate) { this.eventRepository = eventRepository; - this.eventStream = eventStream; + this.eventSource = eventSource; this.restTemplate = restTemplate; } @@ -69,7 +68,7 @@ public class BasicEventService impleme } public Boolean sendAsync(S event, Link... links) { - return eventStream.output() + return eventSource.getChannel() .send(MessageBuilder.withPayload(event) .setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE) .build()); diff --git a/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventAutoConfig.java b/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventAutoConfig.java index 6319f2b..b39ab9b 100755 --- a/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventAutoConfig.java +++ b/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventAutoConfig.java @@ -20,18 +20,21 @@ import org.springframework.web.client.RestTemplate; public class EventAutoConfig { private EventRepository eventRepository; - private Source source; private RestTemplate restTemplate; - public EventAutoConfig(EventRepository eventRepository, Source source, RestTemplate restTemplate) { + public EventAutoConfig(EventRepository eventRepository, RestTemplate restTemplate) { this.eventRepository = eventRepository; - this.source = source; this.restTemplate = restTemplate; } @SuppressWarnings("unchecked") @Bean - public EventService eventService() { - return new BasicEventService(eventRepository, source, restTemplate); + public EventService eventService(EventSource eventSource) { + return new BasicEventService(eventRepository, eventSource, restTemplate); + } + + @Bean + public EventSource eventSource(Source source) { + return new EventSource(source.output()); } } diff --git a/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventSource.java b/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventSource.java new file mode 100644 index 0000000..7a5f47d --- /dev/null +++ b/spring-boot-starters/spring-boot-starter-data-events/src/main/java/demo/event/EventSource.java @@ -0,0 +1,16 @@ +package demo.event; + +import org.springframework.messaging.MessageChannel; + +public class EventSource { + + private MessageChannel channel; + + public EventSource(MessageChannel channel) { + this.channel = channel; + } + + public MessageChannel getChannel() { + return channel; + } +} diff --git a/warehouse/warehouse-web/src/main/java/demo/config/EventConfig.java b/warehouse/warehouse-web/src/main/java/demo/config/EventConfig.java index 013182b..64b19e4 100644 --- a/warehouse/warehouse-web/src/main/java/demo/config/EventConfig.java +++ b/warehouse/warehouse-web/src/main/java/demo/config/EventConfig.java @@ -1,5 +1,6 @@ package demo.config; +import demo.event.EventSource; import demo.inventory.config.InventoryEventSource; import demo.inventory.event.InventoryEventRepository; import demo.inventory.event.InventoryEventService; @@ -9,6 +10,7 @@ import demo.reservation.event.ReservationEventService; import demo.warehouse.config.WarehouseEventSource; import demo.warehouse.event.WarehouseEventRepository; import demo.warehouse.event.WarehouseEventService; +import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.client.RestTemplate; @@ -21,21 +23,39 @@ import org.springframework.web.client.RestTemplate; @Configuration public class EventConfig { + @Bean + public EventSource inventoryChannel(InventoryEventSource eventSource) { + return new EventSource(eventSource.output()); + } + + @Bean + public EventSource warehouseChannel(WarehouseEventSource eventSource) { + return new EventSource(eventSource.output()); + } + + @Bean + public EventSource reservationChannel(ReservationEventSource eventSource) { + return new EventSource(eventSource.output()); + } + @Bean public InventoryEventService inventoryEventService(RestTemplate restTemplate, InventoryEventRepository - inventoryEventRepository, InventoryEventSource eventStream) { - return new InventoryEventService(inventoryEventRepository, eventStream, restTemplate); + inventoryEventRepository, InventoryEventSource eventStream, Source source) { + return new InventoryEventService(inventoryEventRepository, inventoryChannel(eventStream), restTemplate, source); } @Bean public WarehouseEventService warehouseEventService(RestTemplate restTemplate, WarehouseEventRepository - warehouseEventRepository, WarehouseEventSource eventStream) { - return new WarehouseEventService(warehouseEventRepository, eventStream, restTemplate); + warehouseEventRepository, WarehouseEventSource eventStream, Source source) { + return new WarehouseEventService(warehouseEventRepository, warehouseChannel(eventStream), restTemplate, source); } @Bean public ReservationEventService reservationEventService(RestTemplate restTemplate, ReservationEventRepository - reservationEventRepository, ReservationEventSource eventStream) { - return new ReservationEventService(reservationEventRepository, eventStream, restTemplate); + reservationEventRepository, ReservationEventSource eventStream, Source source) { + return new ReservationEventService(reservationEventRepository, reservationChannel(eventStream), restTemplate, + source); } + + } diff --git a/warehouse/warehouse-web/src/main/java/demo/config/StreamConfig.java b/warehouse/warehouse-web/src/main/java/demo/config/StreamConfig.java new file mode 100644 index 0000000..2eaf902 --- /dev/null +++ b/warehouse/warehouse-web/src/main/java/demo/config/StreamConfig.java @@ -0,0 +1,10 @@ +package demo.config; + +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableBinding(Source.class) +public class StreamConfig { +} diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/config/InventoryEventSource.java b/warehouse/warehouse-web/src/main/java/demo/inventory/config/InventoryEventSource.java index 0a7b3c2..00595eb 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/config/InventoryEventSource.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/config/InventoryEventSource.java @@ -1,13 +1,11 @@ package demo.inventory.config; import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; -public interface InventoryEventSource extends Source { +public interface InventoryEventSource { String OUTPUT = "inventory"; - @Override @Output(InventoryEventSource.OUTPUT) MessageChannel output(); } diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/event/InventoryEventService.java b/warehouse/warehouse-web/src/main/java/demo/inventory/event/InventoryEventService.java index d1adeae..c853add 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/event/InventoryEventService.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/event/InventoryEventService.java @@ -1,12 +1,30 @@ package demo.inventory.event; import demo.event.BasicEventService; -import demo.inventory.config.InventoryEventSource; +import demo.event.EventSource; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.hateoas.Link; +import org.springframework.http.MediaType; +import org.springframework.integration.support.MessageBuilder; import org.springframework.web.client.RestTemplate; public class InventoryEventService extends BasicEventService { - public InventoryEventService(InventoryEventRepository eventRepository, InventoryEventSource eventStream, - RestTemplate restTemplate) { + + private final Source source; + + public InventoryEventService(InventoryEventRepository eventRepository, EventSource eventStream, RestTemplate + restTemplate, Source source) { super(eventRepository, eventStream, restTemplate); + this.source = source; + } + + @Override + public Boolean sendAsync(S event, Link... links) { + // Send a duplicate event to the warehouse stream group output channel for data flow + source.output() + .send(MessageBuilder.withPayload(event) + .setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE) + .build()); + return super.sendAsync(event, links); } } diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/config/ReservationEventSource.java b/warehouse/warehouse-web/src/main/java/demo/reservation/config/ReservationEventSource.java index 578fa7c..f813962 100644 --- a/warehouse/warehouse-web/src/main/java/demo/reservation/config/ReservationEventSource.java +++ b/warehouse/warehouse-web/src/main/java/demo/reservation/config/ReservationEventSource.java @@ -1,13 +1,11 @@ package demo.reservation.config; import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; -public interface ReservationEventSource extends Source { +public interface ReservationEventSource { String OUTPUT = "reservation"; - @Override @Output(ReservationEventSource.OUTPUT) MessageChannel output(); } diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/event/ReservationEventService.java b/warehouse/warehouse-web/src/main/java/demo/reservation/event/ReservationEventService.java index 46519a1..3f7bd29 100644 --- a/warehouse/warehouse-web/src/main/java/demo/reservation/event/ReservationEventService.java +++ b/warehouse/warehouse-web/src/main/java/demo/reservation/event/ReservationEventService.java @@ -1,12 +1,30 @@ package demo.reservation.event; import demo.event.BasicEventService; -import demo.reservation.config.ReservationEventSource; +import demo.event.EventSource; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.hateoas.Link; +import org.springframework.http.MediaType; +import org.springframework.integration.support.MessageBuilder; import org.springframework.web.client.RestTemplate; public class ReservationEventService extends BasicEventService { - public ReservationEventService(ReservationEventRepository eventRepository, ReservationEventSource eventStream, - RestTemplate restTemplate) { - super(eventRepository, eventStream, restTemplate); + + private final Source source; + + public ReservationEventService(ReservationEventRepository reservationEventRepository, EventSource eventSource, + RestTemplate restTemplate, Source source) { + super(reservationEventRepository, eventSource, restTemplate); + this.source = source; + } + + @Override + public Boolean sendAsync(S event, Link... links) { + // Send a duplicate event to the warehouse stream group output channel for data flow + source.output() + .send(MessageBuilder.withPayload(event) + .setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE) + .build()); + return super.sendAsync(event, links); } } diff --git a/warehouse/warehouse-web/src/main/java/demo/warehouse/config/WarehouseEventSource.java b/warehouse/warehouse-web/src/main/java/demo/warehouse/config/WarehouseEventSource.java index 4c859a1..44e08ba 100644 --- a/warehouse/warehouse-web/src/main/java/demo/warehouse/config/WarehouseEventSource.java +++ b/warehouse/warehouse-web/src/main/java/demo/warehouse/config/WarehouseEventSource.java @@ -1,13 +1,11 @@ package demo.warehouse.config; import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; -public interface WarehouseEventSource extends Source { +public interface WarehouseEventSource { String OUTPUT = "warehouse"; - @Override @Output(WarehouseEventSource.OUTPUT) MessageChannel output(); } diff --git a/warehouse/warehouse-web/src/main/java/demo/warehouse/event/WarehouseEventService.java b/warehouse/warehouse-web/src/main/java/demo/warehouse/event/WarehouseEventService.java index 6d616fc..68894f9 100644 --- a/warehouse/warehouse-web/src/main/java/demo/warehouse/event/WarehouseEventService.java +++ b/warehouse/warehouse-web/src/main/java/demo/warehouse/event/WarehouseEventService.java @@ -1,12 +1,30 @@ package demo.warehouse.event; import demo.event.BasicEventService; -import demo.warehouse.config.WarehouseEventSource; +import demo.event.EventSource; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.hateoas.Link; +import org.springframework.http.MediaType; +import org.springframework.integration.support.MessageBuilder; import org.springframework.web.client.RestTemplate; public class WarehouseEventService extends BasicEventService { - public WarehouseEventService(WarehouseEventRepository eventRepository, WarehouseEventSource eventStream, - RestTemplate restTemplate) { - super(eventRepository, eventStream, restTemplate); + + private final Source source; + + public WarehouseEventService(WarehouseEventRepository warehouseEventRepository, EventSource eventSource, + RestTemplate restTemplate, Source source) { + super(warehouseEventRepository, eventSource, restTemplate); + this.source = source; + } + + @Override + public Boolean sendAsync(S event, Link... links) { + // Send a duplicate event to the warehouse stream group output channel for data flow + source.output() + .send(MessageBuilder.withPayload(event) + .setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE) + .build()); + return super.sendAsync(event, links); } } diff --git a/warehouse/warehouse-web/src/main/resources/application.yml b/warehouse/warehouse-web/src/main/resources/application.yml index 5f7b3e5..7ef22d3 100644 --- a/warehouse/warehouse-web/src/main/resources/application.yml +++ b/warehouse/warehouse-web/src/main/resources/application.yml @@ -20,6 +20,9 @@ spring: inventory: contentType: 'application/json' destination: inventory + output: + contentType: 'application/json' + destination: warehouse-stream jpa: show_sql: false database: H2 @@ -47,6 +50,9 @@ spring: inventory: contentType: 'application/json' destination: inventory + output: + contentType: 'application/json' + destination: warehouse-stream datasource: url: jdbc:mysql://${DOCKER_IP:192.168.99.100}:3306/dev username: root @@ -93,6 +99,9 @@ spring: inventory: contentType: 'application/json' destination: inventory + output: + contentType: 'application/json' + destination: warehouse-stream eureka: instance: hostname: ${vcap.application.uris[0]:localhost} diff --git a/warehouse/warehouse-worker/src/main/resources/application.yml b/warehouse/warehouse-worker/src/main/resources/application.yml index 68d667f..023a4ac 100644 --- a/warehouse/warehouse-worker/src/main/resources/application.yml +++ b/warehouse/warehouse-worker/src/main/resources/application.yml @@ -7,29 +7,29 @@ server: spring: profiles: development cloud: - stream: - bindings: - warehouse: - contentType: 'application/json' - destination: warehouse - group: warehouse-group - consumer: - durableSubscription: true - concurrency: 5 - reservation: - contentType: 'application/json' - destination: reservation - group: reservation-group - consumer: - durableSubscription: true - concurrency: 5 - inventory: - contentType: 'application/json' - destination: inventory - group: inventory-group - consumer: - durableSubscription: true - concurrency: 5 + stream: + bindings: + warehouse: + contentType: 'application/json' + destination: warehouse + group: warehouse-group + consumer: + durableSubscription: true + concurrency: 5 + reservation: + contentType: 'application/json' + destination: reservation + group: reservation-group + consumer: + durableSubscription: true + concurrency: 5 + inventory: + contentType: 'application/json' + destination: inventory + group: inventory-group + consumer: + durableSubscription: true + concurrency: 5 --- spring: profiles: docker @@ -37,29 +37,29 @@ spring: host: ${DOCKER_IP:192.168.99.100} port: 5672 cloud: - stream: - bindings: - warehouse: - contentType: 'application/json' - destination: warehouse - group: warehouse-group - consumer: - durableSubscription: true - concurrency: 20 - reservation: - contentType: 'application/json' - destination: reservation - group: reservation-group - consumer: - durableSubscription: true - concurrency: 20 - inventory: - contentType: 'application/json' - destination: inventory - group: inventory-group - consumer: - durableSubscription: true - concurrency: 20 + stream: + bindings: + warehouse: + contentType: 'application/json' + destination: warehouse + group: warehouse-group + consumer: + durableSubscription: true + concurrency: 20 + reservation: + contentType: 'application/json' + destination: reservation + group: reservation-group + consumer: + durableSubscription: true + concurrency: 20 + inventory: + contentType: 'application/json' + destination: inventory + group: inventory-group + consumer: + durableSubscription: true + concurrency: 20 eureka: client: service-url: @@ -79,29 +79,29 @@ eureka: spring: profiles: cloud cloud: - stream: - bindings: - warehouse: - contentType: 'application/json' - destination: warehouse - group: warehouse-group - consumer: - durableSubscription: true - concurrency: 5 - reservation: - contentType: 'application/json' - destination: reservation - group: reservation-group - consumer: - durableSubscription: true - concurrency: 5 - inventory: - contentType: 'application/json' - destination: inventory - group: inventory-group - consumer: - durableSubscription: true - concurrency: 5 + stream: + bindings: + warehouse: + contentType: 'application/json' + destination: warehouse + group: warehouse-group + consumer: + durableSubscription: true + concurrency: 5 + reservation: + contentType: 'application/json' + destination: reservation + group: reservation-group + consumer: + durableSubscription: true + concurrency: 5 + inventory: + contentType: 'application/json' + destination: inventory + group: inventory-group + consumer: + durableSubscription: true + concurrency: 5 eureka: instance: hostname: ${vcap.application.uris[0]:localhost}