9 Commits

Author SHA1 Message Date
Markus Günther
67ead6e5a1 Disables the E2E test as it requires a working external cluster 2021-02-19 17:15:16 +01:00
Markus Günther
a4474364d0 Adds an example for an E2E system test that leverages Kafka for JUnit 2021-02-19 16:50:10 +01:00
Markus Günther
4fd05cfbc7 Uses OpenJDK 8 for Travis CI builds 2021-02-06 12:50:35 +01:00
Markus Günther
19757a7733 Fixes a couple of things that broke down with the update 2021-02-05 22:53:00 +01:00
Markus Günther
847a5a34d5 Fixes Spring Cloud Gateway configuration 2021-02-05 21:40:38 +01:00
Markus Günther
6e72c0e649 Updates dependencies to latest versions of Spring Boot, Spring for Apache Kafka, Kafka Clients, ... 2021-02-05 20:05:10 +01:00
Markus Günther
0ab26c6930 Puts consumers into read_committed mode 2017-12-08 16:49:19 +01:00
Markus Günther
dccec0673a Adds Travis CI build configuration 2017-12-08 16:48:28 +01:00
Markus Günther
89335ee919 Issue #003: Improve documentation of service documents 2017-11-10 11:07:50 +01:00
44 changed files with 614 additions and 411 deletions

3
.travis.yml Normal file
View File

@@ -0,0 +1,3 @@
language: java
jdk:
- openjdk8

View File

@@ -1,6 +1,8 @@
# Event Sourcing using Spring Kafka
This repository contains a sample application that demonstrates how to implement an Event-sourced systems using the CQRS architectural style. The solution uses Apache Kafka, which we easily integrate into a Spring Boot based application using Spring Kafka, Apache Avro for event serialization and deserialization and uses an in-memory H2 database that contributes to the query side of our CQRS-based system. The application itself is minimal and implements a subset of David Allen's Getting Things Done time management method.
[![Build Status](https://travis-ci.org/mguenther/spring-kafka-event-sourcing-sampler.svg?branch=master)](https://travis-ci.org/mguenther/spring-kafka-event-sourcing-sampler.svg)
This repository contains a sample application that demonstrates how to implement an Event-sourced systems using the CQRS architectural style. The solution uses Apache Kafka, which we easily integrate into a Spring Boot based application using [Spring for Apache Kafka](https://spring.io/projects/spring-kafka) (2.6.5), Apache Avro for event serialization and deserialization and uses an in-memory H2 database that contributes to the query side of our CQRS-based system. The application itself is minimal and implements a subset of David Allen's Getting Things Done time management method.
The code presented in this repository is the joint work of [Boris Fresow](mailto://bfresow@gmail.com) and [Markus Günther](mailto://markus.guenther@gmail.com) as part of an article series on **Building Event-based applications with Spring Kafka** for the German [JavaMagazin](https://jaxenter.de/magazine/java-magazin).
@@ -12,8 +14,8 @@ Running the showcase requires a working installation of Apache ZooKeeper and Apa
| Application | Version | Docker Image |
| ------------------- | --------- | ----------------------- |
| Apache Kafka | 0.11.0.0 | kafka-sampler/kafka |
| Apache ZooKeeper | 3.4.8-1 | kafka-sampler/zookeeper |
| Apache Kafka | 2.6.0 | wurstmeister/kafka:2.13-2.6.0 |
| Apache ZooKeeper | 3.4.13 | wurstmeister/zookeeper |
### Building and Running the Containers

View File

@@ -1,13 +1,14 @@
version: '2'
version: '2.2'
services:
zookeeper:
image: getting-things-done/zookeeper
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
- 2181:2181
kafka:
image: getting-things-done/kafka
links:
- zookeeper
image: wurstmeister/kafka:2.13-2.6.0
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ADVERTISED_HOST_NAME: kafka
@@ -15,6 +16,9 @@ services:
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 5
KAFKA_CREATE_TOPICS: "topic-getting-things-done:5:1"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
discovery:

View File

@@ -1,32 +0,0 @@
FROM phusion/baseimage:latest
MAINTAINER Markus Günther <markus.guenther@gmail.com>
ENV DEBIAN_FRONTEND noninteractive
ENV SCALA_VERSION 2.11
ENV KAFKA_VERSION 0.11.0.0
ENV KAFKA_HOME /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION"
# Install Oracle Java 8, some utilities and Kafka
RUN \
echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | debconf-set-selections && \
add-apt-repository -y ppa:webupd8team/java && \
apt-get update && \
apt-get install -y oracle-java8-installer && \
apt-get install -y wget supervisor dnsutils curl jq coreutils docker net-tools && \
rm -rf /var/lib/apt/lists/* && \
rm -rf /var/cache/oracle-jdk8-installer && \
apt-get clean && \
wget -q http://apache.mirrors.spacedump.net/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && \
rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz
VOLUME ["/kafka"]
ADD kafka-start /usr/bin/kafka-start
ADD kafka-create-topics /usr/bin/kafka-create-topics
ADD supervisord.conf /etc/supervisor/conf.d/
EXPOSE 9092
CMD ["/sbin/my_init", "kafka-start"]

View File

@@ -1,36 +0,0 @@
#!/bin/bash
if [[ -z "$START_TIMEOUT" ]]; then
START_TIMEOUT=600
fi
start_timeout_exceeded=false
count=0
step=10
while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
echo "waiting for kafka to be ready"
sleep $step;
count=$(expr $count + $step)
if [ $count -gt $START_TIMEOUT ]; then
start_timeout_exceeded=true
break
fi
done
if $start_timeout_exceeded; then
echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
exit 1
fi
if [[ -n $KAFKA_CREATE_TOPICS ]]; then
IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
echo "creating topics: $topicToCreate"
IFS=':' read -a topicConfig <<< "$topicToCreate"
if [ ${topicConfig[3]} ]; then
JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --config cleanup.policy="${topicConfig[3]}" --if-not-exists
else
JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --if-not-exists
fi
done
fi

View File

@@ -1,138 +0,0 @@
#!/bin/bash
if [[ -z "$KAFKA_PORT" ]]; then
export KAFKA_PORT=9092
fi
create-topics.sh &
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
-z "$KAFKA_LISTENERS" && \
-z "$KAFKA_ADVERTISED_LISTENERS" && \
-S /var/run/docker.sock ]]; then
export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
fi
if [[ -z "$KAFKA_BROKER_ID" ]]; then
if [[ -n "$BROKER_ID_COMMAND" ]]; then
export KAFKA_BROKER_ID=$(eval $BROKER_ID_COMMAND)
else
# By default auto allocate broker ID
export KAFKA_BROKER_ID=-1
fi
fi
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
fi
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
unset KAFKA_HEAP_OPTS
fi
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
fi
if [[ -n "$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ]]; then
if [[ -n "$KAFKA_ADVERTISED_PORT" && -n "$KAFKA_ADVERTISED_PROTOCOL_NAME" ]]; then
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_PROTOCOL_NAME}://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT}"
export KAFKA_LISTENERS="$KAFKA_ADVERTISED_PROTOCOL_NAME://:$KAFKA_ADVERTISED_PORT"
fi
if [[ -z "$KAFKA_PROTOCOL_NAME" ]]; then
export KAFKA_PROTOCOL_NAME="${KAFKA_ADVERTISED_PROTOCOL_NAME}"
fi
if [[ -n "$KAFKA_PORT" && -n "$KAFKA_PROTOCOL_NAME" ]]; then
export ADD_LISTENER="${KAFKA_PROTOCOL_NAME}://${KAFKA_HOST_NAME-}:${KAFKA_PORT}"
fi
if [[ -z "$KAFKA_INTER_BROKER_LISTENER_NAME" ]]; then
export KAFKA_INTER_BROKER_LISTENER_NAME=$KAFKA_PROTOCOL_NAME
fi
else
#DEFAULT LISTENERS
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT-$KAFKA_PORT}"
export KAFKA_LISTENERS="PLAINTEXT://${KAFKA_HOST_NAME-}:${KAFKA_PORT-9092}"
fi
if [[ -n "$ADD_LISTENER" && -n "$KAFKA_LISTENERS" ]]; then
export KAFKA_LISTENERS="${KAFKA_LISTENERS},${ADD_LISTENER}"
fi
if [[ -n "$ADD_LISTENER" && -z "$KAFKA_LISTENERS" ]]; then
export KAFKA_LISTENERS="${ADD_LISTENER}"
fi
if [[ -n "$ADD_LISTENER" && -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${ADD_LISTENER}"
fi
if [[ -n "$ADD_LISTENER" && -z "$KAFKA_ADVERTISED_LISTENERS" ]]; then
export KAFKA_ADVERTISED_LISTENERS="${ADD_LISTENER}"
fi
if [[ -n "$KAFKA_INTER_BROKER_LISTENER_NAME" && ! "$KAFKA_INTER_BROKER_LISTENER_NAME"X = "$KAFKA_PROTOCOL_NAME"X ]]; then
if [[ -n "$KAFKA_INTER_BROKER_PORT" ]]; then
export KAFKA_INTER_BROKER_PORT=$(( $KAFKA_PORT + 1 ))
fi
export INTER_BROKER_LISTENER="${KAFKA_INTER_BROKER_LISTENER_NAME}://:${KAFKA_INTER_BROKER_PORT}"
export KAFKA_LISTENERS="${KAFKA_LISTENERS},${INTER_BROKER_LISTENER}"
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${INTER_BROKER_LISTENER}"
unset KAFKA_INTER_BROKER_PORT
unset KAFKA_SECURITY_INTER_BROKER_PROTOCOL
unset INTER_BROKER_LISTENER
fi
if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
export KAFKA_BROKER_RACK=$(eval $RACK_COMMAND)
fi
#Issue newline to config file in case there is not one already
echo -e "\n" >> $KAFKA_HOME/config/server.properties
unset KAFKA_CREATE_TOPICS
unset KAFKA_ADVERTISED_PROTOCOL_NAME
unset KAFKA_PROTOCOL_NAME
if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
unset KAFKA_ADVERTISED_PORT
unset KAFKA_ADVERTISED_HOST_NAME
fi
if [[ -n "$KAFKA_LISTENERS" ]]; then
unset KAFKA_PORT
unset KAFKA_HOST_NAME
fi
for VAR in `env`
do
if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
else
echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
fi
fi
if [[ $VAR =~ ^LOG4J_ ]]; then
log4j_name=`echo "$VAR" | sed -r "s/(LOG4J_.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
log4j_env=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
if egrep -q "(^|^#)$log4j_name=" $KAFKA_HOME/config/log4j.properties; then
sed -r -i "s@(^|^#)($log4j_name)=(.*)@\2=${!log4j_env}@g" $KAFKA_HOME/config/log4j.properties #note that no config values may contain an '@' char
else
echo "$log4j_name=${!log4j_env}" >> $KAFKA_HOME/config/log4j.properties
fi
fi
done
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
eval $CUSTOM_INIT_SCRIPT
fi
exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

View File

@@ -1,7 +0,0 @@
[supervisord]
nodaemon=true
[program:kafka]
command=/usr/bin/kafka-start
autostart=true
autorestart=true

View File

@@ -1,20 +0,0 @@
FROM phusion/baseimage:latest
MAINTAINER Markus Guenther <markus.guenther@gmail.com>
ENV ZK_VERSION 3.4.8-1
ENV ZK_HOME /usr/share/zookeeper
RUN apt-get update && \
apt-get install -y zookeeper=$ZK_VERSION supervisor dnsutils && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean && \
apt-cache policy zookeeper
VOLUME ["/zookeeper"]
ADD supervisord.conf /etc/supervisor/conf.d/
EXPOSE 2181 2888 3888
CMD ["/sbin/my_init", "supervisord"]

View File

@@ -1,7 +0,0 @@
[supervisord]
nodaemon=true
[program:zookeeper]
command=/usr/share/zookeeper/bin/zkServer.sh start-foreground
autostart=true
autorestart=true

View File

@@ -24,12 +24,20 @@
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zuul</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -1,26 +1,20 @@
package net.mguenther.gtd;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
import org.springframework.context.annotation.Bean;
/**
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/
@SpringBootApplication
@EnableZuulProxy
@EnableDiscoveryClient
@EnableEurekaClient
public class GtdApiGatewayApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(GtdApiGatewayApplication.class).web(true).run(args);
}
@Bean
public OnMethodFilter onMethodZuulFilter() {
return new OnMethodFilter();
SpringApplication.run(GtdApiGatewayApplication.class, args);
}
}

View File

@@ -1,61 +0,0 @@
package net.mguenther.gtd;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
/**
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/
public class OnMethodFilter extends ZuulFilter {
private static final Logger log = LoggerFactory.getLogger(OnMethodFilter.class);
private static final List<String> methodsForCommands = Arrays.asList("POST", "PUT", "PATCH", "DELETE");
@Override
public String filterType() {
return FilterConstants.PRE_TYPE;
}
@Override
public int filterOrder() {
return FilterConstants.PRE_DECORATION_FILTER_ORDER - 1;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() {
final RequestContext ctx = RequestContext.getCurrentContext();
final String method = ctx.getRequest().getMethod();
if (isCommand(ctx)) {
log.info("Resolved incoming request using method {} to service ID 'gtd-es-command-side'.", method);
ctx.set("serviceId", "gtd-es-command-side");
ctx.setRouteHost(null);
ctx.addOriginResponseHeader("X-Zuul-ServiceId", UUID.randomUUID().toString());
} else {
log.info("Resolved incoming request using method {} to service ID 'gtd-es-query-side'.", method);
ctx.set("serviceId", "gtd-es-query-side");
ctx.setRouteHost(null);
ctx.addOriginResponseHeader("X-Zuul-ServiceId", UUID.randomUUID().toString());
}
return null;
}
private boolean isCommand(final RequestContext ctx) {
return
StringUtils.isNotEmpty(ctx.getRequest().getMethod()) &&
methodsForCommands.contains(ctx.getRequest().getMethod().toUpperCase());
}
}

View File

@@ -0,0 +1,21 @@
package net.mguenther.gtd;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
@Configuration
public class SpringCloudConfig {
@Bean
public RouteLocator gatewayRoutes(RouteLocatorBuilder builder) {
return builder.routes()
.route(r -> r.method(HttpMethod.DELETE, HttpMethod.PUT, HttpMethod.POST, HttpMethod.PATCH)
.uri("lb://gtd-es-command-side"))
.route(r -> r.method(HttpMethod.GET)
.uri("lb://gtd-es-query-side"))
.build();
}
}

View File

@@ -1,19 +0,0 @@
spring.application.name=gtd-es-api-gateway
info.component=Edge Server
endpoints.restart.enabled=true
endpoints.shutdown.enabled=true
endpoints.health.sensitive=false
ribbon.eureka.enabled=true
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
eureka.client.registerWithEureka=false
server.port=8765
zuul.prefix=/api
zuul.routes.gtd.path: /**
management.security.enabled=false

View File

@@ -5,9 +5,6 @@ management:
security:
enabled: false
info:
component: Edge Server
endpoints:
restart:
enabled: true
@@ -16,9 +13,6 @@ endpoints:
health:
sensitive: false
ribbon:
eurekaEnabled: true
eureka:
client:
enabled: true
@@ -26,12 +20,23 @@ eureka:
enabled: true
serviceUrl:
defaultZone: ${EUREKA_CLIENT_SERVICEURL_DEFAULTZONE:http://localhost:8761/eureka/}
register-with-eureka: true
fetch-registry: true
instance:
appname: gtd-es-api-gateway
preferIpAddress: true
zuul:
prefix: /api
routes:
gtd:
path: /**
spring:
application:
name: gtd-es-api-gateway
cloud:
discovery:
enabled: true
client:
health-indicator:
enabled: true
gateway:
discovery:
locator:
lower-case-service-id: true
enabled: true

View File

@@ -14,22 +14,25 @@ import java.time.Instant;
import java.util.Date;
/**
* Converts bidirectionally between domain events and their respective Avro representation.
* This is a bit of a mess, but we have to cope with it due to the lack of polymorphy and
* inheritance in Avro.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/
@Component
public class ItemEventConverter {
private AvroItemEvent wrap(final ItemEvent event, final Object eventPayload) {
return AvroItemEvent
.newBuilder()
.setEventId(event.getEventId())
.setTimestamp(event.getTimestamp())
.setData(eventPayload)
.build();
}
/**
* Consumes a domain event of type {@code ItemEvent} and returns its corresponding
* Avro type (cf. {@code AvroItemEvent}).
*
* @param event
* the domain event that ought to be converted
* @return
* instance of {@code AvroItemEvent} that mirrors the domain event
*/
public AvroItemEvent from(final ItemEvent event) {
if (event instanceof ItemCreated) return from((ItemCreated) event);
@@ -111,6 +114,25 @@ public class ItemEventConverter {
return wrap(event, avroEvent);
}
private AvroItemEvent wrap(final ItemEvent event, final Object eventPayload) {
return AvroItemEvent
.newBuilder()
.setEventId(event.getEventId())
.setTimestamp(event.getTimestamp())
.setData(eventPayload)
.build();
}
/**
* Consumes an Avro event of type {@code AvroItemEvent} and returns its corresponding
* domain event (cf. {@code ItemEvent}).
*
* @param event
* the Avro event that ought to be converted
* @return
* instance of {@code ItemEvent} that mirrros the Avro event
*/
public ItemEvent to(final AvroItemEvent event) {
final String eventId = String.valueOf(event.getEventId());

View File

@@ -11,6 +11,9 @@ import java.io.IOException;
import java.util.Map;
/**
* Simple {@code Deserializer} that operates on {@code AvroItemEvent}. This {@code Deserializer} does
* not support multiple versions of the same schema.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/

View File

@@ -12,6 +12,9 @@ import java.io.IOException;
import java.util.Map;
/**
* Simple {@code Serializer} that operates on {@code AvroItemEvent}. This {@code Serializer} does
* not support multiple versions of the same schema.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/

View File

@@ -78,7 +78,7 @@
<!-- Spring Cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -14,6 +14,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
/**
@@ -37,9 +38,11 @@ public class ItemsCommandResource {
log.info("Received a create item request with data {}.", createItem);
ItemCommand createItemCommand = commandsFor(createItem);
return commandHandler
.onCommand(commandsFor(createItem))
.thenApply(dontCare -> ResponseEntity.accepted().build())
.onCommand(createItemCommand)
.thenApply(dontCare -> ResponseEntity.created(itemUri(createItemCommand.getItemId())).build())
.exceptionally(e -> {
log.warn("Caught an exception at the service boundary.", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
@@ -49,4 +52,8 @@ public class ItemsCommandResource {
private ItemCommand commandsFor(final CreateItemRequest createItem) {
return new CreateItem(createItem.getDescription());
}
private URI itemUri(final String itemId) {
return URI.create("/items/" + itemId);
}
}

View File

@@ -6,6 +6,10 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Accepts a command (or a list of commands) and attempts to validate them against the current
* state of the referenced aggregate. If the validation holds, the {@code CommandHandler} emits
* corresponding events to the event log.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/

View File

@@ -3,6 +3,8 @@ package net.mguenther.gtd.domain;
import net.mguenther.gtd.domain.event.ItemEvent;
/**
* Publishes a given {@code ItemEvent} to an event log.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/

View File

@@ -12,6 +12,10 @@ import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* Consumes {@code AvroItemEvent}s that update the internal state of domain for the command-side. This is
* essential so that the validation that a {@code CommandHandler} performs always goes against the most
* recent state.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/
@@ -31,7 +35,7 @@ public class ItemEventConsumer {
this.eventHandler = eventHandler;
}
@KafkaListener(topics = "${gtd.topic}", group = "getting-things-done")
@KafkaListener(topics = "${gtd.topic}", groupId = "${gtd.groupId}")
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
final ItemEvent event = converter.to(itemEvent);
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());

View File

@@ -13,6 +13,8 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
/**
* Publisher for {@code ItemEvent}s that exhibits transactional guarantees.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/

View File

@@ -24,8 +24,7 @@ public class TransactionalItemEventPublisherConfig {
@Bean
public ProducerFactory<String, AvroItemEvent> producerFactory() {
final Map<String, Object> config = new HashMap<>();
// TODO (mgu): Extract to config value
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ItemEventSerializer.class);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

View File

@@ -1,25 +1,32 @@
server:
contextPath: /api
port: 8089
servlet:
context-path: /api
management:
security:
enabled: false
spring:
application:
name: gtd-es-command-side
kafka:
bootstrapServers: localhost:9092
consumer:
groupId: getting-things-done-command
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: net.mguenther.gtd.kafka.serialization.ItemEventDeserializer
autoOffsetReset: earliest
enableAutoCommit: false
isolationLevel: read_committed
listener:
ackMode: MANUAL
producer:
bootstrapServers: localhost:9092
transaction-id-prefix: gtd-es
gtd:
topic: topic-getting-things-done
groupId: getting-thigns-done-command
eureka:
client:
@@ -28,6 +35,7 @@ eureka:
enabled: true
serviceUrl:
defaultZone: ${EUREKA_CLIENT_SERVICEURL_DEFAULTZONE:http://localhost:8761/eureka/}
register-with-eureka: true
instance:
appname: gtd-es-command-side
preferIpAddress: true

View File

@@ -1,3 +1,9 @@
spring:
application:
name: gtd-es-command-side
cloud:
discovery:
enabled: true
client:
health-indicator:
enabled: true

View File

@@ -5,6 +5,8 @@ import net.mguenther.gtd.domain.event.ItemEvent;
import java.util.concurrent.CompletableFuture;
/**
* Consumes {@code ItemEvent}s and acts upon them.
*
* @author Markus Günther (markus.guenther@gmail.com)
* @author Boris Fresow (bfresow@gmail.com)
*/

View File

@@ -91,6 +91,13 @@ public class Item implements Serializable {
return associatedList;
}
/**
* Mutates the state of this {@code Item} in compliance with the given {@code ItemEvent}.
*
* @param event
* an event that occured in the system and that signals a change of state
* for the aggregate {@code Item}
*/
public void project(final ItemEvent event) {
if (event instanceof DueDateAssigned) project((DueDateAssigned) event);
else if (event instanceof RequiredTimeAssigned) project((RequiredTimeAssigned) event);
@@ -101,15 +108,15 @@ public class Item implements Serializable {
else throw new IllegalStateException("Unrecognized event: " + event.toString());
}
public void project(final DueDateAssigned event) {
private void project(final DueDateAssigned event) {
this.dueDate = event.getDueDate();
}
public void project(final RequiredTimeAssigned event) {
private void project(final RequiredTimeAssigned event) {
this.requiredTime = event.getRequiredTime();
}
public void project(final TagAssigned event) {
private void project(final TagAssigned event) {
synchronized (this) {
if (!tags.contains(event.getTag())) {
tags.add(event.getTag());
@@ -117,17 +124,17 @@ public class Item implements Serializable {
}
}
public void project(final TagRemoved event) {
private void project(final TagRemoved event) {
synchronized (this) {
tags.remove(event.getTag());
}
}
public void project(final ItemConcluded event) {
private void project(final ItemConcluded event) {
this.done = true;
}
public void project(final ItemMovedToList event) {
private void project(final ItemMovedToList event) {
this.associatedList = event.getList();
}
}

View File

@@ -25,7 +25,7 @@
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

88
gtd-e2e-tests/pom.xml Normal file
View File

@@ -0,0 +1,88 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>net.mguenther.gtd</groupId>
<artifactId>gtd-parent</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<artifactId>gtd-e2e-tests</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>gtd-e2e-tests</name>
<dependencies>
<!-- Intra-project dependencies -->
<dependency>
<groupId>net.mguenther.gtd</groupId>
<artifactId>gtd-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.mguenther.gtd</groupId>
<artifactId>gtd-codec</artifactId>
<version>${project.version}</version>
</dependency>
<!-- HTTP client -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-core</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-slf4j</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Testing -->
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>${kafka.junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,24 @@
package net.mguenther.gtd.client;
/**
* @author Markus Günther (markus.guenther@gmail.com)
*/
public class CreateItem {
private final String description;
public CreateItem(final String description) {
this.description = description;
}
public String getDescription() {
return description;
}
@Override
public String toString() {
return "CreateItemRequest{" +
"description='" + description + '\'' +
'}';
}
}

View File

@@ -0,0 +1,27 @@
package net.mguenther.gtd.client;
import feign.Headers;
import feign.Param;
import feign.RequestLine;
import feign.Response;
import java.util.List;
public interface GettingThingsDone {
@RequestLine("POST /items")
@Headers("Content-Type: application/json")
Response createItem(CreateItem payload);
@RequestLine("PUT /items/{itemId}")
@Headers("Content-Type: application/json")
Response updateItem(@Param("itemId") String itemId, UpdateItem payload);
@RequestLine("GET /items/{itemId}")
@Headers("Accept: application/json")
Item getItem(@Param("itemId") String itemId);
@RequestLine("GET /items")
@Headers("Accept: application/json")
List<Item> getItems();
}

View File

@@ -0,0 +1,53 @@
package net.mguenther.gtd.client;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/**
* @author Markus Günther (markus.guenther@gmail.com)
*/
public class Item {
private String id;
private String description;
private int requiredTime;
private Date dueDate;
private List<String> tags;
private String associatedList;
private boolean done;
public String getId() {
return id;
}
public boolean isDone() {
return done;
}
public String getDescription() {
return description;
}
public int getRequiredTime() {
return requiredTime;
}
public Date getDueDate() {
return dueDate;
}
public List<String> getTags() {
return Collections.unmodifiableList(tags);
}
public String getAssociatedList() {
return associatedList;
}
}

View File

@@ -0,0 +1,59 @@
package net.mguenther.gtd.client;
import java.util.List;
/**
 * Request payload for partially updating an existing to-do item. Fields use boxed
 * types so that absent JSON properties stay {@code null} — presumably meaning
 * "leave unchanged"; confirm against the command-side handler.
 *
 * @author Markus Günther (markus.guenther@gmail.com)
 */
public class UpdateItem {

    private Long dueDate;
    private Integer requiredTime;
    private List<String> tags;
    private String associatedList;

    public Long getDueDate() {
        return dueDate;
    }

    public void setDueDate(long dueDate) {
        this.dueDate = dueDate;
    }

    public Integer getRequiredTime() {
        return requiredTime;
    }

    public void setRequiredTime(int requiredTime) {
        this.requiredTime = requiredTime;
    }

    public List<String> getTags() {
        return tags;
    }

    public void setTags(List<String> tags) {
        this.tags = tags;
    }

    public String getAssociatedList() {
        return associatedList;
    }

    public void setAssociatedList(String associatedList) {
        this.associatedList = associatedList;
    }

    @Override
    public String toString() {
        // Fix: previously reported the wrong class name ("UpdateItemRequest").
        return "UpdateItem{" +
                "dueDate=" + dueDate +
                ", requiredTime=" + requiredTime +
                ", tags=" + tags +
                ", associatedList='" + associatedList + '\'' +
                '}';
    }
}

View File

@@ -0,0 +1,76 @@
package net.mguenther.gtd;
import feign.Feign;
import feign.Logger;
import feign.Response;
import feign.httpclient.ApacheHttpClient;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonEncoder;
import feign.slf4j.Slf4jLogger;
import net.mguenther.gtd.client.CreateItem;
import net.mguenther.gtd.client.GettingThingsDone;
import net.mguenther.gtd.domain.event.ItemCreated;
import net.mguenther.gtd.kafka.serialization.AvroItemEvent;
import net.mguenther.gtd.kafka.serialization.ItemEventConverter;
import net.mguenther.gtd.kafka.serialization.ItemEventDeserializer;
import net.mguenther.kafka.junit.ExternalKafkaCluster;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
 * E2E system test: creating an item through the HTTP API must result in an
 * ItemCreated event being published to the Kafka topic. Requires the full stack
 * (gateway, command side, external Kafka broker) — hence {@code @Disabled}.
 */
@Disabled
public class EventPublicationTest {

    /** Base URL of the API gateway fronting the command-side REST API. */
    private static final String URL = "http://localhost:8765/api";

    // Fix: Kafka bootstrap servers are plain host:port pairs; an "http://" scheme
    // is rejected by the Kafka client's bootstrap.servers parser.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    private final ItemEventConverter converter = new ItemEventConverter();

    @Test
    public void anItemCreatedEventShouldBePublishedAfterCreatingNewItem() throws Exception {
        ExternalKafkaCluster kafka = ExternalKafkaCluster.at(BOOTSTRAP_SERVERS);
        GettingThingsDone gtd = createGettingThingsDoneClient();
        String itemId = extractItemId(gtd.createItem(new CreateItem("I gotta do my homework!")));
        // Observe the topic for up to 10 seconds, keeping only records whose key
        // matches the aggregate ID of the item we just created.
        List<AvroItemEvent> publishedEvents = kafka
                .observeValues(on("topic-getting-things-done", 1, AvroItemEvent.class)
                        .observeFor(10, TimeUnit.SECONDS)
                        .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ItemEventDeserializer.class)
                        .filterOnKeys(aggregateId -> aggregateId.equals(itemId)));
        ItemCreated itemCreatedEvent = publishedEvents.stream()
                .findFirst()
                .map(converter::to)
                .map(e -> (ItemCreated) e)
                .orElseThrow(AssertionError::new);
        assertThat(itemCreatedEvent.getItemId(), equalTo(itemId));
        assertThat(itemCreatedEvent.getDescription(), equalTo("I gotta do my homework!"));
    }

    // Builds the Feign client for the HTTP API. (Typo fixed: "Getthing" -> "Getting";
    // the method is private, so the rename is invisible to callers.)
    private GettingThingsDone createGettingThingsDoneClient() {
        return Feign.builder()
                .client(new ApacheHttpClient())
                .encoder(new JacksonEncoder())
                .decoder(new JacksonDecoder())
                .logger(new Slf4jLogger(GettingThingsDone.class))
                .logLevel(Logger.Level.FULL)
                .target(GettingThingsDone.class, URL);
    }

    // Extracts the item ID from the Location header of the create response.
    // NOTE(review): headers().get("Location") NPEs if the header is absent —
    // acceptable in a test (fails loudly), but confirm the API always sets it.
    private String extractItemId(final Response response) {
        return response.headers()
                .get("Location")
                .stream()
                .findFirst()
                .map(s -> s.replace("/items/", ""))
                .orElseThrow(AssertionError::new);
    }
}

View File

@@ -82,7 +82,7 @@
<!-- Spring Cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -8,6 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@@ -40,4 +41,6 @@ public class ItemsQueryResource {
.thenApply(ResponseEntity::ok)
.exceptionally(e -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
}
}

View File

@@ -27,8 +27,7 @@ public class DefaultItemView implements ItemView {
public CompletableFuture<List<Item>> getItems() {
return CompletableFuture.supplyAsync(() -> {
final List<Item> items = new ArrayList<>();
items.addAll(repository.findAll());
final List<Item> items = new ArrayList<>(repository.findAll());
return Collections.unmodifiableList(items);
});
}
@@ -36,6 +35,6 @@ public class DefaultItemView implements ItemView {
@Override
public CompletableFuture<Optional<Item>> getItem(final String itemId) {
return CompletableFuture.supplyAsync(() -> Optional.ofNullable(repository.findOne(itemId)));
return CompletableFuture.supplyAsync(() -> repository.findById(itemId));
}
}

View File

@@ -44,9 +44,10 @@ public class ItemUpdater implements EventHandler {
}
private void modifyExistingItem(final ItemEvent event) {
final Item item = repository.findOne(event.getItemId());
item.project(event);
final Item updatedItem = repository.save(item);
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);
repository.findById(event.getItemId()).ifPresent(item -> {
item.project(event);
final Item updatedItem = repository.save(item);
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);
});
}
}

View File

@@ -31,7 +31,7 @@ public class ItemEventConsumer {
this.eventHandler = eventHandler;
}
@KafkaListener(topics = "${gtd.topic}", group = "getting-things-done")
@KafkaListener(topics = "${gtd.topic}", groupId = "${gtd.groupId}")
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
final ItemEvent event = converter.to(itemEvent);
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());

View File

@@ -1,21 +1,25 @@
server:
contextPath: /api
port: 8090
servlet:
context-path: /api
spring:
application:
name: gtd-es-query-side
kafka:
bootstrapServers: localhost:9092
consumer:
groupId: getting-things-done-query
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: net.mguenther.gtd.kafka.serialization.ItemEventDeserializer
autoOffsetReset: latest
enableAutoCommit: false
isolationLevel: read_committed
listener:
ackMode: MANUAL
gtd:
topic: topic-getting-things-done
groupId: getting-things-done-query
management:
security:
@@ -28,6 +32,7 @@ eureka:
enabled: true
serviceUrl:
defaultZone: ${EUREKA_CLIENT_SERVICEURL_DEFAULTZONE:http://localhost:8761/eureka/}
register-with-eureka: true
instance:
appname: gtd-es-query-side
preferIpAddress: true

View File

@@ -1,3 +1,9 @@
spring:
application:
name: gtd-es-query-side
cloud:
discovery:
enabled: true
client:
health-indicator:
enabled: true

120
pom.xml
View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<version>2.4.2</version>
<relativePath/>
</parent>
@@ -21,29 +21,25 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Managed dependency versions -->
<junit.version>4.12</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<kafka.junit.version>2.7.0</kafka.junit.version>
<junit.jupiter.version>5.7.0</junit.jupiter.version>
<junit.vintage.version>5.7.0</junit.vintage.version>
<junit.version>4.13.1</junit.version>
<hamcrest.version>2.2</hamcrest.version>
<slf4j.version>1.7.22</slf4j.version>
<spring.kafka.version>1.3.0.BUILD-SNAPSHOT</spring.kafka.version>
<spring-cloud.version>Dalston.SR2</spring-cloud.version>
<spring.version>5.3.3</spring.version>
<spring.kafka.version>2.6.5</spring.kafka.version>
<spring-cloud.version>2020.0.0</spring-cloud.version>
<avro.version>1.8.1</avro.version>
<commons-lang3.version>3.5</commons-lang3.version>
<openfeign.version>11.0</openfeign.version>
<httpclient.version>4.5.13</httpclient.version>
<!-- Plugin versions -->
<plugin.avro.version>1.8.1</plugin.avro.version>
<plugin.avro.version>1.10.1</plugin.avro.version>
<plugin.compiler.version>3.5</plugin.compiler.version>
<plugin.surefire.version>2.22.2</plugin.surefire.version>
</properties>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<modules>
<module>gtd-codec</module>
<module>gtd-common</module>
@@ -51,6 +47,7 @@
<module>gtd-query-side</module>
<module>gtd-api-gateway</module>
<module>gtd-discovery-service</module>
<module>gtd-e2e-tests</module>
</modules>
<build>
@@ -70,6 +67,11 @@
<artifactId>avro-maven-plugin</artifactId>
<version>${plugin.avro.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
</plugin>
</plugins>
</pluginManagement>
</build>
@@ -90,11 +92,36 @@
<!-- Spring Cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<artifactId>spring-cloud-gateway</artifactId>
<version>3.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>3.0.1</version>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
@@ -129,11 +156,60 @@
<artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- HTTP client -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-core</artifactId>
<version>${openfeign.version}</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
<version>${openfeign.version}</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-slf4j</artifactId>
<version>${openfeign.version}</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-jackson</artifactId>
<version>${openfeign.version}</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-jaxrs</artifactId>
<version>${openfeign.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>${kafka.junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>