Compare commits
9 Commits
wip-docume
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67ead6e5a1 | ||
|
|
a4474364d0 | ||
|
|
4fd05cfbc7 | ||
|
|
19757a7733 | ||
|
|
847a5a34d5 | ||
|
|
6e72c0e649 | ||
|
|
0ab26c6930 | ||
|
|
dccec0673a | ||
|
|
89335ee919 |
3
.travis.yml
Normal file
3
.travis.yml
Normal file
@@ -0,0 +1,3 @@
|
||||
language: java
|
||||
jdk:
|
||||
- openjdk8
|
||||
@@ -1,6 +1,8 @@
|
||||
# Event Sourcing using Spring Kafka
|
||||
|
||||
This repository contains a sample application that demonstrates how to implement an Event-sourced systems using the CQRS architectural style. The solution uses Apache Kafka, which we easily integrate into a Spring Boot based application using Spring Kafka, Apache Avro for event serialization and deserialization and uses an in-memory H2 database that contributes to the query side of our CQRS-based system. The application itself is minimal and implements a subset of David Allen's Getting Things Done time management method.
|
||||
[](https://travis-ci.org/mguenther/spring-kafka-event-sourcing-sampler.svg)
|
||||
|
||||
This repository contains a sample application that demonstrates how to implement an Event-sourced systems using the CQRS architectural style. The solution uses Apache Kafka, which we easily integrate into a Spring Boot based application using [Spring for Apache Kafka](https://spring.io/projects/spring-kafka) (2.6.5), Apache Avro for event serialization and deserialization and uses an in-memory H2 database that contributes to the query side of our CQRS-based system. The application itself is minimal and implements a subset of David Allen's Getting Things Done time management method.
|
||||
|
||||
The code presented in this repository is the joint work of [Boris Fresow](mailto://bfresow@gmail.com) and [Markus Günther](mailto://markus.guenther@gmail.com) as part of an article series on **Building Event-based applications with Spring Kafka** for the German [JavaMagazin](https://jaxenter.de/magazine/java-magazin).
|
||||
|
||||
@@ -12,8 +14,8 @@ Running the showcase requires a working installation of Apache ZooKeeper and Apa
|
||||
|
||||
| Application | Version | Docker Image |
|
||||
| ------------------- | --------- | ----------------------- |
|
||||
| Apache Kafka | 0.11.0.0 | kafka-sampler/kafka |
|
||||
| Apache ZooKeeper | 3.4.8-1 | kafka-sampler/zookeeper |
|
||||
| Apache Kafka | 2.6.0 | wurstmeister/kafka:2.13-2.6.0 |
|
||||
| Apache ZooKeeper | 3.4.13 | wurstmeister/zookeeper |
|
||||
|
||||
### Building and Running the Containers
|
||||
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
version: '2'
|
||||
version: '2.2'
|
||||
services:
|
||||
zookeeper:
|
||||
image: getting-things-done/zookeeper
|
||||
image: wurstmeister/zookeeper
|
||||
container_name: zookeeper
|
||||
ports:
|
||||
- "2181:2181"
|
||||
- 2181:2181
|
||||
kafka:
|
||||
image: getting-things-done/kafka
|
||||
links:
|
||||
- zookeeper
|
||||
image: wurstmeister/kafka:2.13-2.6.0
|
||||
ports:
|
||||
- 9092:9092
|
||||
environment:
|
||||
KAFKA_ADVERTISED_PORT: 9092
|
||||
KAFKA_ADVERTISED_HOST_NAME: kafka
|
||||
@@ -15,6 +16,9 @@ services:
|
||||
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
|
||||
KAFKA_NUM_PARTITIONS: 5
|
||||
KAFKA_CREATE_TOPICS: "topic-getting-things-done:5:1"
|
||||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
discovery:
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
FROM phusion/baseimage:latest
|
||||
|
||||
MAINTAINER Markus Günther <markus.guenther@gmail.com>
|
||||
|
||||
ENV DEBIAN_FRONTEND noninteractive
|
||||
ENV SCALA_VERSION 2.11
|
||||
ENV KAFKA_VERSION 0.11.0.0
|
||||
ENV KAFKA_HOME /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION"
|
||||
|
||||
# Install Oracle Java 8, some utilities and Kafka
|
||||
RUN \
|
||||
echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | debconf-set-selections && \
|
||||
add-apt-repository -y ppa:webupd8team/java && \
|
||||
apt-get update && \
|
||||
apt-get install -y oracle-java8-installer && \
|
||||
apt-get install -y wget supervisor dnsutils curl jq coreutils docker net-tools && \
|
||||
rm -rf /var/lib/apt/lists/* && \
|
||||
rm -rf /var/cache/oracle-jdk8-installer && \
|
||||
apt-get clean && \
|
||||
wget -q http://apache.mirrors.spacedump.net/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
|
||||
tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && \
|
||||
rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz
|
||||
|
||||
VOLUME ["/kafka"]
|
||||
|
||||
ADD kafka-start /usr/bin/kafka-start
|
||||
ADD kafka-create-topics /usr/bin/kafka-create-topics
|
||||
ADD supervisord.conf /etc/supervisor/conf.d/
|
||||
|
||||
EXPOSE 9092
|
||||
|
||||
CMD ["/sbin/my_init", "kafka-start"]
|
||||
@@ -1,36 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
|
||||
if [[ -z "$START_TIMEOUT" ]]; then
|
||||
START_TIMEOUT=600
|
||||
fi
|
||||
|
||||
start_timeout_exceeded=false
|
||||
count=0
|
||||
step=10
|
||||
while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
|
||||
echo "waiting for kafka to be ready"
|
||||
sleep $step;
|
||||
count=$(expr $count + $step)
|
||||
if [ $count -gt $START_TIMEOUT ]; then
|
||||
start_timeout_exceeded=true
|
||||
break
|
||||
fi
|
||||
done
|
||||
|
||||
if $start_timeout_exceeded; then
|
||||
echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [[ -n $KAFKA_CREATE_TOPICS ]]; then
|
||||
IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
|
||||
echo "creating topics: $topicToCreate"
|
||||
IFS=':' read -a topicConfig <<< "$topicToCreate"
|
||||
if [ ${topicConfig[3]} ]; then
|
||||
JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --config cleanup.policy="${topicConfig[3]}" --if-not-exists
|
||||
else
|
||||
JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --if-not-exists
|
||||
fi
|
||||
done
|
||||
fi
|
||||
@@ -1,138 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
if [[ -z "$KAFKA_PORT" ]]; then
|
||||
export KAFKA_PORT=9092
|
||||
fi
|
||||
|
||||
create-topics.sh &
|
||||
|
||||
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
|
||||
-z "$KAFKA_LISTENERS" && \
|
||||
-z "$KAFKA_ADVERTISED_LISTENERS" && \
|
||||
-S /var/run/docker.sock ]]; then
|
||||
export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
|
||||
fi
|
||||
if [[ -z "$KAFKA_BROKER_ID" ]]; then
|
||||
if [[ -n "$BROKER_ID_COMMAND" ]]; then
|
||||
export KAFKA_BROKER_ID=$(eval $BROKER_ID_COMMAND)
|
||||
else
|
||||
# By default auto allocate broker ID
|
||||
export KAFKA_BROKER_ID=-1
|
||||
fi
|
||||
fi
|
||||
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
|
||||
export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
|
||||
fi
|
||||
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
|
||||
export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
|
||||
fi
|
||||
|
||||
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
|
||||
sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
|
||||
unset KAFKA_HEAP_OPTS
|
||||
fi
|
||||
|
||||
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
|
||||
export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
|
||||
fi
|
||||
|
||||
if [[ -n "$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ]]; then
|
||||
if [[ -n "$KAFKA_ADVERTISED_PORT" && -n "$KAFKA_ADVERTISED_PROTOCOL_NAME" ]]; then
|
||||
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_PROTOCOL_NAME}://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT}"
|
||||
export KAFKA_LISTENERS="$KAFKA_ADVERTISED_PROTOCOL_NAME://:$KAFKA_ADVERTISED_PORT"
|
||||
fi
|
||||
|
||||
if [[ -z "$KAFKA_PROTOCOL_NAME" ]]; then
|
||||
export KAFKA_PROTOCOL_NAME="${KAFKA_ADVERTISED_PROTOCOL_NAME}"
|
||||
fi
|
||||
|
||||
if [[ -n "$KAFKA_PORT" && -n "$KAFKA_PROTOCOL_NAME" ]]; then
|
||||
export ADD_LISTENER="${KAFKA_PROTOCOL_NAME}://${KAFKA_HOST_NAME-}:${KAFKA_PORT}"
|
||||
fi
|
||||
|
||||
if [[ -z "$KAFKA_INTER_BROKER_LISTENER_NAME" ]]; then
|
||||
export KAFKA_INTER_BROKER_LISTENER_NAME=$KAFKA_PROTOCOL_NAME
|
||||
fi
|
||||
else
|
||||
#DEFAULT LISTENERS
|
||||
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT-$KAFKA_PORT}"
|
||||
export KAFKA_LISTENERS="PLAINTEXT://${KAFKA_HOST_NAME-}:${KAFKA_PORT-9092}"
|
||||
fi
|
||||
|
||||
if [[ -n "$ADD_LISTENER" && -n "$KAFKA_LISTENERS" ]]; then
|
||||
export KAFKA_LISTENERS="${KAFKA_LISTENERS},${ADD_LISTENER}"
|
||||
fi
|
||||
|
||||
if [[ -n "$ADD_LISTENER" && -z "$KAFKA_LISTENERS" ]]; then
|
||||
export KAFKA_LISTENERS="${ADD_LISTENER}"
|
||||
fi
|
||||
|
||||
if [[ -n "$ADD_LISTENER" && -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
|
||||
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${ADD_LISTENER}"
|
||||
fi
|
||||
|
||||
if [[ -n "$ADD_LISTENER" && -z "$KAFKA_ADVERTISED_LISTENERS" ]]; then
|
||||
export KAFKA_ADVERTISED_LISTENERS="${ADD_LISTENER}"
|
||||
fi
|
||||
|
||||
if [[ -n "$KAFKA_INTER_BROKER_LISTENER_NAME" && ! "$KAFKA_INTER_BROKER_LISTENER_NAME"X = "$KAFKA_PROTOCOL_NAME"X ]]; then
|
||||
if [[ -n "$KAFKA_INTER_BROKER_PORT" ]]; then
|
||||
export KAFKA_INTER_BROKER_PORT=$(( $KAFKA_PORT + 1 ))
|
||||
fi
|
||||
export INTER_BROKER_LISTENER="${KAFKA_INTER_BROKER_LISTENER_NAME}://:${KAFKA_INTER_BROKER_PORT}"
|
||||
export KAFKA_LISTENERS="${KAFKA_LISTENERS},${INTER_BROKER_LISTENER}"
|
||||
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${INTER_BROKER_LISTENER}"
|
||||
unset KAFKA_INTER_BROKER_PORT
|
||||
unset KAFKA_SECURITY_INTER_BROKER_PROTOCOL
|
||||
unset INTER_BROKER_LISTENER
|
||||
fi
|
||||
|
||||
if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
|
||||
export KAFKA_BROKER_RACK=$(eval $RACK_COMMAND)
|
||||
fi
|
||||
|
||||
#Issue newline to config file in case there is not one already
|
||||
echo -e "\n" >> $KAFKA_HOME/config/server.properties
|
||||
|
||||
unset KAFKA_CREATE_TOPICS
|
||||
unset KAFKA_ADVERTISED_PROTOCOL_NAME
|
||||
unset KAFKA_PROTOCOL_NAME
|
||||
|
||||
if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
|
||||
unset KAFKA_ADVERTISED_PORT
|
||||
unset KAFKA_ADVERTISED_HOST_NAME
|
||||
fi
|
||||
|
||||
if [[ -n "$KAFKA_LISTENERS" ]]; then
|
||||
unset KAFKA_PORT
|
||||
unset KAFKA_HOST_NAME
|
||||
fi
|
||||
|
||||
for VAR in `env`
|
||||
do
|
||||
if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
|
||||
kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
|
||||
env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
|
||||
if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
|
||||
sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
|
||||
else
|
||||
echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ $VAR =~ ^LOG4J_ ]]; then
|
||||
log4j_name=`echo "$VAR" | sed -r "s/(LOG4J_.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
|
||||
log4j_env=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
|
||||
if egrep -q "(^|^#)$log4j_name=" $KAFKA_HOME/config/log4j.properties; then
|
||||
sed -r -i "s@(^|^#)($log4j_name)=(.*)@\2=${!log4j_env}@g" $KAFKA_HOME/config/log4j.properties #note that no config values may contain an '@' char
|
||||
else
|
||||
echo "$log4j_name=${!log4j_env}" >> $KAFKA_HOME/config/log4j.properties
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
|
||||
eval $CUSTOM_INIT_SCRIPT
|
||||
fi
|
||||
|
||||
exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
|
||||
@@ -1,7 +0,0 @@
|
||||
[supervisord]
|
||||
nodaemon=true
|
||||
|
||||
[program:kafka]
|
||||
command=/usr/bin/kafka-start
|
||||
autostart=true
|
||||
autorestart=true
|
||||
@@ -1,20 +0,0 @@
|
||||
FROM phusion/baseimage:latest
|
||||
|
||||
MAINTAINER Markus Guenther <markus.guenther@gmail.com>
|
||||
|
||||
ENV ZK_VERSION 3.4.8-1
|
||||
ENV ZK_HOME /usr/share/zookeeper
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y zookeeper=$ZK_VERSION supervisor dnsutils && \
|
||||
rm -rf /var/lib/apt/lists/* && \
|
||||
apt-get clean && \
|
||||
apt-cache policy zookeeper
|
||||
|
||||
VOLUME ["/zookeeper"]
|
||||
|
||||
ADD supervisord.conf /etc/supervisor/conf.d/
|
||||
|
||||
EXPOSE 2181 2888 3888
|
||||
|
||||
CMD ["/sbin/my_init", "supervisord"]
|
||||
@@ -1,7 +0,0 @@
|
||||
[supervisord]
|
||||
nodaemon=true
|
||||
|
||||
[program:zookeeper]
|
||||
command=/usr/share/zookeeper/bin/zkServer.sh start-foreground
|
||||
autostart=true
|
||||
autorestart=true
|
||||
@@ -24,12 +24,20 @@
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-zuul</artifactId>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-eureka</artifactId>
|
||||
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-gateway</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
@@ -1,26 +1,20 @@
|
||||
package net.mguenther.gtd;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
|
||||
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
/**
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableZuulProxy
|
||||
@EnableDiscoveryClient
|
||||
@EnableEurekaClient
|
||||
public class GtdApiGatewayApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(GtdApiGatewayApplication.class).web(true).run(args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public OnMethodFilter onMethodZuulFilter() {
|
||||
return new OnMethodFilter();
|
||||
SpringApplication.run(GtdApiGatewayApplication.class, args);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,61 +0,0 @@
|
||||
package net.mguenther.gtd;
|
||||
|
||||
import com.netflix.zuul.ZuulFilter;
|
||||
import com.netflix.zuul.context.RequestContext;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
public class OnMethodFilter extends ZuulFilter {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(OnMethodFilter.class);
|
||||
private static final List<String> methodsForCommands = Arrays.asList("POST", "PUT", "PATCH", "DELETE");
|
||||
|
||||
@Override
|
||||
public String filterType() {
|
||||
return FilterConstants.PRE_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int filterOrder() {
|
||||
return FilterConstants.PRE_DECORATION_FILTER_ORDER - 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldFilter() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object run() {
|
||||
final RequestContext ctx = RequestContext.getCurrentContext();
|
||||
final String method = ctx.getRequest().getMethod();
|
||||
if (isCommand(ctx)) {
|
||||
log.info("Resolved incoming request using method {} to service ID 'gtd-es-command-side'.", method);
|
||||
ctx.set("serviceId", "gtd-es-command-side");
|
||||
ctx.setRouteHost(null);
|
||||
ctx.addOriginResponseHeader("X-Zuul-ServiceId", UUID.randomUUID().toString());
|
||||
} else {
|
||||
log.info("Resolved incoming request using method {} to service ID 'gtd-es-query-side'.", method);
|
||||
ctx.set("serviceId", "gtd-es-query-side");
|
||||
ctx.setRouteHost(null);
|
||||
ctx.addOriginResponseHeader("X-Zuul-ServiceId", UUID.randomUUID().toString());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean isCommand(final RequestContext ctx) {
|
||||
return
|
||||
StringUtils.isNotEmpty(ctx.getRequest().getMethod()) &&
|
||||
methodsForCommands.contains(ctx.getRequest().getMethod().toUpperCase());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package net.mguenther.gtd;
|
||||
|
||||
import org.springframework.cloud.gateway.route.RouteLocator;
|
||||
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
||||
@Configuration
|
||||
public class SpringCloudConfig {
|
||||
|
||||
@Bean
|
||||
public RouteLocator gatewayRoutes(RouteLocatorBuilder builder) {
|
||||
return builder.routes()
|
||||
.route(r -> r.method(HttpMethod.DELETE, HttpMethod.PUT, HttpMethod.POST, HttpMethod.PATCH)
|
||||
.uri("lb://gtd-es-command-side"))
|
||||
.route(r -> r.method(HttpMethod.GET)
|
||||
.uri("lb://gtd-es-query-side"))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
spring.application.name=gtd-es-api-gateway
|
||||
|
||||
info.component=Edge Server
|
||||
|
||||
endpoints.restart.enabled=true
|
||||
endpoints.shutdown.enabled=true
|
||||
endpoints.health.sensitive=false
|
||||
|
||||
ribbon.eureka.enabled=true
|
||||
|
||||
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
|
||||
eureka.client.registerWithEureka=false
|
||||
|
||||
server.port=8765
|
||||
|
||||
zuul.prefix=/api
|
||||
zuul.routes.gtd.path: /**
|
||||
|
||||
management.security.enabled=false
|
||||
@@ -5,9 +5,6 @@ management:
|
||||
security:
|
||||
enabled: false
|
||||
|
||||
info:
|
||||
component: Edge Server
|
||||
|
||||
endpoints:
|
||||
restart:
|
||||
enabled: true
|
||||
@@ -16,9 +13,6 @@ endpoints:
|
||||
health:
|
||||
sensitive: false
|
||||
|
||||
ribbon:
|
||||
eurekaEnabled: true
|
||||
|
||||
eureka:
|
||||
client:
|
||||
enabled: true
|
||||
@@ -26,12 +20,23 @@ eureka:
|
||||
enabled: true
|
||||
serviceUrl:
|
||||
defaultZone: ${EUREKA_CLIENT_SERVICEURL_DEFAULTZONE:http://localhost:8761/eureka/}
|
||||
register-with-eureka: true
|
||||
fetch-registry: true
|
||||
instance:
|
||||
appname: gtd-es-api-gateway
|
||||
preferIpAddress: true
|
||||
|
||||
zuul:
|
||||
prefix: /api
|
||||
routes:
|
||||
gtd:
|
||||
path: /**
|
||||
spring:
|
||||
application:
|
||||
name: gtd-es-api-gateway
|
||||
cloud:
|
||||
discovery:
|
||||
enabled: true
|
||||
client:
|
||||
health-indicator:
|
||||
enabled: true
|
||||
gateway:
|
||||
discovery:
|
||||
locator:
|
||||
lower-case-service-id: true
|
||||
enabled: true
|
||||
|
||||
@@ -14,22 +14,25 @@ import java.time.Instant;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* Converts bidirectionally between domain events and their respective Avro representation.
|
||||
* This is a bit of a mess, but we have to cope with it due to the lack of polymorphy and
|
||||
* inheritance in Avro.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
@Component
|
||||
public class ItemEventConverter {
|
||||
|
||||
private AvroItemEvent wrap(final ItemEvent event, final Object eventPayload) {
|
||||
|
||||
return AvroItemEvent
|
||||
.newBuilder()
|
||||
.setEventId(event.getEventId())
|
||||
.setTimestamp(event.getTimestamp())
|
||||
.setData(eventPayload)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumes a domain event of type {@code ItemEvent} and returns its corresponding
|
||||
* Avro type (cf. {@code AvroItemEvent}).
|
||||
*
|
||||
* @param event
|
||||
* the domain event that ought to be converted
|
||||
* @return
|
||||
* instance of {@code AvroItemEvent} that mirrors the domain event
|
||||
*/
|
||||
public AvroItemEvent from(final ItemEvent event) {
|
||||
|
||||
if (event instanceof ItemCreated) return from((ItemCreated) event);
|
||||
@@ -111,6 +114,25 @@ public class ItemEventConverter {
|
||||
return wrap(event, avroEvent);
|
||||
}
|
||||
|
||||
private AvroItemEvent wrap(final ItemEvent event, final Object eventPayload) {
|
||||
|
||||
return AvroItemEvent
|
||||
.newBuilder()
|
||||
.setEventId(event.getEventId())
|
||||
.setTimestamp(event.getTimestamp())
|
||||
.setData(eventPayload)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumes an Avro event of type {@code AvroItemEvent} and returns its corresponding
|
||||
* domain event (cf. {@code ItemEvent}).
|
||||
*
|
||||
* @param event
|
||||
* the Avro event that ought to be converted
|
||||
* @return
|
||||
* instance of {@code ItemEvent} that mirrros the Avro event
|
||||
*/
|
||||
public ItemEvent to(final AvroItemEvent event) {
|
||||
|
||||
final String eventId = String.valueOf(event.getEventId());
|
||||
|
||||
@@ -11,6 +11,9 @@ import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Simple {@code Deserializer} that operates on {@code AvroItemEvent}. This {@code Deserializer} does
|
||||
* not support multiple versions of the same schema.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
|
||||
@@ -12,6 +12,9 @@ import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Simple {@code Serializer} that operates on {@code AvroItemEvent}. This {@code Serializer} does
|
||||
* not support multiple versions of the same schema.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
|
||||
@@ -78,7 +78,7 @@
|
||||
<!-- Spring Cloud -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-eureka</artifactId>
|
||||
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
@@ -14,6 +14,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
@@ -37,9 +38,11 @@ public class ItemsCommandResource {
|
||||
|
||||
log.info("Received a create item request with data {}.", createItem);
|
||||
|
||||
ItemCommand createItemCommand = commandsFor(createItem);
|
||||
|
||||
return commandHandler
|
||||
.onCommand(commandsFor(createItem))
|
||||
.thenApply(dontCare -> ResponseEntity.accepted().build())
|
||||
.onCommand(createItemCommand)
|
||||
.thenApply(dontCare -> ResponseEntity.created(itemUri(createItemCommand.getItemId())).build())
|
||||
.exceptionally(e -> {
|
||||
log.warn("Caught an exception at the service boundary.", e);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
|
||||
@@ -49,4 +52,8 @@ public class ItemsCommandResource {
|
||||
private ItemCommand commandsFor(final CreateItemRequest createItem) {
|
||||
return new CreateItem(createItem.getDescription());
|
||||
}
|
||||
|
||||
private URI itemUri(final String itemId) {
|
||||
return URI.create("/items/" + itemId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,10 @@ import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Accepts a command (or a list of commands) and attempts to validate them against the current
|
||||
* state of the referenced aggregate. If the validation holds, the {@code CommandHandler} emits
|
||||
* corresponding events to the event log.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
|
||||
@@ -3,6 +3,8 @@ package net.mguenther.gtd.domain;
|
||||
import net.mguenther.gtd.domain.event.ItemEvent;
|
||||
|
||||
/**
|
||||
* Publishes a given {@code ItemEvent} to an event log.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
|
||||
@@ -12,6 +12,10 @@ import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Consumes {@code AvroItemEvent}s that update the internal state of domain for the command-side. This is
|
||||
* essential so that the validation that a {@code CommandHandler} performs always goes against the most
|
||||
* recent state.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
@@ -31,7 +35,7 @@ public class ItemEventConsumer {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
|
||||
@KafkaListener(topics = "${gtd.topic}", group = "getting-things-done")
|
||||
@KafkaListener(topics = "${gtd.topic}", groupId = "${gtd.groupId}")
|
||||
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
|
||||
final ItemEvent event = converter.to(itemEvent);
|
||||
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());
|
||||
|
||||
@@ -13,6 +13,8 @@ import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Publisher for {@code ItemEvent}s that exhibits transactional guarantees.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
|
||||
@@ -24,8 +24,7 @@ public class TransactionalItemEventPublisherConfig {
|
||||
@Bean
|
||||
public ProducerFactory<String, AvroItemEvent> producerFactory() {
|
||||
final Map<String, Object> config = new HashMap<>();
|
||||
// TODO (mgu): Extract to config value
|
||||
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
|
||||
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ItemEventSerializer.class);
|
||||
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
|
||||
|
||||
@@ -1,25 +1,32 @@
|
||||
server:
|
||||
contextPath: /api
|
||||
port: 8089
|
||||
servlet:
|
||||
context-path: /api
|
||||
|
||||
management:
|
||||
security:
|
||||
enabled: false
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: gtd-es-command-side
|
||||
kafka:
|
||||
bootstrapServers: localhost:9092
|
||||
consumer:
|
||||
groupId: getting-things-done-command
|
||||
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
valueDeserializer: net.mguenther.gtd.kafka.serialization.ItemEventDeserializer
|
||||
autoOffsetReset: earliest
|
||||
enableAutoCommit: false
|
||||
isolationLevel: read_committed
|
||||
listener:
|
||||
ackMode: MANUAL
|
||||
producer:
|
||||
bootstrapServers: localhost:9092
|
||||
transaction-id-prefix: gtd-es
|
||||
|
||||
gtd:
|
||||
topic: topic-getting-things-done
|
||||
groupId: getting-thigns-done-command
|
||||
|
||||
eureka:
|
||||
client:
|
||||
@@ -28,6 +35,7 @@ eureka:
|
||||
enabled: true
|
||||
serviceUrl:
|
||||
defaultZone: ${EUREKA_CLIENT_SERVICEURL_DEFAULTZONE:http://localhost:8761/eureka/}
|
||||
register-with-eureka: true
|
||||
instance:
|
||||
appname: gtd-es-command-side
|
||||
preferIpAddress: true
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
spring:
|
||||
application:
|
||||
name: gtd-es-command-side
|
||||
cloud:
|
||||
discovery:
|
||||
enabled: true
|
||||
client:
|
||||
health-indicator:
|
||||
enabled: true
|
||||
|
||||
@@ -5,6 +5,8 @@ import net.mguenther.gtd.domain.event.ItemEvent;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Consumes {@code ItemEvent}s and acts upon them.
|
||||
*
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
* @author Boris Fresow (bfresow@gmail.com)
|
||||
*/
|
||||
|
||||
@@ -91,6 +91,13 @@ public class Item implements Serializable {
|
||||
return associatedList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutates the state of this {@code Item} in compliance with the given {@code ItemEvent}.
|
||||
*
|
||||
* @param event
|
||||
* an event that occured in the system and that signals a change of state
|
||||
* for the aggregate {@code Item}
|
||||
*/
|
||||
public void project(final ItemEvent event) {
|
||||
if (event instanceof DueDateAssigned) project((DueDateAssigned) event);
|
||||
else if (event instanceof RequiredTimeAssigned) project((RequiredTimeAssigned) event);
|
||||
@@ -101,15 +108,15 @@ public class Item implements Serializable {
|
||||
else throw new IllegalStateException("Unrecognized event: " + event.toString());
|
||||
}
|
||||
|
||||
public void project(final DueDateAssigned event) {
|
||||
private void project(final DueDateAssigned event) {
|
||||
this.dueDate = event.getDueDate();
|
||||
}
|
||||
|
||||
public void project(final RequiredTimeAssigned event) {
|
||||
private void project(final RequiredTimeAssigned event) {
|
||||
this.requiredTime = event.getRequiredTime();
|
||||
}
|
||||
|
||||
public void project(final TagAssigned event) {
|
||||
private void project(final TagAssigned event) {
|
||||
synchronized (this) {
|
||||
if (!tags.contains(event.getTag())) {
|
||||
tags.add(event.getTag());
|
||||
@@ -117,17 +124,17 @@ public class Item implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
public void project(final TagRemoved event) {
|
||||
private void project(final TagRemoved event) {
|
||||
synchronized (this) {
|
||||
tags.remove(event.getTag());
|
||||
}
|
||||
}
|
||||
|
||||
public void project(final ItemConcluded event) {
|
||||
private void project(final ItemConcluded event) {
|
||||
this.done = true;
|
||||
}
|
||||
|
||||
public void project(final ItemMovedToList event) {
|
||||
private void project(final ItemMovedToList event) {
|
||||
this.associatedList = event.getList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-eureka-server</artifactId>
|
||||
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
88
gtd-e2e-tests/pom.xml
Normal file
88
gtd-e2e-tests/pom.xml
Normal file
@@ -0,0 +1,88 @@
|
||||
<?xml version="1.0"?>
|
||||
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>net.mguenther.gtd</groupId>
|
||||
<artifactId>gtd-parent</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>gtd-e2e-tests</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>gtd-e2e-tests</name>
|
||||
|
||||
<dependencies>
|
||||
<!-- Intra-project dependencies -->
|
||||
<dependency>
|
||||
<groupId>net.mguenther.gtd</groupId>
|
||||
<artifactId>gtd-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.mguenther.gtd</groupId>
|
||||
<artifactId>gtd-codec</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<!-- HTTP client -->
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-httpclient</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-slf4j</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-jackson</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-jaxrs</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>${httpclient.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<!-- Testing -->
|
||||
<dependency>
|
||||
<groupId>net.mguenther.kafka</groupId>
|
||||
<artifactId>kafka-junit</artifactId>
|
||||
<version>${kafka.junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit.jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>${junit.jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
<version>${hamcrest.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,24 @@
|
||||
package net.mguenther.gtd.client;
|
||||
|
||||
/**
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
*/
|
||||
public class CreateItem {
|
||||
|
||||
private final String description;
|
||||
|
||||
public CreateItem(final String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CreateItemRequest{" +
|
||||
"description='" + description + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package net.mguenther.gtd.client;
|
||||
|
||||
import feign.Headers;
|
||||
import feign.Param;
|
||||
import feign.RequestLine;
|
||||
import feign.Response;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface GettingThingsDone {
|
||||
|
||||
@RequestLine("POST /items")
|
||||
@Headers("Content-Type: application/json")
|
||||
Response createItem(CreateItem payload);
|
||||
|
||||
@RequestLine("PUT /items/{itemId}")
|
||||
@Headers("Content-Type: application/json")
|
||||
Response updateItem(@Param("itemId") String itemId, UpdateItem payload);
|
||||
|
||||
@RequestLine("GET /items/{itemId}")
|
||||
@Headers("Accept: application/json")
|
||||
Item getItem(@Param("itemId") String itemId);
|
||||
|
||||
@RequestLine("GET /items")
|
||||
@Headers("Accept: application/json")
|
||||
List<Item> getItems();
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package net.mguenther.gtd.client;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
*/
|
||||
public class Item {
|
||||
|
||||
private String id;
|
||||
|
||||
private String description;
|
||||
|
||||
private int requiredTime;
|
||||
|
||||
private Date dueDate;
|
||||
|
||||
private List<String> tags;
|
||||
|
||||
private String associatedList;
|
||||
|
||||
private boolean done;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public boolean isDone() {
|
||||
return done;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public int getRequiredTime() {
|
||||
return requiredTime;
|
||||
}
|
||||
|
||||
public Date getDueDate() {
|
||||
return dueDate;
|
||||
}
|
||||
|
||||
public List<String> getTags() {
|
||||
return Collections.unmodifiableList(tags);
|
||||
}
|
||||
|
||||
public String getAssociatedList() {
|
||||
return associatedList;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package net.mguenther.gtd.client;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author Markus Günther (markus.guenther@gmail.com)
|
||||
*/
|
||||
public class UpdateItem {
|
||||
|
||||
private Long dueDate;
|
||||
|
||||
private Integer requiredTime;
|
||||
|
||||
private List<String> tags;
|
||||
|
||||
private String associatedList;
|
||||
|
||||
public Long getDueDate() {
|
||||
return dueDate;
|
||||
}
|
||||
|
||||
public void setDueDate(long dueDate) {
|
||||
this.dueDate = dueDate;
|
||||
}
|
||||
|
||||
public Integer getRequiredTime() {
|
||||
return requiredTime;
|
||||
}
|
||||
|
||||
public void setRequiredTime(int requiredTime) {
|
||||
this.requiredTime = requiredTime;
|
||||
}
|
||||
|
||||
public List<String> getTags() {
|
||||
return tags;
|
||||
}
|
||||
|
||||
public void setTags(List<String> tags) {
|
||||
this.tags = tags;
|
||||
}
|
||||
|
||||
public String getAssociatedList() {
|
||||
return associatedList;
|
||||
}
|
||||
|
||||
public void setAssociatedList(String associatedList) {
|
||||
this.associatedList = associatedList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UpdateItemRequest{" +
|
||||
"dueDate=" + dueDate +
|
||||
", requiredTime=" + requiredTime +
|
||||
", tags=" + tags +
|
||||
", associatedList='" + associatedList + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
package net.mguenther.gtd;
|
||||
|
||||
import feign.Feign;
|
||||
import feign.Logger;
|
||||
import feign.Response;
|
||||
import feign.httpclient.ApacheHttpClient;
|
||||
import feign.jackson.JacksonDecoder;
|
||||
import feign.jackson.JacksonEncoder;
|
||||
import feign.slf4j.Slf4jLogger;
|
||||
import net.mguenther.gtd.client.CreateItem;
|
||||
import net.mguenther.gtd.client.GettingThingsDone;
|
||||
import net.mguenther.gtd.domain.event.ItemCreated;
|
||||
import net.mguenther.gtd.kafka.serialization.AvroItemEvent;
|
||||
import net.mguenther.gtd.kafka.serialization.ItemEventConverter;
|
||||
import net.mguenther.gtd.kafka.serialization.ItemEventDeserializer;
|
||||
import net.mguenther.kafka.junit.ExternalKafkaCluster;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@Disabled
|
||||
public class EventPublicationTest {
|
||||
|
||||
private static final String URL = "http://localhost:8765/api";
|
||||
|
||||
private final ItemEventConverter converter = new ItemEventConverter();
|
||||
|
||||
@Test
|
||||
public void anItemCreatedEventShouldBePublishedAfterCreatingNewItem() throws Exception {
|
||||
|
||||
ExternalKafkaCluster kafka = ExternalKafkaCluster.at("http://localhost:9092");
|
||||
GettingThingsDone gtd = createGetthingThingsDoneClient();
|
||||
String itemId = extractItemId(gtd.createItem(new CreateItem("I gotta do my homework!")));
|
||||
|
||||
List<AvroItemEvent> publishedEvents = kafka
|
||||
.observeValues(on("topic-getting-things-done", 1, AvroItemEvent.class)
|
||||
.observeFor(10, TimeUnit.SECONDS)
|
||||
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ItemEventDeserializer.class)
|
||||
.filterOnKeys(aggregateId -> aggregateId.equals(itemId)));
|
||||
|
||||
ItemCreated itemCreatedEvent = publishedEvents.stream()
|
||||
.findFirst()
|
||||
.map(converter::to)
|
||||
.map(e -> (ItemCreated) e)
|
||||
.orElseThrow(AssertionError::new);
|
||||
|
||||
assertThat(itemCreatedEvent.getItemId(), equalTo(itemId));
|
||||
assertThat(itemCreatedEvent.getDescription(), equalTo("I gotta do my homework!"));
|
||||
}
|
||||
|
||||
private GettingThingsDone createGetthingThingsDoneClient() {
|
||||
return Feign.builder()
|
||||
.client(new ApacheHttpClient())
|
||||
.encoder(new JacksonEncoder())
|
||||
.decoder(new JacksonDecoder())
|
||||
.logger(new Slf4jLogger(GettingThingsDone.class))
|
||||
.logLevel(Logger.Level.FULL)
|
||||
.target(GettingThingsDone.class, URL);
|
||||
}
|
||||
|
||||
private String extractItemId(final Response response) {
|
||||
return response.headers()
|
||||
.get("Location")
|
||||
.stream()
|
||||
.findFirst()
|
||||
.map(s -> s.replace("/items/", ""))
|
||||
.orElseThrow(AssertionError::new);
|
||||
}
|
||||
}
|
||||
@@ -82,7 +82,7 @@
|
||||
<!-- Spring Cloud -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-eureka</artifactId>
|
||||
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
||||
@@ -8,6 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
@@ -40,4 +41,6 @@ public class ItemsQueryResource {
|
||||
.thenApply(ResponseEntity::ok)
|
||||
.exceptionally(e -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -27,8 +27,7 @@ public class DefaultItemView implements ItemView {
|
||||
public CompletableFuture<List<Item>> getItems() {
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
final List<Item> items = new ArrayList<>();
|
||||
items.addAll(repository.findAll());
|
||||
final List<Item> items = new ArrayList<>(repository.findAll());
|
||||
return Collections.unmodifiableList(items);
|
||||
});
|
||||
}
|
||||
@@ -36,6 +35,6 @@ public class DefaultItemView implements ItemView {
|
||||
@Override
|
||||
public CompletableFuture<Optional<Item>> getItem(final String itemId) {
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> Optional.ofNullable(repository.findOne(itemId)));
|
||||
return CompletableFuture.supplyAsync(() -> repository.findById(itemId));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,9 +44,10 @@ public class ItemUpdater implements EventHandler {
|
||||
}
|
||||
|
||||
private void modifyExistingItem(final ItemEvent event) {
|
||||
final Item item = repository.findOne(event.getItemId());
|
||||
item.project(event);
|
||||
final Item updatedItem = repository.save(item);
|
||||
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);
|
||||
repository.findById(event.getItemId()).ifPresent(item -> {
|
||||
item.project(event);
|
||||
final Item updatedItem = repository.save(item);
|
||||
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ public class ItemEventConsumer {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
|
||||
@KafkaListener(topics = "${gtd.topic}", group = "getting-things-done")
|
||||
@KafkaListener(topics = "${gtd.topic}", groupId = "${gtd.groupId}")
|
||||
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
|
||||
final ItemEvent event = converter.to(itemEvent);
|
||||
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());
|
||||
|
||||
@@ -1,21 +1,25 @@
|
||||
server:
|
||||
contextPath: /api
|
||||
port: 8090
|
||||
servlet:
|
||||
context-path: /api
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: gtd-es-query-side
|
||||
kafka:
|
||||
bootstrapServers: localhost:9092
|
||||
consumer:
|
||||
groupId: getting-things-done-query
|
||||
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
valueDeserializer: net.mguenther.gtd.kafka.serialization.ItemEventDeserializer
|
||||
autoOffsetReset: latest
|
||||
enableAutoCommit: false
|
||||
isolationLevel: read_committed
|
||||
listener:
|
||||
ackMode: MANUAL
|
||||
|
||||
gtd:
|
||||
topic: topic-getting-things-done
|
||||
groupId: getting-thigns-done-query
|
||||
|
||||
management:
|
||||
security:
|
||||
@@ -28,6 +32,7 @@ eureka:
|
||||
enabled: true
|
||||
serviceUrl:
|
||||
defaultZone: ${EUREKA_CLIENT_SERVICEURL_DEFAULTZONE:http://localhost:8761/eureka/}
|
||||
register-with-eureka: true
|
||||
instance:
|
||||
appname: gtd-es-query-side
|
||||
preferIpAddress: true
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
spring:
|
||||
application:
|
||||
name: gtd-es-query-side
|
||||
cloud:
|
||||
discovery:
|
||||
enabled: true
|
||||
client:
|
||||
health-indicator:
|
||||
enabled: true
|
||||
|
||||
120
pom.xml
120
pom.xml
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>1.5.4.RELEASE</version>
|
||||
<version>2.4.2</version>
|
||||
<relativePath/>
|
||||
</parent>
|
||||
|
||||
@@ -21,29 +21,25 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<!-- Managed dependency versions -->
|
||||
<junit.version>4.12</junit.version>
|
||||
<hamcrest.version>1.3</hamcrest.version>
|
||||
<kafka.junit.version>2.7.0</kafka.junit.version>
|
||||
<junit.jupiter.version>5.7.0</junit.jupiter.version>
|
||||
<junit.vintage.version>5.7.0</junit.vintage.version>
|
||||
<junit.version>4.13.1</junit.version>
|
||||
<hamcrest.version>2.2</hamcrest.version>
|
||||
<slf4j.version>1.7.22</slf4j.version>
|
||||
<spring.kafka.version>1.3.0.BUILD-SNAPSHOT</spring.kafka.version>
|
||||
<spring-cloud.version>Dalston.SR2</spring-cloud.version>
|
||||
<spring.version>5.3.3</spring.version>
|
||||
<spring.kafka.version>2.6.5</spring.kafka.version>
|
||||
<spring-cloud.version>2020.0.0</spring-cloud.version>
|
||||
<avro.version>1.8.1</avro.version>
|
||||
<commons-lang3.version>3.5</commons-lang3.version>
|
||||
<openfeign.version>11.0</openfeign.version>
|
||||
<httpclient.version>4.5.13</httpclient.version>
|
||||
<!-- Plugin versions -->
|
||||
<plugin.avro.version>1.8.1</plugin.avro.version>
|
||||
<plugin.avro.version>1.10.1</plugin.avro.version>
|
||||
<plugin.compiler.version>3.5</plugin.compiler.version>
|
||||
<plugin.surefire.version>2.22.2</plugin.surefire.version>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>spring-snapshots</id>
|
||||
<name>Spring Snapshots</name>
|
||||
<url>https://repo.spring.io/libs-snapshot</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<modules>
|
||||
<module>gtd-codec</module>
|
||||
<module>gtd-common</module>
|
||||
@@ -51,6 +47,7 @@
|
||||
<module>gtd-query-side</module>
|
||||
<module>gtd-api-gateway</module>
|
||||
<module>gtd-discovery-service</module>
|
||||
<module>gtd-e2e-tests</module>
|
||||
</modules>
|
||||
|
||||
<build>
|
||||
@@ -70,6 +67,11 @@
|
||||
<artifactId>avro-maven-plugin</artifactId>
|
||||
<version>${plugin.avro.version}</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${plugin.surefire.version}</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
@@ -90,11 +92,36 @@
|
||||
<!-- Spring Cloud -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-dependencies</artifactId>
|
||||
<version>${spring-cloud.version}</version>
|
||||
<artifactId>spring-cloud-gateway</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-actuator</artifactId>
|
||||
<version>2.4.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
<version>2.4.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-gateway</artifactId>
|
||||
<version>3.0.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
|
||||
<version>3.0.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
|
||||
<version>3.0.1</version>
|
||||
</dependency>
|
||||
<!-- Spring -->
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
@@ -129,11 +156,60 @@
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<version>${slf4j.version}</version>
|
||||
</dependency>
|
||||
<!-- HTTP client -->
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-core</artifactId>
|
||||
<version>${openfeign.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-httpclient</artifactId>
|
||||
<version>${openfeign.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-slf4j</artifactId>
|
||||
<version>${openfeign.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-jackson</artifactId>
|
||||
<version>${openfeign.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
<artifactId>feign-jaxrs</artifactId>
|
||||
<version>${openfeign.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>${httpclient.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<!-- Testing -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<groupId>net.mguenther.kafka</groupId>
|
||||
<artifactId>kafka-junit</artifactId>
|
||||
<version>${kafka.junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit.jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>${junit.jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
||||
Reference in New Issue
Block a user