Verify session existence before update in ReactiveRedisOperationsSessionRepository
Currently, ReactiveRedisOperationsSessionRepository#save does not ensure session's existence before executing update. This can result in an invalid session record in Redis, since write use only delta, and in turn to error while retrieving the invalid session record. This commit adds check for session existence if session is being updated. Closes gh-1111
This commit is contained in:
@@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
package org.springframework.session.data.redis;
|
package org.springframework.session.data.redis;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
@@ -191,6 +193,28 @@ public class ReactiveRedisOperationsSessionRepositoryITests extends AbstractRedi
|
|||||||
assertThat(this.repository.findById(originalId).block()).isNull();
|
assertThat(this.repository.findById(originalId).block()).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// gh-1111
|
||||||
|
@Test
|
||||||
|
public void changeSessionSaveOldSessionInstance() {
|
||||||
|
ReactiveRedisOperationsSessionRepository.RedisSession toSave = this.repository
|
||||||
|
.createSession().block();
|
||||||
|
String sessionId = toSave.getId();
|
||||||
|
|
||||||
|
this.repository.save(toSave).block();
|
||||||
|
|
||||||
|
ReactiveRedisOperationsSessionRepository.RedisSession session = this.repository
|
||||||
|
.findById(sessionId).block();
|
||||||
|
session.changeSessionId();
|
||||||
|
session.setLastAccessedTime(Instant.now());
|
||||||
|
this.repository.save(session).block();
|
||||||
|
|
||||||
|
toSave.setLastAccessedTime(Instant.now());
|
||||||
|
this.repository.save(toSave).block();
|
||||||
|
|
||||||
|
assertThat(this.repository.findById(sessionId).block()).isNull();
|
||||||
|
assertThat(this.repository.findById(session.getId()).block()).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableRedisWebSession
|
@EnableRedisWebSession
|
||||||
static class Config extends BaseConfig {
|
static class Config extends BaseConfig {
|
||||||
|
|||||||
@@ -143,24 +143,39 @@ public class ReactiveRedisOperationsSessionRepository implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> save(RedisSession session) {
|
public Mono<Void> save(RedisSession session) {
|
||||||
return session.saveDelta().and((s) -> {
|
Mono<Void> result = session.saveChangeSessionId().and(session.saveDelta())
|
||||||
if (session.isNew) {
|
.and((s) -> {
|
||||||
session.setNew(false);
|
session.isNew = false;
|
||||||
}
|
s.onComplete();
|
||||||
|
});
|
||||||
s.onComplete();
|
if (session.isNew) {
|
||||||
});
|
return result;
|
||||||
|
}
|
||||||
|
else if (session.hasChangedSessionId()) {
|
||||||
|
String sessionKey = getSessionKey(session.originalSessionId);
|
||||||
|
return this.sessionRedisOperations.hasKey(sessionKey)
|
||||||
|
.flatMap((exists) -> exists ? result : Mono.empty());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
String sessionKey = getSessionKey(session.getId());
|
||||||
|
return this.sessionRedisOperations.hasKey(sessionKey)
|
||||||
|
.flatMap((exists) -> exists ? result : Mono.empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<RedisSession> findById(String id) {
|
public Mono<RedisSession> findById(String id) {
|
||||||
String sessionKey = getSessionKey(id);
|
String sessionKey = getSessionKey(id);
|
||||||
|
|
||||||
|
// @formatter:off
|
||||||
return this.sessionRedisOperations.opsForHash().entries(sessionKey)
|
return this.sessionRedisOperations.opsForHash().entries(sessionKey)
|
||||||
.collectMap((e) -> e.getKey().toString(), Map.Entry::getValue)
|
.collectMap((e) -> e.getKey().toString(), Map.Entry::getValue)
|
||||||
.filter((map) -> !map.isEmpty()).map(new SessionMapper(id))
|
.filter((map) -> !map.isEmpty())
|
||||||
.filter((session) -> !session.isExpired()).map(RedisSession::new)
|
.map(new SessionMapper(id))
|
||||||
|
.filter((session) -> !session.isExpired())
|
||||||
|
.map(RedisSession::new)
|
||||||
.switchIfEmpty(Mono.defer(() -> deleteById(id).then(Mono.empty())));
|
.switchIfEmpty(Mono.defer(() -> deleteById(id).then(Mono.empty())));
|
||||||
|
// @formatter:on
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -285,12 +300,8 @@ public class ReactiveRedisOperationsSessionRepository implements
|
|||||||
return this.cached.isExpired();
|
return this.cached.isExpired();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setNew(boolean isNew) {
|
private boolean hasChangedSessionId() {
|
||||||
this.isNew = isNew;
|
return !getId().equals(this.originalSessionId);
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isNew() {
|
|
||||||
return this.isNew;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flushImmediateIfNecessary() {
|
private void flushImmediateIfNecessary() {
|
||||||
@@ -305,38 +316,35 @@ public class ReactiveRedisOperationsSessionRepository implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Mono<Void> saveDelta() {
|
private Mono<Void> saveDelta() {
|
||||||
String sessionId = getId();
|
|
||||||
Mono<Void> changeSessionId = saveChangeSessionId(sessionId);
|
|
||||||
|
|
||||||
if (this.delta.isEmpty()) {
|
if (this.delta.isEmpty()) {
|
||||||
return changeSessionId.and(Mono.empty());
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
String sessionKey = getSessionKey(sessionId);
|
String sessionKey = getSessionKey(getId());
|
||||||
|
|
||||||
Mono<Boolean> update = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations
|
Mono<Boolean> update = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations
|
||||||
.opsForHash().putAll(sessionKey, this.delta);
|
.opsForHash().putAll(sessionKey, this.delta);
|
||||||
|
|
||||||
Mono<Boolean> setTtl = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations
|
Mono<Boolean> setTtl = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations
|
||||||
.expire(sessionKey, getMaxInactiveInterval());
|
.expire(sessionKey, getMaxInactiveInterval());
|
||||||
|
|
||||||
return changeSessionId.and(update).and(setTtl).and((s) -> {
|
return update.and(setTtl).and((s) -> {
|
||||||
this.delta.clear();
|
this.delta.clear();
|
||||||
s.onComplete();
|
s.onComplete();
|
||||||
}).then();
|
}).then();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Mono<Void> saveChangeSessionId(String sessionId) {
|
private Mono<Void> saveChangeSessionId() {
|
||||||
if (sessionId.equals(this.originalSessionId)) {
|
if (!hasChangedSessionId()) {
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String sessionId = getId();
|
||||||
|
|
||||||
Publisher<Void> replaceSessionId = (s) -> {
|
Publisher<Void> replaceSessionId = (s) -> {
|
||||||
this.originalSessionId = sessionId;
|
this.originalSessionId = sessionId;
|
||||||
s.onComplete();
|
s.onComplete();
|
||||||
};
|
};
|
||||||
|
|
||||||
if (isNew()) {
|
if (this.isNew) {
|
||||||
return Mono.from(replaceSessionId);
|
return Mono.from(replaceSessionId);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|||||||
@@ -183,6 +183,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void saveSessionNothingChanged() {
|
public void saveSessionNothingChanged() {
|
||||||
|
given(this.redisOperations.hasKey(anyString())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.expire(anyString(), any()))
|
given(this.redisOperations.expire(anyString(), any()))
|
||||||
.willReturn(Mono.just(true));
|
.willReturn(Mono.just(true));
|
||||||
|
|
||||||
@@ -191,12 +192,14 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
|
|
||||||
StepVerifier.create(this.repository.save(session)).verifyComplete();
|
StepVerifier.create(this.repository.save(session)).verifyComplete();
|
||||||
|
|
||||||
|
verify(this.redisOperations).hasKey(anyString());
|
||||||
verifyZeroInteractions(this.redisOperations);
|
verifyZeroInteractions(this.redisOperations);
|
||||||
verifyZeroInteractions(this.hashOperations);
|
verifyZeroInteractions(this.hashOperations);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void saveLastAccessChanged() {
|
public void saveLastAccessChanged() {
|
||||||
|
given(this.redisOperations.hasKey(anyString())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.opsForHash()).willReturn(this.hashOperations);
|
given(this.redisOperations.opsForHash()).willReturn(this.hashOperations);
|
||||||
given(this.hashOperations.putAll(anyString(), any())).willReturn(Mono.just(true));
|
given(this.hashOperations.putAll(anyString(), any())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.expire(anyString(), any()))
|
given(this.redisOperations.expire(anyString(), any()))
|
||||||
@@ -206,6 +209,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
session.setLastAccessedTime(Instant.ofEpochMilli(12345678L));
|
session.setLastAccessedTime(Instant.ofEpochMilli(12345678L));
|
||||||
Mono.just(session).subscribe(this.repository::save);
|
Mono.just(session).subscribe(this.repository::save);
|
||||||
|
|
||||||
|
verify(this.redisOperations).hasKey(anyString());
|
||||||
verify(this.redisOperations).opsForHash();
|
verify(this.redisOperations).opsForHash();
|
||||||
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
|
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
|
||||||
verify(this.redisOperations).expire(anyString(), any());
|
verify(this.redisOperations).expire(anyString(), any());
|
||||||
@@ -219,6 +223,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void saveSetAttribute() {
|
public void saveSetAttribute() {
|
||||||
|
given(this.redisOperations.hasKey(anyString())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.opsForHash()).willReturn(this.hashOperations);
|
given(this.redisOperations.opsForHash()).willReturn(this.hashOperations);
|
||||||
given(this.hashOperations.putAll(anyString(), any())).willReturn(Mono.just(true));
|
given(this.hashOperations.putAll(anyString(), any())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.expire(anyString(), any()))
|
given(this.redisOperations.expire(anyString(), any()))
|
||||||
@@ -229,6 +234,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
session.setAttribute(attrName, "attrValue");
|
session.setAttribute(attrName, "attrValue");
|
||||||
Mono.just(session).subscribe(this.repository::save);
|
Mono.just(session).subscribe(this.repository::save);
|
||||||
|
|
||||||
|
verify(this.redisOperations).hasKey(anyString());
|
||||||
verify(this.redisOperations).opsForHash();
|
verify(this.redisOperations).opsForHash();
|
||||||
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
|
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
|
||||||
verify(this.redisOperations).expire(anyString(), any());
|
verify(this.redisOperations).expire(anyString(), any());
|
||||||
@@ -242,6 +248,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void saveRemoveAttribute() {
|
public void saveRemoveAttribute() {
|
||||||
|
given(this.redisOperations.hasKey(anyString())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.opsForHash()).willReturn(this.hashOperations);
|
given(this.redisOperations.opsForHash()).willReturn(this.hashOperations);
|
||||||
given(this.hashOperations.putAll(anyString(), any())).willReturn(Mono.just(true));
|
given(this.hashOperations.putAll(anyString(), any())).willReturn(Mono.just(true));
|
||||||
given(this.redisOperations.expire(anyString(), any()))
|
given(this.redisOperations.expire(anyString(), any()))
|
||||||
@@ -252,6 +259,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests {
|
|||||||
session.removeAttribute(attrName);
|
session.removeAttribute(attrName);
|
||||||
Mono.just(session).subscribe(this.repository::save);
|
Mono.just(session).subscribe(this.repository::save);
|
||||||
|
|
||||||
|
verify(this.redisOperations).hasKey(anyString());
|
||||||
verify(this.redisOperations).opsForHash();
|
verify(this.redisOperations).opsForHash();
|
||||||
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
|
verify(this.hashOperations).putAll(anyString(), this.delta.capture());
|
||||||
verify(this.redisOperations).expire(anyString(), any());
|
verify(this.redisOperations).expire(anyString(), any());
|
||||||
|
|||||||
Reference in New Issue
Block a user