Fixes a couple of things that broke down with the update
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
package net.mguenther.gtd;
|
||||
|
||||
import com.netflix.discovery.DiscoveryClient;
|
||||
import org.springframework.cloud.gateway.discovery.DiscoveryClientRouteDefinitionLocator;
|
||||
import org.springframework.cloud.gateway.route.RouteLocator;
|
||||
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
@@ -35,7 +35,7 @@ public class ItemEventConsumer {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
|
||||
@KafkaListener(topics = "${gtd.topic}", groupId = "getting-things-done")
|
||||
@KafkaListener(topics = "${gtd.topic}", groupId = "${gtd.groupId}")
|
||||
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
|
||||
final ItemEvent event = converter.to(itemEvent);
|
||||
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());
|
||||
|
||||
@@ -24,7 +24,7 @@ public class TransactionalItemEventPublisherConfig {
|
||||
@Bean
|
||||
public ProducerFactory<String, AvroItemEvent> producerFactory() {
|
||||
final Map<String, Object> config = new HashMap<>();
|
||||
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
|
||||
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ItemEventSerializer.class);
|
||||
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
|
||||
|
||||
@@ -13,7 +13,6 @@ spring:
|
||||
kafka:
|
||||
bootstrapServers: localhost:9092
|
||||
consumer:
|
||||
groupId: getting-things-done-command
|
||||
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
valueDeserializer: net.mguenther.gtd.kafka.serialization.ItemEventDeserializer
|
||||
autoOffsetReset: earliest
|
||||
@@ -21,9 +20,13 @@ spring:
|
||||
isolationLevel: read_committed
|
||||
listener:
|
||||
ackMode: MANUAL
|
||||
producer:
|
||||
bootstrapServers: localhost:9092
|
||||
transaction-id-prefix: gtd-es
|
||||
|
||||
gtd:
|
||||
topic: topic-getting-things-done
|
||||
groupId: getting-thigns-done-command
|
||||
|
||||
eureka:
|
||||
client:
|
||||
|
||||
@@ -30,7 +30,7 @@ public class ItemQueryResource {
|
||||
this.itemView = itemView;
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/items/{itemId}", method = RequestMethod.GET, produces = "application/json")
|
||||
/*@RequestMapping(path = "/items/{itemId}", method = RequestMethod.GET, produces = "application/json")
|
||||
public CompletableFuture<ResponseEntity<Item>> showItem(@PathVariable("itemId") String itemId) {
|
||||
|
||||
log.info("Received a show item request for item with ID {}.", itemId);
|
||||
@@ -38,5 +38,5 @@ public class ItemQueryResource {
|
||||
return itemView.getItem(itemId)
|
||||
.thenApply(optionalItem -> optionalItem.map(ResponseEntity::ok).orElse(ResponseEntity.notFound().build()))
|
||||
.exceptionally(e -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
@@ -40,4 +41,14 @@ public class ItemsQueryResource {
|
||||
.thenApply(ResponseEntity::ok)
|
||||
.exceptionally(e -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/items/{itemId}", method = RequestMethod.GET, produces = "application/json")
|
||||
public CompletableFuture<ResponseEntity<Item>> showItem(@PathVariable("itemId") String itemId) {
|
||||
|
||||
log.info("Received a show item request for item with ID {}.", itemId);
|
||||
|
||||
return itemView.getItem(itemId)
|
||||
.thenApply(optionalItem -> optionalItem.map(ResponseEntity::ok).orElse(ResponseEntity.notFound().build()))
|
||||
.exceptionally(e -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,6 @@ public class DefaultItemView implements ItemView {
|
||||
@Override
|
||||
public CompletableFuture<Optional<Item>> getItem(final String itemId) {
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> Optional.of(repository.getOne(itemId)));
|
||||
return CompletableFuture.supplyAsync(() -> repository.findById(itemId));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,9 +44,10 @@ public class ItemUpdater implements EventHandler {
|
||||
}
|
||||
|
||||
private void modifyExistingItem(final ItemEvent event) {
|
||||
final Item item = repository.getOne(event.getItemId());
|
||||
item.project(event);
|
||||
final Item updatedItem = repository.save(item);
|
||||
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);
|
||||
repository.findById(event.getItemId()).ifPresent(item -> {
|
||||
item.project(event);
|
||||
final Item updatedItem = repository.save(item);
|
||||
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ public class ItemEventConsumer {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
|
||||
@KafkaListener(topics = "${gtd.topic}", groupId = "getting-things-done")
|
||||
@KafkaListener(topics = "${gtd.topic}", groupId = "${gtd.groupId}")
|
||||
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
|
||||
final ItemEvent event = converter.to(itemEvent);
|
||||
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());
|
||||
|
||||
@@ -9,7 +9,6 @@ spring:
|
||||
kafka:
|
||||
bootstrapServers: localhost:9092
|
||||
consumer:
|
||||
groupId: getting-things-done-query
|
||||
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
valueDeserializer: net.mguenther.gtd.kafka.serialization.ItemEventDeserializer
|
||||
autoOffsetReset: latest
|
||||
@@ -20,6 +19,7 @@ spring:
|
||||
|
||||
gtd:
|
||||
topic: topic-getting-things-done
|
||||
groupId: getting-thigns-done-query
|
||||
|
||||
management:
|
||||
security:
|
||||
|
||||
Reference in New Issue
Block a user