Compare commits
15 Commits
feature/mo
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7f1a901d9 | ||
|
|
39430d8426 | ||
|
|
db49715fc0 | ||
|
|
98611cf3e8 | ||
|
|
dfce05f89b | ||
|
|
c8105a8e91 | ||
|
|
92974d37b6 | ||
|
|
9d792559ce | ||
|
|
087998745b | ||
|
|
995b1c5457 | ||
|
|
d56dfa6208 | ||
|
|
eb019a12f0 | ||
|
|
f9a50bf1d9 | ||
|
|
a87fde57fe | ||
|
|
ff088e891f |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -35,4 +35,5 @@ kafka_data
|
||||
zookeeper
|
||||
pgdata
|
||||
prometheus
|
||||
mongodb_data_container
|
||||
mongodb_data_container
|
||||
grafana
|
||||
22
Makefile
22
Makefile
@@ -1,27 +1,5 @@
|
||||
.PHONY:
|
||||
|
||||
|
||||
#Quarkus
|
||||
|
||||
## Running the application in dev mode
|
||||
#You can run your application in dev mode that enables live coding using:
|
||||
dev:
|
||||
./mvnw compile quarkus:dev
|
||||
|
||||
# The application can be packaged using:
|
||||
package:
|
||||
./mvnw package
|
||||
|
||||
|
||||
package-uber-jar:
|
||||
./mvnw package -Dquarkus.package.type=uber-jar
|
||||
|
||||
package-native:
|
||||
./mvnw package -Pnative
|
||||
|
||||
package-native-GraalVM:
|
||||
./mvnw package -Pnative -Dquarkus.native.container-build=true
|
||||
|
||||
# ==============================================================================
|
||||
# Docker
|
||||
|
||||
|
||||
39
README.md
Normal file
39
README.md
Normal file
@@ -0,0 +1,39 @@
|
||||
### Spring CQRS and Event Sourcing microservice example 👋💫✨
|
||||
|
||||
#### 👨💻 Full list what has been used:
|
||||
* [Spring](https://spring.io/) - Java Spring
|
||||
* [Spring Data JPA](https://spring.io/projects/spring-data-jpa) - data access layer
|
||||
* [Spring Data MongoDB](https://spring.io/projects/spring-data-mongodb) - Spring Data MongoDB
|
||||
* [Spring Cloud Sleuth](https://spring.io/projects/spring-cloud-sleuth) - Spring Cloud Sleuth distributed tracing
|
||||
* [Kafka](https://spring.io/projects/spring-kafka) - Spring for Apache Kafka
|
||||
* [PostgreSQL](https://www.postgresql.org/) - PostgreSQL database.
|
||||
* [Jaeger](https://www.jaegertracing.io/) - Jaeger is a distributed tracing system
|
||||
* [Docker](https://www.docker.com/) - Docker
|
||||
* [Prometheus](https://prometheus.io/) - Prometheus
|
||||
* [Grafana](https://grafana.com/) - Grafana
|
||||
* [Flyway](https://flywaydb.org/) - Database migrations
|
||||
* [Resilience4j](https://resilience4j.readme.io/docs/getting-started-3) - Resilience4j is a lightweight, easy-to-use fault tolerance
|
||||
* [Swagger OpenAPI 3](https://springdoc.org/) - java library helps to automate the generation of API documentation
|
||||
|
||||
### Swagger UI:
|
||||
|
||||
http://localhost:8006/swagger-ui/index.html
|
||||
|
||||
### Jaeger UI:
|
||||
|
||||
http://localhost:16686
|
||||
|
||||
### Prometheus UI:
|
||||
|
||||
http://localhost:9090
|
||||
|
||||
### Grafana UI:
|
||||
|
||||
http://localhost:3005
|
||||
|
||||
|
||||
For local development:
|
||||
```
|
||||
make local // runs docker-compose.yaml with all required containers
|
||||
run spring application
|
||||
```
|
||||
@@ -102,7 +102,9 @@ services:
|
||||
grafana:
|
||||
container_name: grafana_container
|
||||
restart: always
|
||||
image: grafana/grafana
|
||||
build: monitoring/grafana
|
||||
volumes:
|
||||
- ./grafana:/var/lib/grafana
|
||||
ports:
|
||||
- '3005:3000'
|
||||
networks: [ "microservices" ]
|
||||
|
||||
2
monitoring/grafana/Dockerfile
Normal file
2
monitoring/grafana/Dockerfile
Normal file
@@ -0,0 +1,2 @@
|
||||
FROM grafana/grafana
|
||||
ADD ./provisioning /etc/grafana/provisioning
|
||||
12
monitoring/grafana/provisioning/dashboards/all.yaml
Normal file
12
monitoring/grafana/provisioning/dashboards/all.yaml
Normal file
@@ -0,0 +1,12 @@
|
||||
# grafana/provisioning/dashboards/all.yaml
|
||||
apiVersion: 1
|
||||
|
||||
providers:
|
||||
- name: 'default'
|
||||
folder: 'default'
|
||||
type: file
|
||||
allowUiUpdates: true
|
||||
updateIntervalSeconds: 30
|
||||
options:
|
||||
path: /etc/grafana/provisioning/dashboards
|
||||
foldersFromFilesStructure: true
|
||||
3687
monitoring/grafana/provisioning/dashboards/jvm-micrometer_rev9.json
Normal file
3687
monitoring/grafana/provisioning/dashboards/jvm-micrometer_rev9.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,816 @@
|
||||
{
|
||||
"annotations": {
|
||||
"list": [
|
||||
{
|
||||
"builtIn": 1,
|
||||
"datasource": "-- Grafana --",
|
||||
"enable": true,
|
||||
"hide": true,
|
||||
"iconColor": "rgba(0, 211, 255, 1)",
|
||||
"name": "Annotations & Alerts",
|
||||
"type": "dashboard"
|
||||
}
|
||||
]
|
||||
},
|
||||
"description": "Micrometer Http throughput dashboard ",
|
||||
"editable": true,
|
||||
"gnetId": 5373,
|
||||
"graphTooltip": 0,
|
||||
"id": 5,
|
||||
"iteration": 1615648797248,
|
||||
"links": [],
|
||||
"panels": [
|
||||
{
|
||||
"collapsed": false,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"id": 2,
|
||||
"panels": [],
|
||||
"repeat": null,
|
||||
"title": "Status and SLA",
|
||||
"type": "row"
|
||||
},
|
||||
{
|
||||
"cacheTimeout": null,
|
||||
"colorBackground": false,
|
||||
"colorValue": false,
|
||||
"colors": [
|
||||
"#299c46",
|
||||
"rgba(237, 129, 40, 0.89)",
|
||||
"#d44a3a"
|
||||
],
|
||||
"datasource": "Prometheus",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"format": "none",
|
||||
"gauge": {
|
||||
"maxValue": 100,
|
||||
"minValue": 0,
|
||||
"show": false,
|
||||
"thresholdLabels": false,
|
||||
"thresholdMarkers": true
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 4,
|
||||
"w": 2,
|
||||
"x": 0,
|
||||
"y": 1
|
||||
},
|
||||
"id": 4,
|
||||
"interval": null,
|
||||
"links": [],
|
||||
"mappingType": 1,
|
||||
"mappingTypes": [
|
||||
{
|
||||
"name": "value to text",
|
||||
"value": 1
|
||||
},
|
||||
{
|
||||
"name": "range to text",
|
||||
"value": 2
|
||||
}
|
||||
],
|
||||
"maxDataPoints": 100,
|
||||
"nullPointMode": "connected",
|
||||
"nullText": null,
|
||||
"postfix": "",
|
||||
"postfixFontSize": "50%",
|
||||
"prefix": "",
|
||||
"prefixFontSize": "50%",
|
||||
"rangeMaps": [
|
||||
{
|
||||
"from": "null",
|
||||
"text": "N/A",
|
||||
"to": "null"
|
||||
}
|
||||
],
|
||||
"sparkline": {
|
||||
"fillColor": "rgba(31, 118, 189, 0.18)",
|
||||
"full": false,
|
||||
"lineColor": "rgb(31, 120, 193)",
|
||||
"show": false
|
||||
},
|
||||
"tableColumn": "",
|
||||
"targets": [
|
||||
{
|
||||
"expr": "up{application=\"$application\",instance=\"$instance\"}",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": "",
|
||||
"title": "Status",
|
||||
"type": "singlestat",
|
||||
"valueFontSize": "80%",
|
||||
"valueMaps": [
|
||||
{
|
||||
"op": "=",
|
||||
"text": "UP",
|
||||
"value": "1"
|
||||
},
|
||||
{
|
||||
"op": "=",
|
||||
"text": "DOWN",
|
||||
"value": "0"
|
||||
},
|
||||
{
|
||||
"op": "=",
|
||||
"text": "DOWN",
|
||||
"value": "null"
|
||||
}
|
||||
],
|
||||
"valueName": "avg"
|
||||
},
|
||||
{
|
||||
"columns": [],
|
||||
"datasource": "Prometheus",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fontSize": "100%",
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 22,
|
||||
"x": 2,
|
||||
"y": 1
|
||||
},
|
||||
"hideTimeOverride": false,
|
||||
"id": 26,
|
||||
"links": [],
|
||||
"pageSize": null,
|
||||
"scroll": true,
|
||||
"showHeader": true,
|
||||
"sort": {
|
||||
"col": 0,
|
||||
"desc": true
|
||||
},
|
||||
"styles": [
|
||||
{
|
||||
"alias": "Time",
|
||||
"align": "auto",
|
||||
"dateFormat": "YYYY-MM-DD HH:mm:ss",
|
||||
"pattern": "Time",
|
||||
"type": "date"
|
||||
},
|
||||
{
|
||||
"alias": "",
|
||||
"align": "auto",
|
||||
"colorMode": null,
|
||||
"colors": [
|
||||
"rgba(245, 54, 54, 0.9)",
|
||||
"rgba(237, 129, 40, 0.89)",
|
||||
"rgba(50, 172, 45, 0.97)"
|
||||
],
|
||||
"decimals": 2,
|
||||
"pattern": "/.*/",
|
||||
"thresholds": [],
|
||||
"type": "number",
|
||||
"unit": "short"
|
||||
}
|
||||
],
|
||||
"targets": [
|
||||
{
|
||||
"expr": "http_server_requests_seconds_bucket{application=\"$application\", instance=\"$instance\", le=\"0.05\"}",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Number Requests that missed 50ms SLA",
|
||||
"transform": "table",
|
||||
"type": "table-old"
|
||||
},
|
||||
{
|
||||
"cacheTimeout": null,
|
||||
"colorBackground": false,
|
||||
"colorValue": true,
|
||||
"colors": [
|
||||
"#299c46",
|
||||
"rgba(237, 129, 40, 0.89)",
|
||||
"#d44a3a"
|
||||
],
|
||||
"datasource": "Prometheus",
|
||||
"decimals": 1,
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"format": "s",
|
||||
"gauge": {
|
||||
"maxValue": 100,
|
||||
"minValue": 0,
|
||||
"show": false,
|
||||
"thresholdLabels": false,
|
||||
"thresholdMarkers": true
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 4,
|
||||
"w": 2,
|
||||
"x": 0,
|
||||
"y": 5
|
||||
},
|
||||
"id": 18,
|
||||
"interval": null,
|
||||
"links": [],
|
||||
"mappingType": 1,
|
||||
"mappingTypes": [
|
||||
{
|
||||
"name": "value to text",
|
||||
"value": 1
|
||||
},
|
||||
{
|
||||
"name": "range to text",
|
||||
"value": 2
|
||||
}
|
||||
],
|
||||
"maxDataPoints": 100,
|
||||
"nullPointMode": "connected",
|
||||
"nullText": null,
|
||||
"postfix": "",
|
||||
"postfixFontSize": "50%",
|
||||
"prefix": "",
|
||||
"prefixFontSize": "50%",
|
||||
"rangeMaps": [
|
||||
{
|
||||
"from": "null",
|
||||
"text": "N/A",
|
||||
"to": "null"
|
||||
}
|
||||
],
|
||||
"sparkline": {
|
||||
"fillColor": "rgba(31, 118, 189, 0.18)",
|
||||
"full": false,
|
||||
"lineColor": "rgb(31, 120, 193)",
|
||||
"show": false
|
||||
},
|
||||
"tableColumn": "",
|
||||
"targets": [
|
||||
{
|
||||
"expr": "process_uptime_seconds{application=\"$application\", instance=\"$instance\"}",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": "",
|
||||
"title": "Up Time",
|
||||
"type": "singlestat",
|
||||
"valueFontSize": "80%",
|
||||
"valueMaps": [
|
||||
{
|
||||
"op": "=",
|
||||
"text": "N/A",
|
||||
"value": "null"
|
||||
}
|
||||
],
|
||||
"valueName": "current"
|
||||
},
|
||||
{
|
||||
"collapsed": false,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 9
|
||||
},
|
||||
"id": 8,
|
||||
"panels": [],
|
||||
"title": "Throughput",
|
||||
"type": "row"
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": "Prometheus",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 10
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 10,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": true,
|
||||
"max": true,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": true
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.4.3",
|
||||
"pointradius": 5,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "rate(http_server_requests_seconds_count{application=\"$application\", instance=\"$instance\"}[1m])",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "{{method}}-{{status}}-{{uri}}",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "Requests per second",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": "Prometheus",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 10
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 12,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": false,
|
||||
"hideEmpty": false,
|
||||
"hideZero": false,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": false
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null as zero",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.4.3",
|
||||
"pointradius": 5,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "rate(http_server_requests_seconds_sum{application=\"$application\", instance=\"$instance\"}[1m])/rate(http_server_requests_seconds_count{application=\"$application\", instance=\"$instance\"}[1m])",
|
||||
"format": "time_series",
|
||||
"instant": false,
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "{{method}}-{{status}}-{{uri}}",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "Mean response time",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "s",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": "Prometheus",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 19
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 14,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": true,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": true
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.4.3",
|
||||
"pointradius": 5,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(http_server_requests_seconds_bucket{application=\"$application\", instance=\"$instance\"}[1m])) by (le))",
|
||||
"format": "time_series",
|
||||
"instant": false,
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "95%",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.9, sum(rate(http_server_requests_seconds_bucket{application=\"$application\", instance=\"$instance\"}[1m])) by (le))",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "90%",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.75, sum(rate(http_server_requests_seconds_bucket{application=\"$application\", instance=\"$instance\"}[1m])) by (le))",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "75%",
|
||||
"refId": "C"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.5, sum(rate(http_server_requests_seconds_bucket{application=\"$application\", instance=\"$instance\"}[1m])) by (le))",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "50%",
|
||||
"refId": "D"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "Response time of 50%, 75%, 90%, 95% of requests",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "s",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": "Prometheus",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 19
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 16,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": false,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": false
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null as zero",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.4.3",
|
||||
"pointradius": 5,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "topk(10, sum by(uri, method) (rate(http_server_requests_seconds_count{application=\"$application\"}[1m])))",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "Top 10 APIs",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"schemaVersion": 27,
|
||||
"style": "dark",
|
||||
"tags": [],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"allValue": null,
|
||||
"current": {
|
||||
"isNone": true,
|
||||
"selected": false,
|
||||
"text": "None",
|
||||
"value": ""
|
||||
},
|
||||
"datasource": "Prometheus",
|
||||
"definition": "",
|
||||
"description": null,
|
||||
"error": null,
|
||||
"hide": 0,
|
||||
"includeAll": false,
|
||||
"label": "Application",
|
||||
"multi": false,
|
||||
"name": "application",
|
||||
"options": [],
|
||||
"query": {
|
||||
"query": "label_values(application)",
|
||||
"refId": "Prometheus-application-Variable-Query"
|
||||
},
|
||||
"refresh": 2,
|
||||
"regex": "",
|
||||
"skipUrlSync": false,
|
||||
"sort": 0,
|
||||
"tagValuesQuery": "",
|
||||
"tags": [],
|
||||
"tagsQuery": "",
|
||||
"type": "query",
|
||||
"useTags": false
|
||||
},
|
||||
{
|
||||
"allValue": null,
|
||||
"current": {
|
||||
"selected": false,
|
||||
"text": "host.docker.internal:8080",
|
||||
"value": "host.docker.internal:8080"
|
||||
},
|
||||
"datasource": "Prometheus",
|
||||
"definition": "",
|
||||
"description": null,
|
||||
"error": null,
|
||||
"hide": 0,
|
||||
"includeAll": false,
|
||||
"label": "instance",
|
||||
"multi": false,
|
||||
"name": "instance",
|
||||
"options": [],
|
||||
"query": {
|
||||
"query": "label_values(up{application=\"$application\"}, instance)",
|
||||
"refId": "Prometheus-instance-Variable-Query"
|
||||
},
|
||||
"refresh": 2,
|
||||
"regex": "",
|
||||
"skipUrlSync": false,
|
||||
"sort": 0,
|
||||
"tagValuesQuery": "",
|
||||
"tags": [],
|
||||
"tagsQuery": "",
|
||||
"type": "query",
|
||||
"useTags": false
|
||||
}
|
||||
]
|
||||
},
|
||||
"time": {
|
||||
"from": "now-15m",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {
|
||||
"refresh_intervals": [
|
||||
"5s",
|
||||
"10s",
|
||||
"30s",
|
||||
"1m",
|
||||
"5m",
|
||||
"15m",
|
||||
"30m",
|
||||
"1h",
|
||||
"2h",
|
||||
"1d"
|
||||
],
|
||||
"time_options": [
|
||||
"5m",
|
||||
"15m",
|
||||
"1h",
|
||||
"6h",
|
||||
"12h",
|
||||
"24h",
|
||||
"2d",
|
||||
"7d",
|
||||
"30d"
|
||||
]
|
||||
},
|
||||
"timezone": "",
|
||||
"title": "Micrometer Spring Throughput",
|
||||
"uid": "twqdYjziz",
|
||||
"version": 1
|
||||
}
|
||||
9
monitoring/grafana/provisioning/datasources/all.yaml
Normal file
9
monitoring/grafana/provisioning/datasources/all.yaml
Normal file
@@ -0,0 +1,9 @@
|
||||
apiVersion: 1
|
||||
|
||||
datasources:
|
||||
- name: Prometheus
|
||||
label: Prometheus
|
||||
type: prometheus
|
||||
access: proxy
|
||||
url: http://localhost:9090
|
||||
isDefault: true
|
||||
25
pom.xml
25
pom.xml
@@ -17,6 +17,31 @@
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-spring-boot2</artifactId>
|
||||
<version>1.7.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-sleuth</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.micrometer</groupId>
|
||||
<artifactId>micrometer-registry-prometheus</artifactId>
|
||||
<version>1.8.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springdoc</groupId>
|
||||
<artifactId>springdoc-openapi-ui</artifactId>
|
||||
<version>1.6.7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-mongodb</artifactId>
|
||||
|
||||
@@ -3,19 +3,27 @@ package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@Service
|
||||
public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
public class BankAccountCommandHandler implements BankAccountCommandService {
|
||||
|
||||
private final EventStoreDB eventStoreDB;
|
||||
private static final String SERVICE_NAME = "microservice";
|
||||
|
||||
@Override
|
||||
public String handle(CreateBankAccountCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public String handle(@SpanTag("command") CreateBankAccountCommand command) {
|
||||
final var aggregate = new BankAccountAggregate(command.aggregateID());
|
||||
aggregate.createBankAccount(command.email(), command.address(), command.userName());
|
||||
eventStoreDB.save(aggregate);
|
||||
@@ -25,7 +33,10 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ChangeEmailCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void handle(@SpanTag("command") ChangeEmailCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeEmail(command.newEmail());
|
||||
eventStoreDB.save(aggregate);
|
||||
@@ -33,7 +44,10 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ChangeAddressCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void handle(@SpanTag("command") ChangeAddressCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeAddress(command.newAddress());
|
||||
eventStoreDB.save(aggregate);
|
||||
@@ -41,7 +55,10 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(DepositAmountCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void handle(@SpanTag("command") DepositAmountCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.depositBalance(command.amount());
|
||||
eventStoreDB.save(aggregate);
|
||||
|
||||
@@ -4,9 +4,11 @@ package com.eventsourcing.bankAccount.delivery;
|
||||
import com.eventsourcing.bankAccount.commands.*;
|
||||
import com.eventsourcing.bankAccount.dto.*;
|
||||
import com.eventsourcing.bankAccount.queries.BankAccountQueryService;
|
||||
import com.eventsourcing.bankAccount.queries.FindAllOrderByBalance;
|
||||
import com.eventsourcing.bankAccount.queries.GetBankAccountByIDQuery;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
@@ -25,43 +27,43 @@ public class BankAccountController {
|
||||
|
||||
@GetMapping("{aggregateId}")
|
||||
public ResponseEntity<BankAccountResponseDTO> getBankAccount(@PathVariable String aggregateId) {
|
||||
final var query = new GetBankAccountByIDQuery(aggregateId);
|
||||
log.info("GET bank account query: {}", query);
|
||||
final var result = queryService.handle(query);
|
||||
log.info("GET bank account result: {}", result);
|
||||
final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId));
|
||||
log.info("Get bank account result: {}", result);
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
|
||||
@PostMapping
|
||||
public ResponseEntity<String> createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) {
|
||||
final var aggregateID = UUID.randomUUID().toString();
|
||||
final var command = new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address());
|
||||
final var id = commandService.handle(command);
|
||||
log.info("CREATE bank account id: {}", id);
|
||||
final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()));
|
||||
log.info("Created bank account id: {}", id);
|
||||
return ResponseEntity.status(HttpStatus.CREATED).body(id);
|
||||
}
|
||||
|
||||
@PostMapping(path = "/deposit/{aggregateId}")
|
||||
public ResponseEntity<Void> depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) {
|
||||
final var command = new DepositAmountCommand(aggregateId, dto.amount());
|
||||
commandService.handle(command);
|
||||
log.info("DepositAmountCommand command: {}", command);
|
||||
commandService.handle(new DepositAmountCommand(aggregateId, dto.amount()));
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@PostMapping(path = "/email/{aggregateId}")
|
||||
public ResponseEntity<Void> changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) {
|
||||
final var command = new ChangeEmailCommand(aggregateId, dto.newEmail());
|
||||
commandService.handle(command);
|
||||
log.info("ChangeEmailCommand command: {}", command);
|
||||
commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail()));
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@PostMapping(path = "/address/{aggregateId}")
|
||||
public ResponseEntity<Void> changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) {
|
||||
final var command = new ChangeAddressCommand(aggregateId, dto.newAddress());
|
||||
commandService.handle(command);
|
||||
log.info("changeAddress command: {}", command);
|
||||
commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress()));
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@GetMapping("/balance")
|
||||
public ResponseEntity<Page<BankAccountResponseDTO>> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page,
|
||||
@RequestParam(name = "size", defaultValue = "10") Integer size) {
|
||||
|
||||
final var result = queryService.handle(new FindAllOrderByBalance(page, size));
|
||||
log.info("Get all by balance page: {}, size: {}, result: {}", page, size, result);
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidAddressException;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidEmailException;
|
||||
import com.eventsourcing.es.AggregateRoot;
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.SerializerUtils;
|
||||
@@ -57,13 +59,13 @@ public class BankAccountAggregate extends AggregateRoot {
|
||||
|
||||
private void handle(final EmailChangedEvent event) {
|
||||
Objects.requireNonNull(event.getNewEmail());
|
||||
if (event.getNewEmail().isBlank()) throw new RuntimeException("invalid email address");
|
||||
if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
|
||||
this.email = event.getNewEmail();
|
||||
}
|
||||
|
||||
private void handle(final AddressUpdatedEvent event) {
|
||||
Objects.requireNonNull(event.getNewAddress());
|
||||
if (event.getNewAddress().isBlank()) throw new RuntimeException("invalid address");
|
||||
if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
|
||||
this.address = event.getNewAddress();
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class BankAccountDocumentNotFoundException extends RuntimeException {
|
||||
public BankAccountDocumentNotFoundException() {
|
||||
}
|
||||
|
||||
public BankAccountDocumentNotFoundException(String id) {
|
||||
super("bank account document not found id:" + id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class InvalidAddressException extends RuntimeException {
|
||||
public InvalidAddressException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidAddressException(String address) {
|
||||
super("invalid address: " + address);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class InvalidEmailException extends RuntimeException {
|
||||
public InvalidEmailException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidEmailException(String email) {
|
||||
super("invalid email address: " + email);
|
||||
}
|
||||
}
|
||||
@@ -7,14 +7,19 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
|
||||
import com.eventsourcing.bankAccount.exceptions.BankAccountDocumentNotFoundException;
|
||||
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import com.eventsourcing.es.Projection;
|
||||
import com.eventsourcing.es.SerializerUtils;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
@@ -24,7 +29,6 @@ import org.springframework.stereotype.Service;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -33,28 +37,28 @@ public class BankAccountMongoProjection implements Projection {
|
||||
|
||||
private final BankAccountMongoRepository mongoRepository;
|
||||
private final EventStoreDB eventStoreDB;
|
||||
private static final String SERVICE_NAME = "microservice";
|
||||
|
||||
|
||||
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
|
||||
groupId = "${microservice.kafka.groupId}",
|
||||
concurrency = "${microservice.kafka.default-concurrency}")
|
||||
public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
|
||||
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp());
|
||||
log.info("(BankAccountMongoProjection) data: {}", new String(data));
|
||||
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data));
|
||||
|
||||
try {
|
||||
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
|
||||
this.processEvents(Arrays.stream(events).toList());
|
||||
ack.acknowledge();
|
||||
log.info("ack events: {}", Arrays.toString(events));
|
||||
} catch (Exception e) {
|
||||
} catch (Exception ex) {
|
||||
ack.nack(100);
|
||||
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp());
|
||||
log.error("bankAccountMongoProjectionListener: {}", e.getMessage());
|
||||
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void processEvents(List<Event> events) {
|
||||
@NewSpan
|
||||
private void processEvents(@SpanTag("events") List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
try {
|
||||
@@ -69,7 +73,10 @@ public class BankAccountMongoProjection implements Projection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void when(Event event) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void when(@SpanTag("event") Event event) {
|
||||
final var aggregateId = event.getAggregateId();
|
||||
log.info("(when) >>>>> aggregateId: {}", aggregateId);
|
||||
|
||||
@@ -87,7 +94,8 @@ public class BankAccountMongoProjection implements Projection {
|
||||
}
|
||||
|
||||
|
||||
private void handle(BankAccountCreatedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
|
||||
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
|
||||
final var document = BankAccountDocument.builder()
|
||||
@@ -102,37 +110,39 @@ public class BankAccountMongoProjection implements Projection {
|
||||
log.info("(BankAccountCreatedEvent) insert: {}", insert);
|
||||
}
|
||||
|
||||
private void handle(EmailChangedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") EmailChangedEvent event) {
|
||||
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
document.setEmail(event.getNewEmail());
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
private void handle(AddressUpdatedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") AddressUpdatedEvent event) {
|
||||
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
document.setAddress(event.getNewAddress());
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
private void handle(BalanceDepositedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") BalanceDepositedEvent event) {
|
||||
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
final var balance = document.getBalance();
|
||||
final var newBalance = balance.add(event.getAmount());
|
||||
final var newBalance = document.getBalance().add(event.getAmount());
|
||||
document.setBalance(newBalance);
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
@@ -6,8 +6,15 @@ import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
@@ -20,20 +27,33 @@ public class BankAccountQueryHandler implements BankAccountQueryService {
|
||||
|
||||
private final EventStoreDB eventStoreDB;
|
||||
private final BankAccountMongoRepository mongoRepository;
|
||||
private static final String SERVICE_NAME = "microservice";
|
||||
|
||||
@Override
|
||||
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
|
||||
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
|
||||
if (optionalDocument.isPresent()) {
|
||||
return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
|
||||
}
|
||||
|
||||
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
|
||||
BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
|
||||
final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
|
||||
log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);
|
||||
|
||||
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
|
||||
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
|
||||
return bankAccountResponseDTO;
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public Page<BankAccountResponseDTO> handle(@SpanTag("query") FindAllOrderByBalance query) {
|
||||
return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance")))
|
||||
.map(BankAccountMapper::bankAccountResponseDTOFromDocument);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
import org.springframework.data.domain.Page;
|
||||
|
||||
public interface BankAccountQueryService {
|
||||
BankAccountResponseDTO handle(GetBankAccountByIDQuery query);
|
||||
Page<BankAccountResponseDTO> handle(FindAllOrderByBalance query);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
public record FindAllOrderByBalance(int page, int size) {
|
||||
}
|
||||
@@ -28,7 +28,6 @@ public class KafkaProducerConfig {
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks());
|
||||
// producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfigProperties.getCompressionType());
|
||||
producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
|
||||
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getDeliveryTimeoutMs());
|
||||
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfigProperties.getMaxRequestSize());
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.eventsourcing.configuration;
|
||||
|
||||
|
||||
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
|
||||
import io.swagger.v3.oas.annotations.info.Contact;
|
||||
import io.swagger.v3.oas.annotations.info.Info;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
@OpenAPIDefinition(info = @Info(title = "Spring CQRS and Event Sourcing Microservice",
|
||||
description = "Spring Postgresql MongoDB Kafka CQRS and Event Sourcing Microservice",
|
||||
contact = @Contact(name = "Alexander Bryksin", email = "alexander.bryksin@yandex.ru", url = "https://github.com/AleksK1NG")))
|
||||
@Component
|
||||
public class SwaggerOpenAPIConfiguration {
|
||||
}
|
||||
@@ -62,10 +62,6 @@ public abstract class AggregateRoot {
|
||||
this.clearChanges();
|
||||
}
|
||||
|
||||
public String string() {
|
||||
return String.format("id: {%s}, type: {%s}, version: {%s}, changes: {%s}", id, type, version, changes.size());
|
||||
}
|
||||
|
||||
private void validateEvent(final Event event) {
|
||||
if (Objects.isNull(event) || !event.getAggregateId().equals(this.id))
|
||||
throw new InvalidEventException(event.toString());
|
||||
|
||||
@@ -6,7 +6,6 @@ import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@@ -14,6 +13,15 @@ import java.util.UUID;
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class Event {
|
||||
private UUID id;
|
||||
private String aggregateId;
|
||||
private String eventType;
|
||||
private String aggregateType;
|
||||
private long version;
|
||||
private byte[] data;
|
||||
private byte[] metaData;
|
||||
private LocalDateTime timeStamp;
|
||||
|
||||
|
||||
public Event(String eventType, String aggregateType) {
|
||||
this.id = UUID.randomUUID();
|
||||
@@ -22,23 +30,6 @@ public class Event {
|
||||
this.timeStamp = LocalDateTime.now();
|
||||
}
|
||||
|
||||
private UUID id;
|
||||
|
||||
private String aggregateId;
|
||||
|
||||
private String eventType;
|
||||
|
||||
private String aggregateType;
|
||||
|
||||
private long version;
|
||||
|
||||
private byte[] data;
|
||||
|
||||
private byte[] metaData;
|
||||
|
||||
// @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm")
|
||||
private LocalDateTime timeStamp;
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
@@ -4,13 +4,18 @@ package com.eventsourcing.es;
|
||||
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
|
||||
import static com.eventsourcing.es.Constants.*;
|
||||
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@@ -27,58 +32,11 @@ public class EventStore implements EventStoreDB {
|
||||
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||
private final EventBus eventBus;
|
||||
|
||||
@Override
|
||||
public void saveEvents(List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
final List<Event> changes = new ArrayList<>(events);
|
||||
changes.forEach(event -> {
|
||||
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of(
|
||||
"aggregate_id", event.getAggregateId(),
|
||||
"aggregate_type", event.getAggregateType(),
|
||||
"event_type", event.getEventType(),
|
||||
"data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
"metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
"version", event.getVersion()));
|
||||
|
||||
log.info("(saveEvents) saved result: {}, event: {}", result, event);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Event> loadEvents(String aggregateId, long version) {
|
||||
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
|
||||
(rs, rowNum) -> Event.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
.eventType(rs.getString("event_type"))
|
||||
.data(rs.getBytes("data"))
|
||||
.metaData(rs.getBytes("metadata"))
|
||||
.version(rs.getLong("version"))
|
||||
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
|
||||
.build());
|
||||
|
||||
log.info("(loadEvents) events list: {}", events);
|
||||
return events;
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> void saveSnapshot(T aggregate) {
|
||||
aggregate.toSnapshot();
|
||||
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
|
||||
|
||||
int update = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
|
||||
Map.of("aggregate_id", snapshot.getAggregateId(),
|
||||
"aggregate_type", snapshot.getAggregateType(),
|
||||
"data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
|
||||
"metadata", Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
|
||||
"version", snapshot.getVersion()));
|
||||
|
||||
log.info("(saveSnapshot) result: {}", update);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public <T extends AggregateRoot> void save(T aggregate) {
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
|
||||
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
|
||||
|
||||
if (aggregate.getVersion() > 1) {
|
||||
@@ -95,49 +53,10 @@ public class EventStore implements EventStoreDB {
|
||||
log.info("(save) saved aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
private void handleConcurrency(String aggregateId) {
|
||||
try {
|
||||
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
|
||||
} catch (EmptyResultDataAccessException e) {
|
||||
log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
|
||||
}
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
|
||||
}
|
||||
|
||||
private Optional<Snapshot> loadSnapshot(String aggregateId) {
|
||||
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
.data(rs.getBytes("data"))
|
||||
.metaData(rs.getBytes("metadata"))
|
||||
.version(rs.getLong("version"))
|
||||
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
|
||||
.build()).stream().findFirst();
|
||||
|
||||
snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result));
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
|
||||
try {
|
||||
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> T getSnapshotFromClass(Optional<Snapshot> snapshot, String aggregateId, Class<T> aggregateType) {
|
||||
if (snapshot.isEmpty()) {
|
||||
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
|
||||
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
|
||||
}
|
||||
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public <T extends AggregateRoot> T load(String aggregateId, Class<T> aggregateType) {
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
|
||||
|
||||
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
|
||||
|
||||
@@ -156,14 +75,123 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean exists(String aggregateId) {
|
||||
@NewSpan
|
||||
public void saveEvents(@SpanTag("events") List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
final List<Event> changes = new ArrayList<>(events);
|
||||
if (changes.size() > 1) {
|
||||
this.eventsBatchInsert(changes);
|
||||
return;
|
||||
}
|
||||
|
||||
final Event event = changes.get(0);
|
||||
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, mapFromEvent(event));
|
||||
log.info("(saveEvents) saved result: {}, event: {}", result, event);
|
||||
}
|
||||
|
||||
private Map<String, Serializable> mapFromEvent(Event event) {
|
||||
return Map.of(
|
||||
AGGREGATE_ID, event.getAggregateId(),
|
||||
AGGREGATE_TYPE, event.getAggregateType(),
|
||||
EVENT_TYPE, event.getEventType(),
|
||||
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
VERSION, event.getVersion());
|
||||
}
|
||||
|
||||
|
||||
@NewSpan
|
||||
private void eventsBatchInsert(@SpanTag("events") List<Event> events) {
|
||||
final var args = events.stream().map(this::mapFromEvent).toList();
|
||||
final Map<String, ?>[] maps = args.toArray(new Map[0]);
|
||||
int[] ints = jdbcTemplate.batchUpdate(SAVE_EVENTS_QUERY, maps);
|
||||
log.info("(saveEvents) BATCH saved result: {}, event: {}", ints);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
|
||||
return jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of(AGGREGATE_ID, aggregateId, VERSION, version),
|
||||
(rs, rowNum) -> Event.builder()
|
||||
.aggregateId(rs.getString(AGGREGATE_ID))
|
||||
.aggregateType(rs.getString(AGGREGATE_TYPE))
|
||||
.eventType(rs.getString(EVENT_TYPE))
|
||||
.data(rs.getBytes(DATA))
|
||||
.metaData(rs.getBytes(METADATA))
|
||||
.version(rs.getLong(VERSION))
|
||||
.timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
|
||||
.build());
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
|
||||
aggregate.toSnapshot();
|
||||
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
|
||||
|
||||
int updateResult = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
|
||||
Map.of(AGGREGATE_ID, snapshot.getAggregateId(),
|
||||
AGGREGATE_TYPE, snapshot.getAggregateType(),
|
||||
DATA, Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
|
||||
METADATA, Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
|
||||
VERSION, snapshot.getVersion()));
|
||||
|
||||
log.info("(saveSnapshot) updateResult: {}", updateResult);
|
||||
}
|
||||
|
||||
|
||||
@NewSpan
|
||||
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
|
||||
try {
|
||||
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
|
||||
} catch (EmptyResultDataAccessException e) {
|
||||
log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
|
||||
}
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
|
||||
return jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of(AGGREGATE_ID, aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString(AGGREGATE_ID))
|
||||
.aggregateType(rs.getString(AGGREGATE_TYPE))
|
||||
.data(rs.getBytes(DATA))
|
||||
.metaData(rs.getBytes(METADATA))
|
||||
.version(rs.getLong(VERSION))
|
||||
.timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
|
||||
.build()).stream().findFirst();
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
|
||||
try {
|
||||
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
|
||||
if (snapshot.isEmpty()) {
|
||||
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
|
||||
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
|
||||
}
|
||||
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
|
||||
try {
|
||||
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
|
||||
log.info("aggregate exists id: {}", id);
|
||||
return true;
|
||||
} catch (Exception ex) {
|
||||
if (!(ex instanceof EmptyResultDataAccessException)) {
|
||||
throw new RuntimeException("exists", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -15,22 +17,25 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class KafkaEventBus implements EventBus {
|
||||
|
||||
private final KafkaTemplate<String, byte[]> kafkaTemplate;
|
||||
private final static long sendTimeout = 3000;
|
||||
|
||||
@Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
private String bankAccountTopicName;
|
||||
|
||||
@Override
|
||||
public void publish(List<Event> events) {
|
||||
@NewSpan
|
||||
public void publish(@SpanTag("events") List<Event> events) {
|
||||
final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
|
||||
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);
|
||||
|
||||
try {
|
||||
kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS);
|
||||
kafkaTemplate.send(record).get(sendTimeout, TimeUnit.MILLISECONDS);
|
||||
log.info("publishing kafka record value >>>>> {}", new String(record.value()));
|
||||
|
||||
} catch (Exception ex) {
|
||||
log.error("(KafkaEventBus) publish get", ex);
|
||||
log.error("(KafkaEventBus) publish get timeout", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,21 +15,13 @@ import java.util.UUID;
|
||||
public class Snapshot {
|
||||
|
||||
private UUID id;
|
||||
|
||||
private String aggregateId;
|
||||
|
||||
|
||||
private String aggregateType;
|
||||
|
||||
private byte[] data;
|
||||
|
||||
private byte[] metaData;
|
||||
|
||||
|
||||
private long version;
|
||||
private LocalDateTime timeStamp;
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Snapshot{" +
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
public record ExceptionResponseDTO(int Status, String message, LocalDateTime timestamp) {
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
public record NotFoundResponseDTO(int Status, String message, LocalDateTime timestamp) {
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
package com.eventsourcing.filters;
|
||||
|
||||
import com.eventsourcing.bankAccount.exceptions.BankAccountDocumentNotFoundException;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidAddressException;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidEmailException;
|
||||
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
|
||||
import com.eventsourcing.exceptions.InternalServerErrorResponse;
|
||||
import com.eventsourcing.exceptions.NotFoundResponseDTO;
|
||||
import com.eventsourcing.exceptions.ExceptionResponseDTO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.annotation.Order;
|
||||
@@ -25,7 +28,6 @@ import java.util.Map;
|
||||
@Order(2)
|
||||
public class GlobalControllerAdvice {
|
||||
|
||||
|
||||
@ExceptionHandler(RuntimeException.class)
|
||||
public ResponseEntity<InternalServerErrorResponse> handleRuntimeException(RuntimeException ex, WebRequest request) {
|
||||
final var response = new InternalServerErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), LocalDateTime.now().toString());
|
||||
@@ -44,10 +46,18 @@ public class GlobalControllerAdvice {
|
||||
}
|
||||
|
||||
@ResponseStatus(HttpStatus.NOT_FOUND)
|
||||
@ExceptionHandler(AggregateNotFoundException.class)
|
||||
public ResponseEntity<NotFoundResponseDTO> handleAggregateNotFoundException(AggregateNotFoundException ex) {
|
||||
final var notFoundResponseDTO = new NotFoundResponseDTO(HttpStatus.NOT_FOUND.value(), ex.getMessage(), LocalDateTime.now());
|
||||
log.error("AggregateNotFoundException response ex:", ex);
|
||||
@ExceptionHandler({AggregateNotFoundException.class, BankAccountDocumentNotFoundException.class})
|
||||
public ResponseEntity<ExceptionResponseDTO> handleAggregateNotFoundExceptions(AggregateNotFoundException ex) {
|
||||
final var notFoundResponseDTO = new ExceptionResponseDTO(HttpStatus.NOT_FOUND.value(), ex.getMessage(), LocalDateTime.now());
|
||||
log.error("handleAggregateNotFoundExceptions response ex:", ex);
|
||||
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(notFoundResponseDTO);
|
||||
}
|
||||
|
||||
@ResponseStatus(HttpStatus.BAD_REQUEST)
|
||||
@ExceptionHandler({InvalidAddressException.class, InvalidEmailException.class})
|
||||
public ResponseEntity<ExceptionResponseDTO> handleInvalidAggregateExceptions(AggregateNotFoundException ex) {
|
||||
final var notFoundResponseDTO = new ExceptionResponseDTO(HttpStatus.BAD_REQUEST.value(), ex.getMessage(), LocalDateTime.now());
|
||||
log.error("handleInvalidAggregateExceptions response ex:", ex);
|
||||
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(notFoundResponseDTO);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,3 +36,40 @@ spring.data.mongodb.authentication-database=admin
|
||||
spring.data.mongodb.username=admin
|
||||
spring.data.mongodb.password=admin
|
||||
spring.data.mongodb.database=microservices
|
||||
|
||||
springdoc.swagger-ui.path=/swagger-ui.html
|
||||
|
||||
management.endpoints.web.exposure.include=health,prometheus,info
|
||||
management.metrics.export.prometheus.enabled=true
|
||||
|
||||
spring.sleuth.propagation.type=w3c,b3
|
||||
spring.sleuth.opentracing.enabled=true
|
||||
spring.zipkin.base-url=http://localhost:9411
|
||||
|
||||
|
||||
resilience4j.retry.instances.microservice.max-attempts=3
|
||||
resilience4j.retry.instances.microservice.waitDuration=1s
|
||||
resilience4j.retry.instances.microservice.enableExponentialBackoff=true
|
||||
resilience4j.retry.instances.microservice.exponentialBackoffMultiplier=2
|
||||
resilience4j.retry.instances.microservice.ignore-exceptions=com.eventsourcing.es.exceptions.AggregateNotFoundException
|
||||
|
||||
|
||||
resilience4j.circuitbreaker.instances.microservice.registerHealthIndicator=true
|
||||
resilience4j.circuitbreaker.instances.microservice.slidingWindowSize=5
|
||||
resilience4j.circuitbreaker.instances.microservice.permittedNumberOfCallsInHalfOpenState=3
|
||||
resilience4j.circuitbreaker.instances.microservice.slidingWindowType=TIME_BASED
|
||||
resilience4j.circuitbreaker.instances.microservice.minimumNumberOfCalls=10
|
||||
resilience4j.circuitbreaker.instances.microservice.waitDurationInOpenState=20s
|
||||
resilience4j.circuitbreaker.instances.microservice.failureRateThreshold=30
|
||||
resilience4j.circuitbreaker.instances.microservice.eventConsumerBufferSize=10
|
||||
|
||||
|
||||
resilience4j.thread-pool-bulkhead.instances.microservice.maxThreadPoolSize=1
|
||||
resilience4j.thread-pool-bulkhead.instances.microservice.coreThreadPoolSize=1
|
||||
resilience4j.thread-pool-bulkhead.instances.microservice.queueCapacity=1
|
||||
|
||||
resilience4j.timelimiter.instances.microservice.timeoutDuration=3s
|
||||
resilience4j.timelimiter.instances.microservice.cancelRunningFuture=true
|
||||
|
||||
spring.jdbc.template.query-timeout=3
|
||||
|
||||
|
||||
Reference in New Issue
Block a user