Fixed warehouse data flow integration

This commit is contained in:
Kenny Bastani
2017-01-12 05:05:14 -08:00
parent 121a56ea95
commit c950f1b649
13 changed files with 210 additions and 105 deletions

View File

@@ -4,7 +4,6 @@ import demo.domain.Aggregate;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.PageRequest;
import org.springframework.hateoas.Link; import org.springframework.hateoas.Link;
import org.springframework.hateoas.MediaTypes; import org.springframework.hateoas.MediaTypes;
@@ -37,13 +36,13 @@ public class BasicEventService<T extends Event, ID extends Serializable> impleme
private String eventsWorker; private String eventsWorker;
private final EventRepository<T, ID> eventRepository; private final EventRepository<T, ID> eventRepository;
private final Source eventStream; private final EventSource eventSource;
private final RestTemplate restTemplate; private final RestTemplate restTemplate;
public BasicEventService(EventRepository<T, ID> eventRepository, Source eventStream, @LoadBalanced RestTemplate public BasicEventService(EventRepository<T, ID> eventRepository, EventSource eventSource, @LoadBalanced RestTemplate
restTemplate) { restTemplate) {
this.eventRepository = eventRepository; this.eventRepository = eventRepository;
this.eventStream = eventStream; this.eventSource = eventSource;
this.restTemplate = restTemplate; this.restTemplate = restTemplate;
} }
@@ -69,7 +68,7 @@ public class BasicEventService<T extends Event, ID extends Serializable> impleme
} }
public <S extends T> Boolean sendAsync(S event, Link... links) { public <S extends T> Boolean sendAsync(S event, Link... links) {
return eventStream.output() return eventSource.getChannel()
.send(MessageBuilder.withPayload(event) .send(MessageBuilder.withPayload(event)
.setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE) .setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE)
.build()); .build());

View File

@@ -20,18 +20,21 @@ import org.springframework.web.client.RestTemplate;
public class EventAutoConfig { public class EventAutoConfig {
private EventRepository eventRepository; private EventRepository eventRepository;
private Source source;
private RestTemplate restTemplate; private RestTemplate restTemplate;
public EventAutoConfig(EventRepository eventRepository, Source source, RestTemplate restTemplate) { public EventAutoConfig(EventRepository eventRepository, RestTemplate restTemplate) {
this.eventRepository = eventRepository; this.eventRepository = eventRepository;
this.source = source;
this.restTemplate = restTemplate; this.restTemplate = restTemplate;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Bean @Bean
public EventService eventService() { public EventService eventService(EventSource eventSource) {
return new BasicEventService(eventRepository, source, restTemplate); return new BasicEventService(eventRepository, eventSource, restTemplate);
}
@Bean
public EventSource eventSource(Source source) {
return new EventSource(source.output());
} }
} }

View File

@@ -0,0 +1,16 @@
package demo.event;
import org.springframework.messaging.MessageChannel;
public class EventSource {
private MessageChannel channel;
public EventSource(MessageChannel channel) {
this.channel = channel;
}
public MessageChannel getChannel() {
return channel;
}
}

View File

@@ -1,5 +1,6 @@
package demo.config; package demo.config;
import demo.event.EventSource;
import demo.inventory.config.InventoryEventSource; import demo.inventory.config.InventoryEventSource;
import demo.inventory.event.InventoryEventRepository; import demo.inventory.event.InventoryEventRepository;
import demo.inventory.event.InventoryEventService; import demo.inventory.event.InventoryEventService;
@@ -9,6 +10,7 @@ import demo.reservation.event.ReservationEventService;
import demo.warehouse.config.WarehouseEventSource; import demo.warehouse.config.WarehouseEventSource;
import demo.warehouse.event.WarehouseEventRepository; import demo.warehouse.event.WarehouseEventRepository;
import demo.warehouse.event.WarehouseEventService; import demo.warehouse.event.WarehouseEventService;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
@@ -21,21 +23,39 @@ import org.springframework.web.client.RestTemplate;
@Configuration @Configuration
public class EventConfig { public class EventConfig {
@Bean
public EventSource inventoryChannel(InventoryEventSource eventSource) {
return new EventSource(eventSource.output());
}
@Bean
public EventSource warehouseChannel(WarehouseEventSource eventSource) {
return new EventSource(eventSource.output());
}
@Bean
public EventSource reservationChannel(ReservationEventSource eventSource) {
return new EventSource(eventSource.output());
}
@Bean @Bean
public InventoryEventService inventoryEventService(RestTemplate restTemplate, InventoryEventRepository public InventoryEventService inventoryEventService(RestTemplate restTemplate, InventoryEventRepository
inventoryEventRepository, InventoryEventSource eventStream) { inventoryEventRepository, InventoryEventSource eventStream, Source source) {
return new InventoryEventService(inventoryEventRepository, eventStream, restTemplate); return new InventoryEventService(inventoryEventRepository, inventoryChannel(eventStream), restTemplate, source);
} }
@Bean @Bean
public WarehouseEventService warehouseEventService(RestTemplate restTemplate, WarehouseEventRepository public WarehouseEventService warehouseEventService(RestTemplate restTemplate, WarehouseEventRepository
warehouseEventRepository, WarehouseEventSource eventStream) { warehouseEventRepository, WarehouseEventSource eventStream, Source source) {
return new WarehouseEventService(warehouseEventRepository, eventStream, restTemplate); return new WarehouseEventService(warehouseEventRepository, warehouseChannel(eventStream), restTemplate, source);
} }
@Bean @Bean
public ReservationEventService reservationEventService(RestTemplate restTemplate, ReservationEventRepository public ReservationEventService reservationEventService(RestTemplate restTemplate, ReservationEventRepository
reservationEventRepository, ReservationEventSource eventStream) { reservationEventRepository, ReservationEventSource eventStream, Source source) {
return new ReservationEventService(reservationEventRepository, eventStream, restTemplate); return new ReservationEventService(reservationEventRepository, reservationChannel(eventStream), restTemplate,
source);
} }
} }

View File

@@ -0,0 +1,10 @@
package demo.config;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBinding(Source.class)
public class StreamConfig {
}

View File

@@ -1,13 +1,11 @@
package demo.inventory.config; package demo.inventory.config;
import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
public interface InventoryEventSource extends Source { public interface InventoryEventSource {
String OUTPUT = "inventory"; String OUTPUT = "inventory";
@Override
@Output(InventoryEventSource.OUTPUT) @Output(InventoryEventSource.OUTPUT)
MessageChannel output(); MessageChannel output();
} }

View File

@@ -1,12 +1,30 @@
package demo.inventory.event; package demo.inventory.event;
import demo.event.BasicEventService; import demo.event.BasicEventService;
import demo.inventory.config.InventoryEventSource; import demo.event.EventSource;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.hateoas.Link;
import org.springframework.http.MediaType;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
public class InventoryEventService extends BasicEventService<InventoryEvent, Long> { public class InventoryEventService extends BasicEventService<InventoryEvent, Long> {
public InventoryEventService(InventoryEventRepository eventRepository, InventoryEventSource eventStream,
RestTemplate restTemplate) { private final Source source;
public InventoryEventService(InventoryEventRepository eventRepository, EventSource eventStream, RestTemplate
restTemplate, Source source) {
super(eventRepository, eventStream, restTemplate); super(eventRepository, eventStream, restTemplate);
this.source = source;
}
@Override
public <S extends InventoryEvent> Boolean sendAsync(S event, Link... links) {
// Send a duplicate event to the warehouse stream group output channel for data flow
source.output()
.send(MessageBuilder.withPayload(event)
.setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE)
.build());
return super.sendAsync(event, links);
} }
} }

View File

@@ -1,13 +1,11 @@
package demo.reservation.config; package demo.reservation.config;
import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
public interface ReservationEventSource extends Source { public interface ReservationEventSource {
String OUTPUT = "reservation"; String OUTPUT = "reservation";
@Override
@Output(ReservationEventSource.OUTPUT) @Output(ReservationEventSource.OUTPUT)
MessageChannel output(); MessageChannel output();
} }

View File

@@ -1,12 +1,30 @@
package demo.reservation.event; package demo.reservation.event;
import demo.event.BasicEventService; import demo.event.BasicEventService;
import demo.reservation.config.ReservationEventSource; import demo.event.EventSource;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.hateoas.Link;
import org.springframework.http.MediaType;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
public class ReservationEventService extends BasicEventService<ReservationEvent, Long> { public class ReservationEventService extends BasicEventService<ReservationEvent, Long> {
public ReservationEventService(ReservationEventRepository eventRepository, ReservationEventSource eventStream,
RestTemplate restTemplate) { private final Source source;
super(eventRepository, eventStream, restTemplate);
public ReservationEventService(ReservationEventRepository reservationEventRepository, EventSource eventSource,
RestTemplate restTemplate, Source source) {
super(reservationEventRepository, eventSource, restTemplate);
this.source = source;
}
@Override
public <S extends ReservationEvent> Boolean sendAsync(S event, Link... links) {
// Send a duplicate event to the warehouse stream group output channel for data flow
source.output()
.send(MessageBuilder.withPayload(event)
.setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE)
.build());
return super.sendAsync(event, links);
} }
} }

View File

@@ -1,13 +1,11 @@
package demo.warehouse.config; package demo.warehouse.config;
import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
public interface WarehouseEventSource extends Source { public interface WarehouseEventSource {
String OUTPUT = "warehouse"; String OUTPUT = "warehouse";
@Override
@Output(WarehouseEventSource.OUTPUT) @Output(WarehouseEventSource.OUTPUT)
MessageChannel output(); MessageChannel output();
} }

View File

@@ -1,12 +1,30 @@
package demo.warehouse.event; package demo.warehouse.event;
import demo.event.BasicEventService; import demo.event.BasicEventService;
import demo.warehouse.config.WarehouseEventSource; import demo.event.EventSource;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.hateoas.Link;
import org.springframework.http.MediaType;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
public class WarehouseEventService extends BasicEventService<WarehouseEvent, Long> { public class WarehouseEventService extends BasicEventService<WarehouseEvent, Long> {
public WarehouseEventService(WarehouseEventRepository eventRepository, WarehouseEventSource eventStream,
RestTemplate restTemplate) { private final Source source;
super(eventRepository, eventStream, restTemplate);
public WarehouseEventService(WarehouseEventRepository warehouseEventRepository, EventSource eventSource,
RestTemplate restTemplate, Source source) {
super(warehouseEventRepository, eventSource, restTemplate);
this.source = source;
}
@Override
public <S extends WarehouseEvent> Boolean sendAsync(S event, Link... links) {
// Send a duplicate event to the warehouse stream group output channel for data flow
source.output()
.send(MessageBuilder.withPayload(event)
.setHeader("contentType", MediaType.APPLICATION_JSON_UTF8_VALUE)
.build());
return super.sendAsync(event, links);
} }
} }

View File

@@ -20,6 +20,9 @@ spring:
inventory: inventory:
contentType: 'application/json' contentType: 'application/json'
destination: inventory destination: inventory
output:
contentType: 'application/json'
destination: warehouse-stream
jpa: jpa:
show_sql: false show_sql: false
database: H2 database: H2
@@ -47,6 +50,9 @@ spring:
inventory: inventory:
contentType: 'application/json' contentType: 'application/json'
destination: inventory destination: inventory
output:
contentType: 'application/json'
destination: warehouse-stream
datasource: datasource:
url: jdbc:mysql://${DOCKER_IP:192.168.99.100}:3306/dev url: jdbc:mysql://${DOCKER_IP:192.168.99.100}:3306/dev
username: root username: root
@@ -93,6 +99,9 @@ spring:
inventory: inventory:
contentType: 'application/json' contentType: 'application/json'
destination: inventory destination: inventory
output:
contentType: 'application/json'
destination: warehouse-stream
eureka: eureka:
instance: instance:
hostname: ${vcap.application.uris[0]:localhost} hostname: ${vcap.application.uris[0]:localhost}

View File

@@ -7,29 +7,29 @@ server:
spring: spring:
profiles: development profiles: development
cloud: cloud:
stream: stream:
bindings: bindings:
warehouse: warehouse:
contentType: 'application/json' contentType: 'application/json'
destination: warehouse destination: warehouse
group: warehouse-group group: warehouse-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 5 concurrency: 5
reservation: reservation:
contentType: 'application/json' contentType: 'application/json'
destination: reservation destination: reservation
group: reservation-group group: reservation-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 5 concurrency: 5
inventory: inventory:
contentType: 'application/json' contentType: 'application/json'
destination: inventory destination: inventory
group: inventory-group group: inventory-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 5 concurrency: 5
--- ---
spring: spring:
profiles: docker profiles: docker
@@ -37,29 +37,29 @@ spring:
host: ${DOCKER_IP:192.168.99.100} host: ${DOCKER_IP:192.168.99.100}
port: 5672 port: 5672
cloud: cloud:
stream: stream:
bindings: bindings:
warehouse: warehouse:
contentType: 'application/json' contentType: 'application/json'
destination: warehouse destination: warehouse
group: warehouse-group group: warehouse-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 20 concurrency: 20
reservation: reservation:
contentType: 'application/json' contentType: 'application/json'
destination: reservation destination: reservation
group: reservation-group group: reservation-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 20 concurrency: 20
inventory: inventory:
contentType: 'application/json' contentType: 'application/json'
destination: inventory destination: inventory
group: inventory-group group: inventory-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 20 concurrency: 20
eureka: eureka:
client: client:
service-url: service-url:
@@ -79,29 +79,29 @@ eureka:
spring: spring:
profiles: cloud profiles: cloud
cloud: cloud:
stream: stream:
bindings: bindings:
warehouse: warehouse:
contentType: 'application/json' contentType: 'application/json'
destination: warehouse destination: warehouse
group: warehouse-group group: warehouse-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 5 concurrency: 5
reservation: reservation:
contentType: 'application/json' contentType: 'application/json'
destination: reservation destination: reservation
group: reservation-group group: reservation-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 5 concurrency: 5
inventory: inventory:
contentType: 'application/json' contentType: 'application/json'
destination: inventory destination: inventory
group: inventory-group group: inventory-group
consumer: consumer:
durableSubscription: true durableSubscription: true
concurrency: 5 concurrency: 5
eureka: eureka:
instance: instance:
hostname: ${vcap.application.uris[0]:localhost} hostname: ${vcap.application.uris[0]:localhost}