Save reactive Redis session on subscribe

This commit ensures ReactiveRedisOperationsSessionRepository#save does work only after subscribe. Without this, multiple invocations of #save over the course of same request can lead to race condition situations.

Resolves: #1399
This commit is contained in:
Vedran Pavic
2019-05-03 21:00:04 +02:00
parent 52f59a83e4
commit 78b72f2d1b
3 changed files with 69 additions and 63 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2018 the original author or authors.
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import java.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -68,6 +69,17 @@ public class ReactiveRedisOperationsSessionRepositoryITests extends AbstractRedi
assertThat(this.repository.findById(toSave.getId()).block()).isNull();
}
@Test // gh-1399
public void saveMultipleTimes() {
ReactiveRedisOperationsSessionRepository.RedisSession session = this.repository
.createSession().block();
session.setAttribute("attribute1", "value1");
Mono<Void> save1 = this.repository.save(session);
session.setAttribute("attribute2", "value2");
Mono<Void> save2 = this.repository.save(session);
Mono.zip(save1, save2).block();
}
@Test
public void putAllOnSingleAttrDoesNotRemoveOld() {
ReactiveRedisOperationsSessionRepository.RedisSession toSave = this.repository

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2018 the original author or authors.
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -143,24 +143,15 @@ public class ReactiveRedisOperationsSessionRepository implements
@Override
public Mono<Void> save(RedisSession session) {
Mono<Void> result = session.saveChangeSessionId()
.then(session.saveDelta())
.and((s) -> {
session.isNew = false;
s.onComplete();
});
if (session.isNew) {
return result;
}
else {
String sessionKey = getSessionKey(
session.hasChangedSessionId() ? session.originalSessionId
: session.getId());
return this.sessionRedisOperations.hasKey(sessionKey)
.flatMap((exists) -> exists ? result
: Mono.error(new IllegalStateException(
"Session was invalidated")));
return session.save();
}
String sessionKey = getSessionKey(
session.hasChangedSessionId() ? session.originalSessionId
: session.getId());
return this.sessionRedisOperations.hasKey(sessionKey).flatMap((exists) -> exists
? session.save()
: Mono.error(new IllegalStateException("Session was invalidated")));
}
@Override
@@ -306,7 +297,7 @@ public class ReactiveRedisOperationsSessionRepository implements
private void flushImmediateIfNecessary() {
if (ReactiveRedisOperationsSessionRepository.this.redisFlushMode == RedisFlushMode.IMMEDIATE) {
saveDelta();
save();
}
}
@@ -315,6 +306,11 @@ public class ReactiveRedisOperationsSessionRepository implements
flushImmediateIfNecessary();
}
private Mono<Void> save() {
return Mono.defer(() -> saveChangeSessionId().then(saveDelta())
.doOnSuccess((aVoid) -> this.isNew = false));
}
private Mono<Void> saveDelta() {
if (this.delta.isEmpty()) {
return Mono.empty();
@@ -322,7 +318,7 @@ public class ReactiveRedisOperationsSessionRepository implements
String sessionKey = getSessionKey(getId());
Mono<Boolean> update = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations
.opsForHash().putAll(sessionKey, this.delta);
.opsForHash().putAll(sessionKey, new HashMap<>(this.delta));
Mono<Boolean> setTtl = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations
.expire(sessionKey, getMaxInactiveInterval());

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2018 the original author or authors.
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -132,9 +132,10 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
@Test
public void createSessionDefaultMaxInactiveInterval() {
StepVerifier.create(this.repository.createSession()).consumeNextWith(
(session) -> assertThat(session.getMaxInactiveInterval()).isEqualTo(Duration
.ofSeconds(MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS)))
StepVerifier.create(this.repository.createSession())
.consumeNextWith((session) -> assertThat(session.getMaxInactiveInterval())
.isEqualTo(Duration.ofSeconds(
MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS)))
.verifyComplete();
}
@@ -155,30 +156,26 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
given(this.redisOperations.expire(anyString(), any()))
.willReturn(Mono.just(true));
StepVerifier
.create(this.repository.createSession().doOnNext(this.repository::save))
.consumeNextWith((session) -> {
verify(this.redisOperations).opsForHash();
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
verify(this.redisOperations).expire(anyString(), any());
verifyZeroInteractions(this.redisOperations);
verifyZeroInteractions(this.hashOperations);
RedisSession newSession = this.repository.new RedisSession();
StepVerifier.create(this.repository.save(newSession)).verifyComplete();
Map<String, Object> delta = this.delta.getAllValues().get(0);
assertThat(delta.size()).isEqualTo(3);
assertThat(delta.get(
ReactiveRedisOperationsSessionRepository.CREATION_TIME_KEY))
.isEqualTo(session.getCreationTime().toEpochMilli());
assertThat(delta.get(
ReactiveRedisOperationsSessionRepository.MAX_INACTIVE_INTERVAL_KEY))
.isEqualTo((int) Duration.ofSeconds(
MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS)
.getSeconds());
assertThat(delta.get(
ReactiveRedisOperationsSessionRepository.LAST_ACCESSED_TIME_KEY))
.isEqualTo(
session.getLastAccessedTime().toEpochMilli());
}).verifyComplete();
verify(this.redisOperations).opsForHash();
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
verify(this.redisOperations).expire(anyString(), any());
verifyZeroInteractions(this.redisOperations);
verifyZeroInteractions(this.hashOperations);
Map<String, Object> delta = this.delta.getAllValues().get(0);
assertThat(delta.size()).isEqualTo(3);
assertThat(delta.get(ReactiveRedisOperationsSessionRepository.CREATION_TIME_KEY))
.isEqualTo(newSession.getCreationTime().toEpochMilli());
assertThat(delta
.get(ReactiveRedisOperationsSessionRepository.MAX_INACTIVE_INTERVAL_KEY))
.isEqualTo(
(int) newSession.getMaxInactiveInterval().getSeconds());
assertThat(delta
.get(ReactiveRedisOperationsSessionRepository.LAST_ACCESSED_TIME_KEY))
.isEqualTo(newSession.getLastAccessedTime().toEpochMilli());
}
@Test
@@ -207,7 +204,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
RedisSession session = this.repository.new RedisSession(this.cached);
session.setLastAccessedTime(Instant.ofEpochMilli(12345678L));
Mono.just(session).subscribe(this.repository::save);
StepVerifier.create(this.repository.save(session)).verifyComplete();
verify(this.redisOperations).hasKey(anyString());
verify(this.redisOperations).opsForHash();
@@ -232,7 +229,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
String attrName = "attrName";
RedisSession session = this.repository.new RedisSession(this.cached);
session.setAttribute(attrName, "attrValue");
Mono.just(session).subscribe(this.repository::save);
StepVerifier.create(this.repository.save(session)).verifyComplete();
verify(this.redisOperations).hasKey(anyString());
verify(this.redisOperations).opsForHash();
@@ -257,7 +254,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
String attrName = "attrName";
RedisSession session = this.repository.new RedisSession(new MapSession());
session.removeAttribute(attrName);
Mono.just(session).subscribe(this.repository::save);
StepVerifier.create(this.repository.save(session)).verifyComplete();
verify(this.redisOperations).hasKey(anyString());
verify(this.redisOperations).opsForHash();
@@ -333,24 +330,25 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
given(this.hashOperations.entries(anyString()))
.willReturn(Flux.fromIterable(map.entrySet()));
StepVerifier.create(this.repository.findById("test")).consumeNextWith((session) -> {
verify(this.redisOperations).opsForHash();
verify(this.hashOperations).entries(anyString());
verifyZeroInteractions(this.redisOperations);
verifyZeroInteractions(this.hashOperations);
StepVerifier.create(this.repository.findById("test"))
.consumeNextWith((session) -> {
verify(this.redisOperations).opsForHash();
verify(this.hashOperations).entries(anyString());
verifyZeroInteractions(this.redisOperations);
verifyZeroInteractions(this.hashOperations);
assertThat(session.getId()).isEqualTo(expected.getId());
assertThat(session.getAttributeNames())
.isEqualTo(expected.getAttributeNames());
assertThat(session.<String>getAttribute(attribute1))
.isEqualTo(expected.getAttribute(attribute1));
assertThat(session.<String>getAttribute(attribute2))
.isEqualTo(expected.getAttribute(attribute2));
assertThat(session.getId()).isEqualTo(expected.getId());
assertThat(session.getAttributeNames())
.isEqualTo(expected.getAttributeNames());
assertThat(session.<String>getAttribute(attribute1))
.isEqualTo(expected.getAttribute(attribute1));
assertThat(session.<String>getAttribute(attribute2))
.isEqualTo(expected.getAttribute(attribute2));
assertThat(session.getCreationTime().truncatedTo(ChronoUnit.MILLIS))
.isEqualTo(expected.getCreationTime()
.truncatedTo(ChronoUnit.MILLIS));
assertThat(session.getMaxInactiveInterval())
.isEqualTo(expected.getMaxInactiveInterval());
.isEqualTo(expected.getMaxInactiveInterval());
assertThat(
session.getLastAccessedTime().truncatedTo(ChronoUnit.MILLIS))
.isEqualTo(expected.getLastAccessedTime()