diff --git a/spring-session-data-redis/src/integration-test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryITests.java b/spring-session-data-redis/src/integration-test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryITests.java index 1439dfc1..5cd1cb34 100644 --- a/spring-session-data-redis/src/integration-test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryITests.java +++ b/spring-session-data-redis/src/integration-test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryITests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2018 the original author or authors. + * Copyright 2014-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; +import reactor.core.publisher.Mono; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -68,6 +69,17 @@ public class ReactiveRedisOperationsSessionRepositoryITests extends AbstractRedi assertThat(this.repository.findById(toSave.getId()).block()).isNull(); } + @Test // gh-1399 + public void saveMultipleTimes() { + ReactiveRedisOperationsSessionRepository.RedisSession session = this.repository + .createSession().block(); + session.setAttribute("attribute1", "value1"); + Mono save1 = this.repository.save(session); + session.setAttribute("attribute2", "value2"); + Mono save2 = this.repository.save(session); + Mono.zip(save1, save2).block(); + } + @Test public void putAllOnSingleAttrDoesNotRemoveOld() { ReactiveRedisOperationsSessionRepository.RedisSession toSave = this.repository diff --git a/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepository.java b/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepository.java index b415f12b..be375f50 100644 --- a/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepository.java +++ b/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2018 the original author or authors. + * Copyright 2014-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -143,24 +143,15 @@ public class ReactiveRedisOperationsSessionRepository implements @Override public Mono save(RedisSession session) { - Mono result = session.saveChangeSessionId() - .then(session.saveDelta()) - .and((s) -> { - session.isNew = false; - s.onComplete(); - }); if (session.isNew) { - return result; - } - else { - String sessionKey = getSessionKey( - session.hasChangedSessionId() ? session.originalSessionId - : session.getId()); - return this.sessionRedisOperations.hasKey(sessionKey) - .flatMap((exists) -> exists ? result - : Mono.error(new IllegalStateException( - "Session was invalidated"))); + return session.save(); } + String sessionKey = getSessionKey( + session.hasChangedSessionId() ? session.originalSessionId + : session.getId()); + return this.sessionRedisOperations.hasKey(sessionKey).flatMap((exists) -> exists + ? session.save() + : Mono.error(new IllegalStateException("Session was invalidated"))); } @Override @@ -306,7 +297,7 @@ public class ReactiveRedisOperationsSessionRepository implements private void flushImmediateIfNecessary() { if (ReactiveRedisOperationsSessionRepository.this.redisFlushMode == RedisFlushMode.IMMEDIATE) { - saveDelta(); + save(); } } @@ -315,6 +306,11 @@ public class ReactiveRedisOperationsSessionRepository implements flushImmediateIfNecessary(); } + private Mono save() { + return Mono.defer(() -> saveChangeSessionId().then(saveDelta()) + .doOnSuccess((aVoid) -> this.isNew = false)); + } + private Mono saveDelta() { if (this.delta.isEmpty()) { return Mono.empty(); @@ -322,7 +318,7 @@ public class ReactiveRedisOperationsSessionRepository implements String sessionKey = getSessionKey(getId()); Mono update = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations - .opsForHash().putAll(sessionKey, this.delta); + .opsForHash().putAll(sessionKey, new HashMap<>(this.delta)); Mono setTtl = ReactiveRedisOperationsSessionRepository.this.sessionRedisOperations .expire(sessionKey, getMaxInactiveInterval()); diff --git a/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryTests.java b/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryTests.java index 223d666a..21f1f937 100644 --- a/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryTests.java +++ b/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisOperationsSessionRepositoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2018 the original author or authors. + * Copyright 2014-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -132,9 +132,10 @@ public class ReactiveRedisOperationsSessionRepositoryTests { @Test public void createSessionDefaultMaxInactiveInterval() { - StepVerifier.create(this.repository.createSession()).consumeNextWith( - (session) -> assertThat(session.getMaxInactiveInterval()).isEqualTo(Duration - .ofSeconds(MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS))) + StepVerifier.create(this.repository.createSession()) + .consumeNextWith((session) -> assertThat(session.getMaxInactiveInterval()) + .isEqualTo(Duration.ofSeconds( + MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS))) .verifyComplete(); } @@ -155,30 +156,26 @@ public class ReactiveRedisOperationsSessionRepositoryTests { given(this.redisOperations.expire(anyString(), any())) .willReturn(Mono.just(true)); - StepVerifier - .create(this.repository.createSession().doOnNext(this.repository::save)) - .consumeNextWith((session) -> { - verify(this.redisOperations).opsForHash(); - verify(this.hashOperations).putAll(anyString(), this.delta.capture()); - verify(this.redisOperations).expire(anyString(), any()); - verifyZeroInteractions(this.redisOperations); - verifyZeroInteractions(this.hashOperations); + RedisSession newSession = this.repository.new RedisSession(); + StepVerifier.create(this.repository.save(newSession)).verifyComplete(); - Map delta = this.delta.getAllValues().get(0); - assertThat(delta.size()).isEqualTo(3); - assertThat(delta.get( - ReactiveRedisOperationsSessionRepository.CREATION_TIME_KEY)) - .isEqualTo(session.getCreationTime().toEpochMilli()); - assertThat(delta.get( - ReactiveRedisOperationsSessionRepository.MAX_INACTIVE_INTERVAL_KEY)) - .isEqualTo((int) Duration.ofSeconds( - MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS) - .getSeconds()); - assertThat(delta.get( - ReactiveRedisOperationsSessionRepository.LAST_ACCESSED_TIME_KEY)) - .isEqualTo( - session.getLastAccessedTime().toEpochMilli()); - }).verifyComplete(); + verify(this.redisOperations).opsForHash(); + verify(this.hashOperations).putAll(anyString(), this.delta.capture()); + verify(this.redisOperations).expire(anyString(), any()); + verifyZeroInteractions(this.redisOperations); + verifyZeroInteractions(this.hashOperations); + + Map delta = this.delta.getAllValues().get(0); + assertThat(delta.size()).isEqualTo(3); + assertThat(delta.get(ReactiveRedisOperationsSessionRepository.CREATION_TIME_KEY)) + .isEqualTo(newSession.getCreationTime().toEpochMilli()); + assertThat(delta + .get(ReactiveRedisOperationsSessionRepository.MAX_INACTIVE_INTERVAL_KEY)) + .isEqualTo( + (int) newSession.getMaxInactiveInterval().getSeconds()); + assertThat(delta + .get(ReactiveRedisOperationsSessionRepository.LAST_ACCESSED_TIME_KEY)) + .isEqualTo(newSession.getLastAccessedTime().toEpochMilli()); } @Test @@ -207,7 +204,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests { RedisSession session = this.repository.new RedisSession(this.cached); session.setLastAccessedTime(Instant.ofEpochMilli(12345678L)); - Mono.just(session).subscribe(this.repository::save); + StepVerifier.create(this.repository.save(session)).verifyComplete(); verify(this.redisOperations).hasKey(anyString()); verify(this.redisOperations).opsForHash(); @@ -232,7 +229,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests { String attrName = "attrName"; RedisSession session = this.repository.new RedisSession(this.cached); session.setAttribute(attrName, "attrValue"); - Mono.just(session).subscribe(this.repository::save); + StepVerifier.create(this.repository.save(session)).verifyComplete(); verify(this.redisOperations).hasKey(anyString()); verify(this.redisOperations).opsForHash(); @@ -257,7 +254,7 @@ public class ReactiveRedisOperationsSessionRepositoryTests { String attrName = "attrName"; RedisSession session = this.repository.new RedisSession(new MapSession()); session.removeAttribute(attrName); - Mono.just(session).subscribe(this.repository::save); + StepVerifier.create(this.repository.save(session)).verifyComplete(); verify(this.redisOperations).hasKey(anyString()); verify(this.redisOperations).opsForHash(); @@ -333,24 +330,25 @@ public class ReactiveRedisOperationsSessionRepositoryTests { given(this.hashOperations.entries(anyString())) .willReturn(Flux.fromIterable(map.entrySet())); - StepVerifier.create(this.repository.findById("test")).consumeNextWith((session) -> { - verify(this.redisOperations).opsForHash(); - verify(this.hashOperations).entries(anyString()); - verifyZeroInteractions(this.redisOperations); - verifyZeroInteractions(this.hashOperations); + StepVerifier.create(this.repository.findById("test")) + .consumeNextWith((session) -> { + verify(this.redisOperations).opsForHash(); + verify(this.hashOperations).entries(anyString()); + verifyZeroInteractions(this.redisOperations); + verifyZeroInteractions(this.hashOperations); - assertThat(session.getId()).isEqualTo(expected.getId()); - assertThat(session.getAttributeNames()) - .isEqualTo(expected.getAttributeNames()); - assertThat(session.getAttribute(attribute1)) - .isEqualTo(expected.getAttribute(attribute1)); - assertThat(session.getAttribute(attribute2)) - .isEqualTo(expected.getAttribute(attribute2)); + assertThat(session.getId()).isEqualTo(expected.getId()); + assertThat(session.getAttributeNames()) + .isEqualTo(expected.getAttributeNames()); + assertThat(session.getAttribute(attribute1)) + .isEqualTo(expected.getAttribute(attribute1)); + assertThat(session.getAttribute(attribute2)) + .isEqualTo(expected.getAttribute(attribute2)); assertThat(session.getCreationTime().truncatedTo(ChronoUnit.MILLIS)) .isEqualTo(expected.getCreationTime() .truncatedTo(ChronoUnit.MILLIS)); assertThat(session.getMaxInactiveInterval()) - .isEqualTo(expected.getMaxInactiveInterval()); + .isEqualTo(expected.getMaxInactiveInterval()); assertThat( session.getLastAccessedTime().truncatedTo(ChronoUnit.MILLIS)) .isEqualTo(expected.getLastAccessedTime()