Optimize HazelcastSessionRepository write operations
This commit introduces several optimizations to write operations in `HazelcastSessionRepository`. - when storing a new session, `IMap#set` is now used instead of `IMap#put` - when updating an existing session, `IMap#executeOnKey` and a dedicated `EntryProcessor` are used To make these two changes possible, internal `HazelcastSession` now adds a flag to determine which of the two mentioned write scenarios to use, and also tracks a delta of session attributes in order to optimize updates. Closes gh-850
This commit is contained in:
@@ -30,6 +30,8 @@ import javax.annotation.PreDestroy;
|
||||
|
||||
import com.hazelcast.core.EntryEvent;
|
||||
import com.hazelcast.core.IMap;
|
||||
import com.hazelcast.map.AbstractEntryProcessor;
|
||||
import com.hazelcast.map.EntryProcessor;
|
||||
import com.hazelcast.map.listener.EntryAddedListener;
|
||||
import com.hazelcast.map.listener.EntryEvictedListener;
|
||||
import com.hazelcast.map.listener.EntryRemovedListener;
|
||||
@@ -210,11 +212,15 @@ public class HazelcastSessionRepository implements
|
||||
this.sessions.remove(session.originalId);
|
||||
session.originalId = session.getId();
|
||||
}
|
||||
if (session.isChanged()) {
|
||||
this.sessions.put(session.getId(), session.getDelegate(),
|
||||
if (session.isNew) {
|
||||
this.sessions.set(session.getId(), session.getDelegate(),
|
||||
session.getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS);
|
||||
session.markUnchanged();
|
||||
}
|
||||
else if (session.changed) {
|
||||
this.sessions.executeOnKey(session.getId(),
|
||||
new SessionUpdateEntryProcessor(session.getDelegate(), session.delta));
|
||||
}
|
||||
session.clearFlags();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -286,16 +292,22 @@ public class HazelcastSessionRepository implements
|
||||
final class HazelcastSession implements Session {
|
||||
|
||||
private final MapSession delegate;
|
||||
|
||||
private boolean isNew;
|
||||
|
||||
private boolean changed;
|
||||
|
||||
private String originalId;
|
||||
|
||||
private Map<String, Object> delta = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Creates a new instance ensuring to mark all of the new attributes to be
|
||||
* persisted in the next save operation.
|
||||
*/
|
||||
HazelcastSession() {
|
||||
this(new MapSession());
|
||||
this.changed = true;
|
||||
this.isNew = true;
|
||||
flushImmediateIfNecessary();
|
||||
}
|
||||
|
||||
@@ -334,7 +346,7 @@ public class HazelcastSessionRepository implements
|
||||
|
||||
@Override
|
||||
public String changeSessionId() {
|
||||
this.changed = true;
|
||||
this.isNew = true;
|
||||
return this.delegate.changeSessionId();
|
||||
}
|
||||
|
||||
@@ -368,6 +380,7 @@ public class HazelcastSessionRepository implements
|
||||
@Override
|
||||
public void setAttribute(String attributeName, Object attributeValue) {
|
||||
this.delegate.setAttribute(attributeName, attributeValue);
|
||||
this.delta.put(attributeName, attributeValue);
|
||||
this.changed = true;
|
||||
flushImmediateIfNecessary();
|
||||
}
|
||||
@@ -375,22 +388,21 @@ public class HazelcastSessionRepository implements
|
||||
@Override
|
||||
public void removeAttribute(String attributeName) {
|
||||
this.delegate.removeAttribute(attributeName);
|
||||
this.delta.put(attributeName, null);
|
||||
this.changed = true;
|
||||
flushImmediateIfNecessary();
|
||||
}
|
||||
|
||||
boolean isChanged() {
|
||||
return this.changed;
|
||||
}
|
||||
|
||||
void markUnchanged() {
|
||||
this.changed = false;
|
||||
}
|
||||
|
||||
MapSession getDelegate() {
|
||||
return this.delegate;
|
||||
}
|
||||
|
||||
void clearFlags() {
|
||||
this.isNew = false;
|
||||
this.changed = false;
|
||||
this.delta.clear();
|
||||
}
|
||||
|
||||
private void flushImmediateIfNecessary() {
|
||||
if (HazelcastSessionRepository.this.hazelcastFlushMode == HazelcastFlushMode.IMMEDIATE) {
|
||||
HazelcastSessionRepository.this.save(this);
|
||||
@@ -399,4 +411,41 @@ public class HazelcastSessionRepository implements
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Hazelcast {@link EntryProcessor} responsible for handling updates to session.
|
||||
*
|
||||
* @since 2.0.0
|
||||
* @see #save(HazelcastSession)
|
||||
*/
|
||||
private static final class SessionUpdateEntryProcessor
|
||||
extends AbstractEntryProcessor<String, MapSession> {
|
||||
|
||||
private final MapSession session;
|
||||
|
||||
private final Map<String, Object> delta;
|
||||
|
||||
SessionUpdateEntryProcessor(MapSession session, Map<String, Object> delta) {
|
||||
this.session = session;
|
||||
this.delta = delta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object process(Map.Entry<String, MapSession> entry) {
|
||||
MapSession value = entry.getValue();
|
||||
value.setLastAccessedTime(this.session.getLastAccessedTime());
|
||||
value.setMaxInactiveInterval(this.session.getMaxInactiveInterval());
|
||||
for (final Map.Entry<String, Object> attribute : this.delta.entrySet()) {
|
||||
if (attribute.getValue() != null) {
|
||||
value.setAttribute(attribute.getKey(), attribute.getValue());
|
||||
}
|
||||
else {
|
||||
value.removeAttribute(attribute.getKey());
|
||||
}
|
||||
}
|
||||
entry.setValue(value);
|
||||
return value;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.hazelcast.core.IMap;
|
||||
import com.hazelcast.map.EntryProcessor;
|
||||
import com.hazelcast.query.impl.predicates.EqualPredicate;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
@@ -41,6 +42,7 @@ import org.springframework.session.MapSession;
|
||||
import org.springframework.session.hazelcast.HazelcastSessionRepository.HazelcastSession;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
@@ -107,7 +109,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
verifyZeroInteractions(this.sessions);
|
||||
|
||||
this.repository.save(session);
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@@ -116,7 +118,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
|
||||
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@@ -127,7 +129,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
verifyZeroInteractions(this.sessions);
|
||||
|
||||
this.repository.save(session);
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@@ -137,8 +139,10 @@ public class HazelcastSessionRepositoryTests {
|
||||
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
session.setAttribute("testName", "testValue");
|
||||
verify(this.sessions, times(2)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
|
||||
any(EntryProcessor.class));
|
||||
|
||||
this.repository.save(session);
|
||||
verifyZeroInteractions(this.sessions);
|
||||
@@ -151,7 +155,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
verifyZeroInteractions(this.sessions);
|
||||
|
||||
this.repository.save(session);
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@@ -161,8 +165,10 @@ public class HazelcastSessionRepositoryTests {
|
||||
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
session.removeAttribute("testName");
|
||||
verify(this.sessions, times(2)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
|
||||
any(EntryProcessor.class));
|
||||
|
||||
this.repository.save(session);
|
||||
verifyZeroInteractions(this.sessions);
|
||||
@@ -175,7 +181,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
verifyZeroInteractions(this.sessions);
|
||||
|
||||
this.repository.save(session);
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@@ -185,8 +191,10 @@ public class HazelcastSessionRepositoryTests {
|
||||
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
session.setLastAccessedTime(Instant.now());
|
||||
verify(this.sessions, times(2)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
|
||||
any(EntryProcessor.class));
|
||||
|
||||
this.repository.save(session);
|
||||
verifyZeroInteractions(this.sessions);
|
||||
@@ -199,7 +207,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
verifyZeroInteractions(this.sessions);
|
||||
|
||||
this.repository.save(session);
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@@ -209,8 +217,10 @@ public class HazelcastSessionRepositoryTests {
|
||||
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
session.setMaxInactiveInterval(Duration.ofSeconds(1));
|
||||
verify(this.sessions, times(2)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
verify(this.sessions, times(1)).executeOnKey(eq(session.getId()),
|
||||
any(EntryProcessor.class));
|
||||
|
||||
this.repository.save(session);
|
||||
verifyZeroInteractions(this.sessions);
|
||||
@@ -220,7 +230,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
public void saveUnchangedFlushModeOnSave() {
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
this.repository.save(session);
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
|
||||
this.repository.save(session);
|
||||
@@ -232,7 +242,7 @@ public class HazelcastSessionRepositoryTests {
|
||||
this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
|
||||
|
||||
HazelcastSession session = this.repository.createSession();
|
||||
verify(this.sessions, times(1)).put(eq(session.getId()), eq(session.getDelegate()),
|
||||
verify(this.sessions, times(1)).set(eq(session.getId()), eq(session.getDelegate()),
|
||||
isA(Long.class), eq(TimeUnit.SECONDS));
|
||||
|
||||
this.repository.save(session);
|
||||
|
||||
Reference in New Issue
Block a user