Updates dependencies to latest versions of Spring Boot, Spring for Apache Kafka, Kafka Clients, ...

This commit is contained in:
Markus Günther
2021-02-05 20:05:10 +01:00
parent 0ab26c6930
commit 6e72c0e649
14 changed files with 36 additions and 274 deletions

View File

@@ -2,7 +2,7 @@
[![Build Status](https://travis-ci.org/mguenther/spring-kafka-event-sourcing-sampler.svg?branch=master)](https://travis-ci.org/mguenther/spring-kafka-event-sourcing-sampler.svg)
This repository contains a sample application that demonstrates how to implement an Event-sourced systems using the CQRS architectural style. The solution uses Apache Kafka, which we easily integrate into a Spring Boot based application using Spring Kafka, Apache Avro for event serialization and deserialization and uses an in-memory H2 database that contributes to the query side of our CQRS-based system. The application itself is minimal and implements a subset of David Allen's Getting Things Done time management method.
This repository contains a sample application that demonstrates how to implement an Event-sourced systems using the CQRS architectural style. The solution uses Apache Kafka, which we easily integrate into a Spring Boot based application using [Spring for Apache Kafka](https://spring.io/projects/spring-kafka) (2.6.5), Apache Avro for event serialization and deserialization and uses an in-memory H2 database that contributes to the query side of our CQRS-based system. The application itself is minimal and implements a subset of David Allen's Getting Things Done time management method.
The code presented in this repository is the joint work of [Boris Fresow](mailto://bfresow@gmail.com) and [Markus Günther](mailto://markus.guenther@gmail.com) as part of an article series on **Building Event-based applications with Spring Kafka** for the German [JavaMagazin](https://jaxenter.de/magazine/java-magazin).
@@ -14,8 +14,8 @@ Running the showcase requires a working installation of Apache ZooKeeper and Apa
| Application | Version | Docker Image |
| ------------------- | --------- | ----------------------- |
| Apache Kafka | 0.11.0.0 | kafka-sampler/kafka |
| Apache ZooKeeper | 3.4.8-1 | kafka-sampler/zookeeper |
| Apache Kafka | 2.6.0 | wurstmeister/kafka:2.13-2.6.0 |
| Apache ZooKeeper | 3.4.13 | wurstmeister/zookeeper |
### Building and Running the Containers

View File

@@ -1,22 +1,26 @@
version: '2'
version: '2.2'
services:
zookeeper:
image: getting-things-done/zookeeper
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
- 2181:2181
kafka:
image: getting-things-done/kafka
links:
- zookeeper
image: wurstmeister/kafka:2.13-2.6.0
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 5
KAFKA_CREATE_TOPICS: "topic-getting-things-done:5:1"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
discovery:
image: getting-things-done/discovery-service
ports:

View File

@@ -1,32 +0,0 @@
FROM phusion/baseimage:latest
MAINTAINER Markus Günther <markus.guenther@gmail.com>
ENV DEBIAN_FRONTEND noninteractive
ENV SCALA_VERSION 2.11
ENV KAFKA_VERSION 0.11.0.0
ENV KAFKA_HOME /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION"
# Install Oracle Java 8, some utilities and Kafka
RUN \
echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | debconf-set-selections && \
add-apt-repository -y ppa:webupd8team/java && \
apt-get update && \
apt-get install -y oracle-java8-installer && \
apt-get install -y wget supervisor dnsutils curl jq coreutils docker net-tools && \
rm -rf /var/lib/apt/lists/* && \
rm -rf /var/cache/oracle-jdk8-installer && \
apt-get clean && \
wget -q http://apache.mirrors.spacedump.net/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && \
rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz
VOLUME ["/kafka"]
ADD kafka-start /usr/bin/kafka-start
ADD kafka-create-topics /usr/bin/kafka-create-topics
ADD supervisord.conf /etc/supervisor/conf.d/
EXPOSE 9092
CMD ["/sbin/my_init", "kafka-start"]

View File

@@ -1,36 +0,0 @@
#!/bin/bash
if [[ -z "$START_TIMEOUT" ]]; then
START_TIMEOUT=600
fi
start_timeout_exceeded=false
count=0
step=10
while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
echo "waiting for kafka to be ready"
sleep $step;
count=$(expr $count + $step)
if [ $count -gt $START_TIMEOUT ]; then
start_timeout_exceeded=true
break
fi
done
if $start_timeout_exceeded; then
echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
exit 1
fi
if [[ -n $KAFKA_CREATE_TOPICS ]]; then
IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
echo "creating topics: $topicToCreate"
IFS=':' read -a topicConfig <<< "$topicToCreate"
if [ ${topicConfig[3]} ]; then
JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --config cleanup.policy="${topicConfig[3]}" --if-not-exists
else
JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --if-not-exists
fi
done
fi

View File

@@ -1,138 +0,0 @@
#!/bin/bash
if [[ -z "$KAFKA_PORT" ]]; then
export KAFKA_PORT=9092
fi
create-topics.sh &
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
-z "$KAFKA_LISTENERS" && \
-z "$KAFKA_ADVERTISED_LISTENERS" && \
-S /var/run/docker.sock ]]; then
export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
fi
if [[ -z "$KAFKA_BROKER_ID" ]]; then
if [[ -n "$BROKER_ID_COMMAND" ]]; then
export KAFKA_BROKER_ID=$(eval $BROKER_ID_COMMAND)
else
# By default auto allocate broker ID
export KAFKA_BROKER_ID=-1
fi
fi
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
fi
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
unset KAFKA_HEAP_OPTS
fi
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
fi
if [[ -n "$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" ]]; then
if [[ -n "$KAFKA_ADVERTISED_PORT" && -n "$KAFKA_ADVERTISED_PROTOCOL_NAME" ]]; then
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_PROTOCOL_NAME}://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT}"
export KAFKA_LISTENERS="$KAFKA_ADVERTISED_PROTOCOL_NAME://:$KAFKA_ADVERTISED_PORT"
fi
if [[ -z "$KAFKA_PROTOCOL_NAME" ]]; then
export KAFKA_PROTOCOL_NAME="${KAFKA_ADVERTISED_PROTOCOL_NAME}"
fi
if [[ -n "$KAFKA_PORT" && -n "$KAFKA_PROTOCOL_NAME" ]]; then
export ADD_LISTENER="${KAFKA_PROTOCOL_NAME}://${KAFKA_HOST_NAME-}:${KAFKA_PORT}"
fi
if [[ -z "$KAFKA_INTER_BROKER_LISTENER_NAME" ]]; then
export KAFKA_INTER_BROKER_LISTENER_NAME=$KAFKA_PROTOCOL_NAME
fi
else
#DEFAULT LISTENERS
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT-$KAFKA_PORT}"
export KAFKA_LISTENERS="PLAINTEXT://${KAFKA_HOST_NAME-}:${KAFKA_PORT-9092}"
fi
if [[ -n "$ADD_LISTENER" && -n "$KAFKA_LISTENERS" ]]; then
export KAFKA_LISTENERS="${KAFKA_LISTENERS},${ADD_LISTENER}"
fi
if [[ -n "$ADD_LISTENER" && -z "$KAFKA_LISTENERS" ]]; then
export KAFKA_LISTENERS="${ADD_LISTENER}"
fi
if [[ -n "$ADD_LISTENER" && -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${ADD_LISTENER}"
fi
if [[ -n "$ADD_LISTENER" && -z "$KAFKA_ADVERTISED_LISTENERS" ]]; then
export KAFKA_ADVERTISED_LISTENERS="${ADD_LISTENER}"
fi
if [[ -n "$KAFKA_INTER_BROKER_LISTENER_NAME" && ! "$KAFKA_INTER_BROKER_LISTENER_NAME"X = "$KAFKA_PROTOCOL_NAME"X ]]; then
if [[ -n "$KAFKA_INTER_BROKER_PORT" ]]; then
export KAFKA_INTER_BROKER_PORT=$(( $KAFKA_PORT + 1 ))
fi
export INTER_BROKER_LISTENER="${KAFKA_INTER_BROKER_LISTENER_NAME}://:${KAFKA_INTER_BROKER_PORT}"
export KAFKA_LISTENERS="${KAFKA_LISTENERS},${INTER_BROKER_LISTENER}"
export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${INTER_BROKER_LISTENER}"
unset KAFKA_INTER_BROKER_PORT
unset KAFKA_SECURITY_INTER_BROKER_PROTOCOL
unset INTER_BROKER_LISTENER
fi
if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
export KAFKA_BROKER_RACK=$(eval $RACK_COMMAND)
fi
#Issue newline to config file in case there is not one already
echo -e "\n" >> $KAFKA_HOME/config/server.properties
unset KAFKA_CREATE_TOPICS
unset KAFKA_ADVERTISED_PROTOCOL_NAME
unset KAFKA_PROTOCOL_NAME
if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
unset KAFKA_ADVERTISED_PORT
unset KAFKA_ADVERTISED_HOST_NAME
fi
if [[ -n "$KAFKA_LISTENERS" ]]; then
unset KAFKA_PORT
unset KAFKA_HOST_NAME
fi
for VAR in `env`
do
if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
else
echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
fi
fi
if [[ $VAR =~ ^LOG4J_ ]]; then
log4j_name=`echo "$VAR" | sed -r "s/(LOG4J_.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
log4j_env=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
if egrep -q "(^|^#)$log4j_name=" $KAFKA_HOME/config/log4j.properties; then
sed -r -i "s@(^|^#)($log4j_name)=(.*)@\2=${!log4j_env}@g" $KAFKA_HOME/config/log4j.properties #note that no config values may contain an '@' char
else
echo "$log4j_name=${!log4j_env}" >> $KAFKA_HOME/config/log4j.properties
fi
fi
done
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
eval $CUSTOM_INIT_SCRIPT
fi
exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

View File

@@ -1,7 +0,0 @@
[supervisord]
nodaemon=true
[program:kafka]
command=/usr/bin/kafka-start
autostart=true
autorestart=true

View File

@@ -1,20 +0,0 @@
FROM phusion/baseimage:latest
MAINTAINER Markus Guenther <markus.guenther@gmail.com>
ENV ZK_VERSION 3.4.8-1
ENV ZK_HOME /usr/share/zookeeper
RUN apt-get update && \
apt-get install -y zookeeper=$ZK_VERSION supervisor dnsutils && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean && \
apt-cache policy zookeeper
VOLUME ["/zookeeper"]
ADD supervisord.conf /etc/supervisor/conf.d/
EXPOSE 2181 2888 3888
CMD ["/sbin/my_init", "supervisord"]

View File

@@ -1,7 +0,0 @@
[supervisord]
nodaemon=true
[program:zookeeper]
command=/usr/share/zookeeper/bin/zkServer.sh start-foreground
autostart=true
autorestart=true

View File

@@ -1,5 +1,6 @@
package net.mguenther.gtd;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@@ -16,7 +17,7 @@ import org.springframework.context.annotation.Bean;
public class GtdApiGatewayApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(GtdApiGatewayApplication.class).web(true).run(args);
new SpringApplicationBuilder(GtdApiGatewayApplication.class).web(WebApplicationType.SERVLET).run(args);
}
@Bean

View File

@@ -35,7 +35,7 @@ public class ItemEventConsumer {
this.eventHandler = eventHandler;
}
@KafkaListener(topics = "${gtd.topic}", group = "getting-things-done")
@KafkaListener(topics = "${gtd.topic}", groupId = "getting-things-done")
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
final ItemEvent event = converter.to(itemEvent);
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());

View File

@@ -27,8 +27,7 @@ public class DefaultItemView implements ItemView {
public CompletableFuture<List<Item>> getItems() {
return CompletableFuture.supplyAsync(() -> {
final List<Item> items = new ArrayList<>();
items.addAll(repository.findAll());
final List<Item> items = new ArrayList<>(repository.findAll());
return Collections.unmodifiableList(items);
});
}
@@ -36,6 +35,6 @@ public class DefaultItemView implements ItemView {
@Override
public CompletableFuture<Optional<Item>> getItem(final String itemId) {
return CompletableFuture.supplyAsync(() -> Optional.ofNullable(repository.findOne(itemId)));
return CompletableFuture.supplyAsync(() -> Optional.of(repository.getOne(itemId)));
}
}

View File

@@ -44,7 +44,7 @@ public class ItemUpdater implements EventHandler {
}
private void modifyExistingItem(final ItemEvent event) {
final Item item = repository.findOne(event.getItemId());
final Item item = repository.getOne(event.getItemId());
item.project(event);
final Item updatedItem = repository.save(item);
log.info("Applied event {} to the aggregate with ID {} and current state {}.", event, event.getItemId(), updatedItem);

View File

@@ -31,7 +31,7 @@ public class ItemEventConsumer {
this.eventHandler = eventHandler;
}
@KafkaListener(topics = "${gtd.topic}", group = "getting-things-done")
@KafkaListener(topics = "${gtd.topic}", groupId = "getting-things-done")
public void consume(final AvroItemEvent itemEvent, final Acknowledgment ack) {
final ItemEvent event = converter.to(itemEvent);
log.debug("Received event {}. Trying to apply it to the latest state of aggregate with ID {}.", event, event.getItemId());

30
pom.xml
View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<version>2.4.2</version>
<relativePath/>
</parent>
@@ -21,29 +21,22 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Managed dependency versions -->
<junit.version>4.12</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<junit.jupiter.version>5.7.0</junit.jupiter.version>
<junit.vintage.version>5.7.0</junit.vintage.version>
<junit.version>4.13.1</junit.version>
<hamcrest.version>2.2</hamcrest.version>
<slf4j.version>1.7.22</slf4j.version>
<spring.kafka.version>1.3.0.BUILD-SNAPSHOT</spring.kafka.version>
<spring.version>5.3.3</spring.version>
<spring.kafka.version>2.6.5</spring.kafka.version>
<spring-cloud.version>Dalston.SR2</spring-cloud.version>
<avro.version>1.8.1</avro.version>
<commons-lang3.version>3.5</commons-lang3.version>
<!-- Plugin versions -->
<plugin.avro.version>1.8.1</plugin.avro.version>
<plugin.avro.version>1.10.1</plugin.avro.version>
<plugin.compiler.version>3.5</plugin.compiler.version>
<plugin.surefire.version>2.22.2</plugin.surefire.version>
</properties>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<modules>
<module>gtd-codec</module>
<module>gtd-common</module>
@@ -70,6 +63,11 @@
<artifactId>avro-maven-plugin</artifactId>
<version>${plugin.avro.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
</plugin>
</plugins>
</pluginManagement>
</build>