Introduce more reliable coordination between a GemFire client/server during integration tests.
Fixes gh-672
This commit is contained in:
@@ -58,7 +58,7 @@ task runGemFireServer() {
|
||||
//"-Dgemfire.log-level=config",
|
||||
"-Dspring-session-data-gemfire.cache.server.port=$port",
|
||||
"-Dspring-session-data-gemfire.log.level="
|
||||
+ System.getProperty('spring-session-data-gemfire.log.level', 'config'),
|
||||
+ System.getProperty('spring-session-data-gemfire.log.level', 'warning'),
|
||||
'sample.server.GemFireServer'
|
||||
]
|
||||
|
||||
|
||||
@@ -24,10 +24,16 @@ import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.servlet.http.HttpSession;
|
||||
|
||||
import org.apache.geode.cache.Region;
|
||||
import org.apache.geode.cache.client.ClientCache;
|
||||
import org.apache.geode.cache.client.Pool;
|
||||
import org.apache.geode.cache.client.PoolManager;
|
||||
import org.apache.geode.cache.client.internal.PoolImpl;
|
||||
import org.apache.geode.management.membership.ClientMembership;
|
||||
import org.apache.geode.management.membership.ClientMembershipEvent;
|
||||
import org.apache.geode.management.membership.ClientMembershipListenerAdapter;
|
||||
@@ -40,6 +46,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
|
||||
import org.springframework.data.gemfire.client.ClientCacheFactoryBean;
|
||||
import org.springframework.data.gemfire.config.xml.GemfireConstants;
|
||||
import org.springframework.data.gemfire.support.ConnectionEndpoint;
|
||||
import org.springframework.data.gemfire.util.CollectionUtils;
|
||||
import org.springframework.session.data.gemfire.config.annotation.web.http.EnableGemFireHttpSession;
|
||||
@@ -78,6 +85,7 @@ public class Application {
|
||||
static final CountDownLatch LATCH = new CountDownLatch(1);
|
||||
|
||||
static final String DEFAULT_GEMFIRE_LOG_LEVEL = "warning";
|
||||
static final String GEMFIRE_DEFAULT_POOL_NAME = "DEFAULT";
|
||||
static final String INDEX_TEMPLATE_VIEW_NAME = "index";
|
||||
static final String PING_RESPONSE = "PONG";
|
||||
static final String REQUEST_COUNT_ATTRIBUTE_NAME = "requestCount";
|
||||
@@ -151,21 +159,20 @@ public class Application {
|
||||
}
|
||||
|
||||
@Bean
|
||||
BeanPostProcessor gemfireCacheServerReadyBeanPostProcessor(
|
||||
BeanPostProcessor gemfireClientServerReadyBeanPostProcessor(
|
||||
@Value("${spring-session-data-gemfire.cache.server.host:localhost}") final String host,
|
||||
@Value("${spring-session-data-gemfire.cache.server.port:12480}") final int port) { // <5>
|
||||
|
||||
return new BeanPostProcessor() {
|
||||
|
||||
private final AtomicBoolean checkGemFireServerIsRunning = new AtomicBoolean(true);
|
||||
private final AtomicReference<Pool> defaultPool = new AtomicReference<Pool>(null);
|
||||
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (GemFireHttpSessionConfiguration.DEFAULT_SPRING_SESSION_GEMFIRE_REGION_NAME.equals(beanName)
|
||||
|| bean instanceof Region) {
|
||||
|
||||
if (shouldCheckWhetherGemFireServerIsRunning(bean, beanName)) {
|
||||
try {
|
||||
boolean didNotTimeout = LATCH.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.state(didNotTimeout, String.format(
|
||||
"GemFire Cache Server failed to start on host [%s] and port [%d]", host, port));
|
||||
validateCacheClientNotified();
|
||||
validateCacheClientSubscriptionQueueConnectionEstablished();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
@@ -175,6 +182,69 @@ public class Application {
|
||||
return bean;
|
||||
}
|
||||
|
||||
private boolean shouldCheckWhetherGemFireServerIsRunning(Object bean, String beanName) {
|
||||
return (isGemFireRegion(bean, beanName)
|
||||
? checkGemFireServerIsRunning.compareAndSet(true, false)
|
||||
: whenGemFireCache(bean, beanName));
|
||||
}
|
||||
|
||||
private boolean isGemFireRegion(Object bean, String beanName) {
|
||||
return (GemFireHttpSessionConfiguration.DEFAULT_SPRING_SESSION_GEMFIRE_REGION_NAME.equals(beanName)
|
||||
|| bean instanceof Region);
|
||||
}
|
||||
|
||||
private boolean whenGemFireCache(Object bean, String beanName) {
|
||||
if (bean instanceof ClientCache) {
|
||||
defaultPool.compareAndSet(null, ((ClientCache) bean).getDefaultPool());
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void validateCacheClientNotified() throws InterruptedException {
|
||||
boolean didNotTimeout = LATCH.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.state(didNotTimeout, String.format(
|
||||
"GemFire Cache Server failed to start on host [%s] and port [%d]", host, port));
|
||||
}
|
||||
|
||||
@SuppressWarnings("all")
|
||||
private void validateCacheClientSubscriptionQueueConnectionEstablished() throws InterruptedException {
|
||||
boolean cacheClientSubscriptionQueueConnectionEstablished = false;
|
||||
|
||||
Pool pool = defaultIfNull(this.defaultPool.get(), GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME,
|
||||
GEMFIRE_DEFAULT_POOL_NAME);
|
||||
|
||||
if (pool instanceof PoolImpl) {
|
||||
long timeout = (System.currentTimeMillis() + DEFAULT_TIMEOUT);
|
||||
|
||||
while (System.currentTimeMillis() < timeout
|
||||
&& !((PoolImpl) pool).isPrimaryUpdaterAlive()) {
|
||||
|
||||
synchronized (pool) {
|
||||
TimeUnit.MILLISECONDS.timedWait(pool, 500L);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cacheClientSubscriptionQueueConnectionEstablished |=
|
||||
((PoolImpl) pool).isPrimaryUpdaterAlive();
|
||||
}
|
||||
|
||||
Assert.state(cacheClientSubscriptionQueueConnectionEstablished, String.format(
|
||||
"Cache client subscription queue connection not established; GemFire Pool was [%s];"
|
||||
+ " GemFire Pool configuration was [locators = %s, servers = %s]",
|
||||
pool, pool.getLocators(), pool.getServers()));
|
||||
}
|
||||
|
||||
private Pool defaultIfNull(Pool pool, String... poolNames) {
|
||||
for (String poolName : poolNames) {
|
||||
pool = (pool != null ? pool : PoolManager.find(poolName));
|
||||
}
|
||||
|
||||
return pool;
|
||||
}
|
||||
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
|
||||
@@ -44,12 +44,14 @@ task runGemFireServer(dependsOn: availablePort) {
|
||||
def err = new StringBuilder()
|
||||
|
||||
String classpath = sourceSets.main.runtimeClasspath.collect { it }.join(File.pathSeparator)
|
||||
String gemfireLogLevel = System.getProperty('sample.httpsession.gemfire.log-level', 'config')
|
||||
String gemfireLogLevel = System.getProperty('spring.session.data.gemfire.log-level', 'warning')
|
||||
|
||||
String[] commandLine = [
|
||||
'java', '-server', '-ea', '-classpath', classpath,
|
||||
//"-Dgemfire.log-file=gemfire-server.log",
|
||||
//"-Dgemfire.log-level=config",
|
||||
"-Dspring.session.data.gemfire.log-level=" + gemfireLogLevel,
|
||||
"-Dspring.session.data.gemfire.port=$port",
|
||||
"-Dsample.httpsession.gemfire.log-level=" + gemfireLogLevel,
|
||||
'sample.ServerConfig'
|
||||
]
|
||||
|
||||
|
||||
@@ -20,8 +20,14 @@ import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.geode.cache.Region;
|
||||
import org.apache.geode.cache.client.ClientCache;
|
||||
import org.apache.geode.cache.client.Pool;
|
||||
import org.apache.geode.cache.client.PoolManager;
|
||||
import org.apache.geode.cache.client.internal.PoolImpl;
|
||||
import org.apache.geode.management.membership.ClientMembership;
|
||||
import org.apache.geode.management.membership.ClientMembershipEvent;
|
||||
import org.apache.geode.management.membership.ClientMembershipListenerAdapter;
|
||||
@@ -32,8 +38,10 @@ import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
|
||||
import org.springframework.data.gemfire.client.ClientCacheFactoryBean;
|
||||
import org.springframework.data.gemfire.config.xml.GemfireConstants;
|
||||
import org.springframework.data.gemfire.support.ConnectionEndpoint;
|
||||
import org.springframework.session.data.gemfire.config.annotation.web.http.EnableGemFireHttpSession;
|
||||
import org.springframework.session.data.gemfire.config.annotation.web.http.GemFireHttpSessionConfiguration;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
// tag::class[]
|
||||
@@ -44,7 +52,8 @@ public class ClientConfig {
|
||||
|
||||
static final CountDownLatch LATCH = new CountDownLatch(1);
|
||||
|
||||
static final String DEFAULT_GEMFIRE_LOG_LEVEL = "config";
|
||||
static final String DEFAULT_GEMFIRE_LOG_LEVEL = "warning";
|
||||
static final String GEMFIRE_DEFAULT_POOL_NAME = "DEFAULT";
|
||||
static final String PROXY_HOST = "dummy.example.com";
|
||||
static final String PROXY_PORT = "3128";
|
||||
|
||||
@@ -62,11 +71,11 @@ public class ClientConfig {
|
||||
}
|
||||
|
||||
String applicationName() {
|
||||
return "samples:httpsession-gemfire-clientserver:".concat(getClass().getSimpleName());
|
||||
return "spring-session-data-gemfire-clientserver-javaconfig-sample:".concat(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
String logLevel() {
|
||||
return System.getProperty("sample.httpsession.gemfire.log-level", DEFAULT_GEMFIRE_LOG_LEVEL);
|
||||
return System.getProperty("spring.session.data.gemfire.log-level", DEFAULT_GEMFIRE_LOG_LEVEL);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -112,19 +121,20 @@ public class ClientConfig {
|
||||
}
|
||||
|
||||
@Bean
|
||||
BeanPostProcessor gemfireCacheServerReadyBeanPostProcessor(
|
||||
@Value("${spring.session.data.gemfire.host:" + ServerConfig.SERVER_HOST + "}") final String host,
|
||||
@Value("${spring.session.data.gemfire.port:" + ServerConfig.SERVER_PORT + "}") final int port) { // <5>
|
||||
BeanPostProcessor gemfireClientServerReadyBeanPostProcessor(
|
||||
@Value("${spring-session-data-gemfire.cache.server.host:localhost}") final String host,
|
||||
@Value("${spring-session-data-gemfire.cache.server.port:12480}") final int port) { // <5>
|
||||
|
||||
return new BeanPostProcessor() {
|
||||
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof Region) {
|
||||
try {
|
||||
boolean didNotTimeout = LATCH.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
private final AtomicBoolean checkGemFireServerIsRunning = new AtomicBoolean(true);
|
||||
private final AtomicReference<Pool> defaultPool = new AtomicReference<Pool>(null);
|
||||
|
||||
Assert.state(didNotTimeout, String.format(
|
||||
"GemFire Cache Server failed to start on host [%s] and port [%d]", host, port));
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (shouldCheckWhetherGemFireServerIsRunning(bean, beanName)) {
|
||||
try {
|
||||
validateCacheClientNotified();
|
||||
validateCacheClientSubscriptionQueueConnectionEstablished();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
@@ -134,6 +144,69 @@ public class ClientConfig {
|
||||
return bean;
|
||||
}
|
||||
|
||||
private boolean shouldCheckWhetherGemFireServerIsRunning(Object bean, String beanName) {
|
||||
return (isGemFireRegion(bean, beanName)
|
||||
? checkGemFireServerIsRunning.compareAndSet(true, false)
|
||||
: whenGemFireCache(bean, beanName));
|
||||
}
|
||||
|
||||
private boolean isGemFireRegion(Object bean, String beanName) {
|
||||
return (GemFireHttpSessionConfiguration.DEFAULT_SPRING_SESSION_GEMFIRE_REGION_NAME.equals(beanName)
|
||||
|| bean instanceof Region);
|
||||
}
|
||||
|
||||
private boolean whenGemFireCache(Object bean, String beanName) {
|
||||
if (bean instanceof ClientCache) {
|
||||
defaultPool.compareAndSet(null, ((ClientCache) bean).getDefaultPool());
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void validateCacheClientNotified() throws InterruptedException {
|
||||
boolean didNotTimeout = LATCH.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.state(didNotTimeout, String.format(
|
||||
"GemFire Cache Server failed to start on host [%s] and port [%d]", host, port));
|
||||
}
|
||||
|
||||
@SuppressWarnings("all")
|
||||
private void validateCacheClientSubscriptionQueueConnectionEstablished() throws InterruptedException {
|
||||
boolean cacheClientSubscriptionQueueConnectionEstablished = false;
|
||||
|
||||
Pool pool = defaultIfNull(this.defaultPool.get(), GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME,
|
||||
GEMFIRE_DEFAULT_POOL_NAME);
|
||||
|
||||
if (pool instanceof PoolImpl) {
|
||||
long timeout = (System.currentTimeMillis() + DEFAULT_TIMEOUT);
|
||||
|
||||
while (System.currentTimeMillis() < timeout
|
||||
&& !((PoolImpl) pool).isPrimaryUpdaterAlive()) {
|
||||
|
||||
synchronized (pool) {
|
||||
TimeUnit.MILLISECONDS.timedWait(pool, 500L);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cacheClientSubscriptionQueueConnectionEstablished |=
|
||||
((PoolImpl) pool).isPrimaryUpdaterAlive();
|
||||
}
|
||||
|
||||
Assert.state(cacheClientSubscriptionQueueConnectionEstablished, String.format(
|
||||
"Cache client subscription queue connection not established; GemFire Pool was [%s];"
|
||||
+ " GemFire Pool configuration was [locators = %s, servers = %s]",
|
||||
pool, pool.getLocators(), pool.getServers()));
|
||||
}
|
||||
|
||||
private Pool defaultIfNull(Pool pool, String... poolNames) {
|
||||
for (String poolName : poolNames) {
|
||||
pool = (pool != null ? pool : PoolManager.find(poolName));
|
||||
}
|
||||
|
||||
return pool;
|
||||
}
|
||||
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ public class ServerConfig {
|
||||
|
||||
static final int SERVER_PORT = 12480;
|
||||
|
||||
static final String DEFAULT_GEMFIRE_LOG_LEVEL = "config";
|
||||
static final String DEFAULT_GEMFIRE_LOG_LEVEL = "warning";
|
||||
static final String SERVER_HOST = "localhost";
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
|
||||
@@ -43,12 +43,14 @@ task runGemFireServer(dependsOn: availablePort) {
|
||||
def err = new StringBuilder()
|
||||
|
||||
String classpath = sourceSets.main.runtimeClasspath.collect { it }.join(File.pathSeparator)
|
||||
String gemfireLogLevel = System.getProperty('sample.httpsession.gemfire.log-level', 'warning')
|
||||
String gemfireLogLevel = System.getProperty('spring.session.data.gemfire.log-level', 'warning')
|
||||
|
||||
String[] commandLine = [
|
||||
'java', '-server', '-ea', '-classpath', classpath,
|
||||
//"-Dgemfire.log-file=gemfire-server.log",
|
||||
//"-Dgemfire.log-level=config",
|
||||
"-Dspring.session.data.gemfire.log-level=" + gemfireLogLevel,
|
||||
"-Dspring.session.data.gemfire.port=$port",
|
||||
"-Dsample.httpsession.gemfire.log-level=" + gemfireLogLevel,
|
||||
'sample.Application'
|
||||
]
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.springframework.context.annotation.ImportResource;
|
||||
@ImportResource("META-INF/spring/session-server.xml") // <2>
|
||||
public class Application {
|
||||
|
||||
public static void main(final String[] args) {
|
||||
public static void main(String[] args) {
|
||||
new AnnotationConfigApplicationContext(Application.class).registerShutdownHook();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package sample;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.geode.cache.Region;
|
||||
import org.apache.geode.management.membership.ClientMembership;
|
||||
import org.apache.geode.management.membership.ClientMembershipEvent;
|
||||
import org.apache.geode.management.membership.ClientMembershipListenerAdapter;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
public class GemFireCacheServerReadyBeanPostProcessor implements BeanPostProcessor {
|
||||
|
||||
static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
|
||||
|
||||
static final CountDownLatch LATCH = new CountDownLatch(1);
|
||||
|
||||
@Value("${spring.session.data.gemfire.port:${application.gemfire.client-server.port}}")
|
||||
int port;
|
||||
|
||||
@Value("${application.gemfire.client-server.host:localhost}")
|
||||
String host;
|
||||
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
if ("gemfireCache".equals(beanName)) {
|
||||
ClientMembership.registerClientMembershipListener(
|
||||
new ClientMembershipListenerAdapter() {
|
||||
@Override
|
||||
public void memberJoined(ClientMembershipEvent event) {
|
||||
LATCH.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
else if (bean instanceof Region) {
|
||||
try {
|
||||
boolean didNotTimeout = LATCH.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.state(didNotTimeout, String.format(
|
||||
"GemFire Cache Server failed to start on host [%s] and port [%d]", this.host, this.port));
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
return bean;
|
||||
}
|
||||
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
// tag::end[]
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package sample;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.geode.cache.Region;
|
||||
import org.apache.geode.cache.client.Pool;
|
||||
import org.apache.geode.cache.client.PoolManager;
|
||||
import org.apache.geode.cache.client.internal.PoolImpl;
|
||||
import org.apache.geode.management.membership.ClientMembership;
|
||||
import org.apache.geode.management.membership.ClientMembershipEvent;
|
||||
import org.apache.geode.management.membership.ClientMembershipListenerAdapter;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.data.gemfire.config.xml.GemfireConstants;
|
||||
import org.springframework.session.data.gemfire.config.annotation.web.http.GemFireHttpSessionConfiguration;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
public class GemFireClientServerReadyBeanPostProcessor implements BeanPostProcessor {
|
||||
|
||||
private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
|
||||
|
||||
private static final CountDownLatch LATCH = new CountDownLatch(1);
|
||||
|
||||
private static final String GEMFIRE_DEFAULT_POOL_NAME = "DEFAULT";
|
||||
|
||||
static {
|
||||
ClientMembership.registerClientMembershipListener(
|
||||
new ClientMembershipListenerAdapter() {
|
||||
@Override
|
||||
public void memberJoined(ClientMembershipEvent event) {
|
||||
LATCH.countDown();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Value("${spring.session.data.gemfire.port:${application.gemfire.client-server.port}}")
|
||||
private int port;
|
||||
|
||||
@Value("${application.gemfire.client-server.host:localhost}")
|
||||
private String host;
|
||||
|
||||
private final AtomicBoolean checkGemFireServerIsRunning = new AtomicBoolean(true);
|
||||
private final AtomicReference<Pool> gemfirePool = new AtomicReference<Pool>(null);
|
||||
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (shouldCheckWhetherGemFireServerIsRunning(bean, beanName)) {
|
||||
try {
|
||||
validateCacheClientNotified();
|
||||
validateCacheClientSubscriptionQueueConnectionEstablished();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
return bean;
|
||||
}
|
||||
|
||||
private boolean shouldCheckWhetherGemFireServerIsRunning(Object bean, String beanName) {
|
||||
return (isGemFireRegion(bean, beanName)
|
||||
? this.checkGemFireServerIsRunning.compareAndSet(true, false)
|
||||
: whenGemFirePool(bean, beanName));
|
||||
}
|
||||
|
||||
private boolean isGemFireRegion(Object bean, String beanName) {
|
||||
return (GemFireHttpSessionConfiguration.DEFAULT_SPRING_SESSION_GEMFIRE_REGION_NAME.equals(beanName)
|
||||
|| bean instanceof Region);
|
||||
}
|
||||
|
||||
private boolean whenGemFirePool(Object bean, String beanName) {
|
||||
if (bean instanceof Pool) {
|
||||
this.gemfirePool.compareAndSet(null, (Pool) bean);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void validateCacheClientNotified() throws InterruptedException {
|
||||
boolean didNotTimeout = LATCH.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Assert.state(didNotTimeout, String.format(
|
||||
"GemFire Cache Server failed to start on host [%s] and port [%d]", this.host, this.port));
|
||||
}
|
||||
|
||||
@SuppressWarnings("all")
|
||||
private void validateCacheClientSubscriptionQueueConnectionEstablished() throws InterruptedException {
|
||||
boolean cacheClientSubscriptionQueueConnectionEstablished = false;
|
||||
|
||||
Pool pool = defaultIfNull(this.gemfirePool.get(),
|
||||
GemfireConstants.DEFAULT_GEMFIRE_POOL_NAME, GEMFIRE_DEFAULT_POOL_NAME);
|
||||
|
||||
if (pool instanceof PoolImpl) {
|
||||
long timeout = (System.currentTimeMillis() + DEFAULT_TIMEOUT);
|
||||
|
||||
while (System.currentTimeMillis() < timeout
|
||||
&& !((PoolImpl) pool).isPrimaryUpdaterAlive()) {
|
||||
|
||||
synchronized (pool) {
|
||||
TimeUnit.MILLISECONDS.timedWait(pool, 500L);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cacheClientSubscriptionQueueConnectionEstablished |=
|
||||
((PoolImpl) pool).isPrimaryUpdaterAlive();
|
||||
}
|
||||
|
||||
Assert.state(cacheClientSubscriptionQueueConnectionEstablished, String.format(
|
||||
"Cache client subscription queue connection not established; GemFire Pool was [%s];"
|
||||
+ " GemFire Pool configuration was [locators = %s, servers = %s]",
|
||||
pool, pool.getLocators(), pool.getServers()));
|
||||
}
|
||||
|
||||
private Pool defaultIfNull(Pool pool, String... poolNames) {
|
||||
for (String poolName : poolNames) {
|
||||
pool = (pool != null ? pool : PoolManager.find(poolName));
|
||||
}
|
||||
|
||||
return pool;
|
||||
}
|
||||
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
}
|
||||
// tag::end[]
|
||||
@@ -23,9 +23,12 @@
|
||||
<util:properties id="gemfireProperties">
|
||||
<prop key="name">GemFireClientServerHttpSessionXmlSample</prop>
|
||||
<prop key="mcast-port">0</prop>
|
||||
<prop key="log-level">${sample.httpsession.gemfire.log-level:warning}</prop>
|
||||
<!--<prop key="log-file">gemfire-server.log</prop>-->
|
||||
<prop key="log-level">${spring.session.data.gemfire.log-level:warning}</prop>
|
||||
<!--
|
||||
<prop key="jmx-manager">true</prop>
|
||||
<prop key="jmx-manager-start">true</prop>
|
||||
-->
|
||||
</util:properties>
|
||||
|
||||
<!--4-->
|
||||
|
||||
@@ -20,11 +20,12 @@
|
||||
<context:property-placeholder location="classpath:META-INF/spring/application.properties"/>
|
||||
|
||||
<!--3-->
|
||||
<bean class="sample.GemFireCacheServerReadyBeanPostProcessor"/>
|
||||
<bean class="sample.GemFireClientServerReadyBeanPostProcessor"/>
|
||||
|
||||
<!--4-->
|
||||
<util:properties id="gemfireProperties">
|
||||
<prop key="log-level">${sample.httpsession.gemfire.log-level:warning}</prop>
|
||||
<!--<prop key="log-file">gemfire-client.log</prop>-->
|
||||
<prop key="log-level">${spring.session.data.gemfire.log-level:warning}</prop>
|
||||
</util:properties>
|
||||
|
||||
<!--5-->
|
||||
|
||||
Reference in New Issue
Block a user