This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
The following commit(s) were added to refs/heads/master by this push: new deea6ff Rework RemoteCache deea6ff is described below commit deea6ff170a4b171fb76b6ebf11803bf49d9c0c4 Author: Thomas Vandahl <t...@apache.org> AuthorDate: Tue Nov 23 18:10:16 2021 +0100 Rework RemoteCache --- .../auxiliary/lateral/LateralCacheMonitor.java | 28 +- .../remote/AbstractRemoteCacheNoWaitFacade.java | 99 +++---- .../jcs3/auxiliary/remote/RemoteCacheFactory.java | 104 +++----- .../remote/RemoteCacheFailoverRunner.java | 288 +------------------- .../jcs3/auxiliary/remote/RemoteCacheListener.java | 11 +- .../jcs3/auxiliary/remote/RemoteCacheManager.java | 37 ++- .../jcs3/auxiliary/remote/RemoteCacheMonitor.java | 5 +- .../jcs3/auxiliary/remote/RemoteCacheNoWait.java | 2 +- .../auxiliary/remote/RemoteCacheNoWaitFacade.java | 297 ++++++++++++++++++++- 9 files changed, 415 insertions(+), 456 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java index c6e6e43..a0f9c7c 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/LateralCacheMonitor.java @@ -19,7 +19,6 @@ package org.apache.commons.jcs3.auxiliary.lateral; * under the License. 
*/ -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheMonitor; @@ -41,7 +40,7 @@ public class LateralCacheMonitor extends AbstractAuxiliaryCacheMonitor /** * Map of caches to monitor */ - private final ConcurrentHashMap<String, LateralCacheNoWait<?, ?>> caches; + private final ConcurrentHashMap<String, LateralCacheNoWait<Object, Object>> caches; /** * Reference to the factory @@ -78,9 +77,10 @@ public class LateralCacheMonitor extends AbstractAuxiliaryCacheMonitor * * @param cache the cache */ + @SuppressWarnings("unchecked") // common map for all caches public void addCache(final LateralCacheNoWait<?, ?> cache) { - this.caches.put(cache.getCacheName(), cache); + this.caches.put(cache.getCacheName(), (LateralCacheNoWait<Object, Object>)cache); // if not yet started, go ahead if (this.getState() == Thread.State.NEW) @@ -106,35 +106,27 @@ public class LateralCacheMonitor extends AbstractAuxiliaryCacheMonitor { // Monitor each cache instance one after the other. 
log.info( "Number of caches to monitor = " + caches.size() ); - //for - for (final Map.Entry<String, LateralCacheNoWait<?, ?>> entry : caches.entrySet()) - { - final String cacheName = entry.getKey(); - @SuppressWarnings("unchecked") // Downcast to match service - final LateralCacheNoWait<Object, Object> c = - (LateralCacheNoWait<Object, Object>) entry.getValue(); + caches.forEach((cacheName, cache) -> { - if (c.getStatus() == CacheStatus.ERROR) + if (cache.getStatus() == CacheStatus.ERROR) { log.info( "Found LateralCacheNoWait in error, " + cacheName ); final ITCPLateralCacheAttributes lca = - (ITCPLateralCacheAttributes) c.getAuxiliaryCacheAttributes(); + (ITCPLateralCacheAttributes) cache.getAuxiliaryCacheAttributes(); // Get service instance final ICacheServiceNonLocal<Object, Object> cacheService = - factory.getCSNLInstance(lca, c.getElementSerializer()); + factory.getCSNLInstance(lca, cache.getElementSerializer()); // If we can't fix them, just skip and re-try in the // next round. 
- if (cacheService instanceof ZombieCacheServiceNonLocal) + if (!(cacheService instanceof ZombieCacheServiceNonLocal)) { - continue; + cache.fixCache(cacheService); } - - c.fixCache(cacheService); } - } + }); } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/AbstractRemoteCacheNoWaitFacade.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/AbstractRemoteCacheNoWaitFacade.java index 42429ac..736cb40 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/AbstractRemoteCacheNoWaitFacade.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/AbstractRemoteCacheNoWaitFacade.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache; @@ -86,8 +87,7 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> public void update( final ICacheElement<K, V> ce ) throws IOException { - log.debug( "updating through cache facade, noWaits.length = {0}", - () -> noWaits.size() ); + log.debug("updating through cache facade, noWaits.length = {0}", noWaits::size); for (final RemoteCacheNoWait<K, V> nw : noWaits) { @@ -117,10 +117,6 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> // if it is a zombie, then move to the next in the failover list // will need to keep them in order or a count failover( nw ); - // should start a failover thread - // should probably only failover if there is only one in the noWait - // list - // Should start a background thread to restore the original primary if we are in failover state. 
} } } @@ -134,23 +130,22 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> @Override public ICacheElement<K, V> get( final K key ) { - for (final RemoteCacheNoWait<K, V> nw : noWaits) - { - try - { - final ICacheElement<K, V> obj = nw.get( key ); - if ( obj != null ) + return noWaits.stream() + .map(nw -> { + try { - return obj; + return nw.get( key ); } - } - catch ( final IOException ex ) - { - log.debug( "Failed to get." ); + catch ( final IOException ex ) + { + log.debug( "Failed to get." ); + } + return null; - } - } - return null; + }) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); } /** @@ -175,6 +170,7 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> log.debug( "Failed to getMatching." ); } } + return Collections.emptyMap(); } @@ -217,15 +213,13 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> final HashSet<K> allKeys = new HashSet<>(); for (final RemoteCacheNoWait<K, V> nw : noWaits) { - if ( nw != null ) + final Set<K> keys = nw.getKeySet(); + if(keys != null) { - final Set<K> keys = nw.getKeySet(); - if(keys != null) - { - allKeys.addAll( keys ); - } + allKeys.addAll( keys ); } } + return allKeys; } @@ -238,17 +232,17 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> @Override public boolean remove( final K key ) { - try - { - for (final RemoteCacheNoWait<K, V> nw : noWaits) + noWaits.forEach(nw -> { + try { nw.remove( key ); } - } - catch ( final IOException ex ) - { - log.error( ex ); - } + catch ( final IOException ex ) + { + log.error( ex ); + } + }); + return false; } @@ -258,27 +252,23 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> @Override public void removeAll() { - try - { - for (final RemoteCacheNoWait<K, V> nw : noWaits) + noWaits.forEach(nw -> { + try { nw.removeAll(); } - } - catch ( final IOException ex ) - { - log.error( ex ); - } + catch ( final IOException ex ) + { + log.error( ex ); + } + }); } /** Adds a dispose request to the remote cache. 
*/ @Override public void dispose() { - for (final RemoteCacheNoWait<K, V> nw : noWaits) - { - nw.dispose(); - } + noWaits.forEach(RemoteCacheNoWait::dispose); } /** @@ -325,15 +315,11 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> @Override public CacheStatus getStatus() { - for (final RemoteCacheNoWait<K, V> nw : noWaits) - { - if ( nw.getStatus() == CacheStatus.ALIVE ) - { - return CacheStatus.ALIVE; - } - } - - return CacheStatus.DISPOSED; + return noWaits.stream() + .map(nw -> nw.getStatus()) + .filter(status -> status == CacheStatus.ALIVE) + .findFirst() + .orElse(CacheStatus.DISPOSED); } /** @@ -344,7 +330,8 @@ public abstract class AbstractRemoteCacheNoWaitFacade<K, V> @Override public String toString() { - return "RemoteCacheNoWaitFacade: " + remoteCacheAttributes.getCacheName() + ", rca = " + remoteCacheAttributes; + return "RemoteCacheNoWaitFacade: " + remoteCacheAttributes.getCacheName() + + ", rca = " + remoteCacheAttributes; } /** diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFactory.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFactory.java index a7d9efd..7d49b35 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFactory.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFactory.java @@ -21,11 +21,8 @@ package org.apache.commons.jcs3.auxiliary.remote; import java.rmi.registry.Registry; import java.util.ArrayList; -import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheFactory; import org.apache.commons.jcs3.auxiliary.AuxiliaryCache; @@ -51,9 +48,6 @@ public class RemoteCacheFactory /** Contains mappings of RemoteLocation instance to 
RemoteCacheManager instance. */ private ConcurrentMap<RemoteLocation, RemoteCacheManager> managers; - /** Lock for initialization of manager instances */ - private Lock managerLock; - /** * For LOCAL clients we get a handle to all the failovers, but we do not register a listener * with them. We create the RemoteCacheManager, but we do not get a cache. @@ -92,39 +86,36 @@ public class RemoteCacheFactory failovers.add( rca.getRemoteLocation() ); final RemoteCacheManager rcm = getManager( rca, cacheMgr, cacheEventLogger, elementSerializer ); - final RemoteCacheNoWait<K,V> ic = rcm.getCache( rca ); - noWaits.add( ic ); + noWaits.add(rcm.getCache(rca)); } // GET HANDLE BUT DONT REGISTER A LISTENER FOR FAILOVERS final String failoverList = rca.getFailoverServers(); if ( failoverList != null ) { - final StringTokenizer fit = new StringTokenizer( failoverList, "," ); + final String[] failoverServers = failoverList.split("\\s*,\\s*"); int fCnt = 0; - while ( fit.hasMoreTokens() ) + for (String server : failoverServers) { fCnt++; - - final String server = fit.nextToken(); final RemoteLocation location = RemoteLocation.parseServerAndPort(server); if (location != null) { failovers.add( location ); - rca.setRemoteLocation(location); - final RemoteCacheManager rcm = getManager( rca, cacheMgr, cacheEventLogger, elementSerializer ); + final RemoteCacheAttributes frca = (RemoteCacheAttributes) rca.clone(); + frca.setRemoteLocation(location); + final RemoteCacheManager rcm = getManager( frca, cacheMgr, cacheEventLogger, elementSerializer ); // add a listener if there are none, need to tell rca what // number it is at - if (!primaryDefined && fCnt == 1 || noWaits.size() <= 0) + if (!primaryDefined && fCnt == 1 || noWaits.isEmpty()) { - final RemoteCacheNoWait<K,V> ic = rcm.getCache( rca ); - noWaits.add( ic ); + noWaits.add(rcm.getCache(frca)); } } } - // end while + // end for } // end if failoverList != null @@ -133,19 +124,18 @@ public class RemoteCacheFactory case CLUSTER: // 
REGISTER LISTENERS FOR EACH SYSTEM CLUSTERED CACHEs - final StringTokenizer it = new StringTokenizer( rca.getClusterServers(), "," ); - while ( it.hasMoreElements() ) + final String[] clusterServers = rca.getClusterServers().split("\\s*,\\s*"); + for (String server: clusterServers) { - final String server = (String) it.nextElement(); final RemoteLocation location = RemoteLocation.parseServerAndPort(server); if (location != null) { - rca.setRemoteLocation(location); - final RemoteCacheManager rcm = getManager( rca, cacheMgr, cacheEventLogger, elementSerializer ); - rca.setRemoteType( RemoteType.CLUSTER ); - final RemoteCacheNoWait<K,V> ic = rcm.getCache( rca ); - noWaits.add( ic ); + final RemoteCacheAttributes crca = (RemoteCacheAttributes) rca.clone(); + crca.setRemoteLocation(location); + final RemoteCacheManager rcm = getManager( crca, cacheMgr, cacheEventLogger, elementSerializer ); + crca.setRemoteType( RemoteType.CLUSTER ); + noWaits.add(rcm.getCache(crca)); } } break; @@ -153,7 +143,6 @@ public class RemoteCacheFactory return new RemoteCacheNoWaitFacade<>(noWaits, rca, cacheEventLogger, elementSerializer, this); } - // end createCache /** @@ -167,14 +156,13 @@ public class RemoteCacheFactory */ public RemoteCacheManager getManager( final IRemoteCacheAttributes cattr ) { - if ( cattr.getRemoteLocation() == null ) + final RemoteCacheAttributes rca = (RemoteCacheAttributes) cattr.clone(); + if (rca.getRemoteLocation() == null) { - cattr.setRemoteLocation("", Registry.REGISTRY_PORT); + rca.setRemoteLocation("", Registry.REGISTRY_PORT); } - final RemoteLocation loc = cattr.getRemoteLocation(); - - return managers.get( loc ); + return managers.get(rca.getRemoteLocation()); } /** @@ -185,40 +173,31 @@ public class RemoteCacheFactory * If the connection cannot be established, zombie objects will be used for future recovery * purposes. 
* <p> - * @param cattr - * @param cacheMgr - * @param cacheEventLogger - * @param elementSerializer + * @param cattr the cache configuration object + * @param cacheMgr the cache manager + * @param cacheEventLogger the event logger + * @param elementSerializer the serializer to use for sending and receiving + * * @return The instance value, never null */ - public RemoteCacheManager getManager( final IRemoteCacheAttributes cattr, final ICompositeCacheManager cacheMgr, - final ICacheEventLogger cacheEventLogger, - final IElementSerializer elementSerializer ) + public RemoteCacheManager getManager( final IRemoteCacheAttributes cattr, + final ICompositeCacheManager cacheMgr, + final ICacheEventLogger cacheEventLogger, + final IElementSerializer elementSerializer ) { - RemoteCacheManager ins = getManager( cattr ); - - if ( ins == null ) + final RemoteCacheAttributes rca = (RemoteCacheAttributes) cattr.clone(); + if (rca.getRemoteLocation() == null) { - managerLock.lock(); + rca.setRemoteLocation("", Registry.REGISTRY_PORT); + } - try - { - ins = managers.get( cattr.getRemoteLocation() ); + return managers.computeIfAbsent(rca.getRemoteLocation(), key -> { - if (ins == null) - { - ins = new RemoteCacheManager( cattr, cacheMgr, monitor, cacheEventLogger, elementSerializer); - managers.put( cattr.getRemoteLocation(), ins ); - monitor.addManager(ins); - } - } - finally - { - managerLock.unlock(); - } - } + RemoteCacheManager manager = new RemoteCacheManager(rca, cacheMgr, monitor, cacheEventLogger, elementSerializer); + monitor.addManager(manager); - return ins; + return manager; + }); } /** @@ -230,7 +209,6 @@ public class RemoteCacheFactory super.initialize(); managers = new ConcurrentHashMap<>(); - managerLock = new ReentrantLock(); monitor = new RemoteCacheMonitor(); monitor.setDaemon(true); @@ -242,11 +220,7 @@ public class RemoteCacheFactory @Override public void dispose() { - for (final RemoteCacheManager manager : managers.values()) - { - manager.release(); - } - + 
managers.values().forEach(RemoteCacheManager::release); managers.clear(); if (monitor != null) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFailoverRunner.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFailoverRunner.java index 38c5f86..c63d3a0 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFailoverRunner.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheFailoverRunner.java @@ -19,14 +19,7 @@ package org.apache.commons.jcs3.auxiliary.remote; * under the License. */ -import java.io.IOException; -import java.util.List; -import java.util.ListIterator; - import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheMonitor; -import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes; -import org.apache.commons.jcs3.engine.CacheStatus; -import org.apache.commons.jcs3.engine.behavior.ICache; /** * The RemoteCacheFailoverRunner tries to establish a connection with a failover @@ -46,15 +39,14 @@ import org.apache.commons.jcs3.engine.behavior.ICache; * Connection in the background. If failovers are defined, the Failover runner * will try to connect to a failover until the primary is restored. * + * @deprecated Functionality moved to RemoteCacheNoWaitFacade */ +@Deprecated public class RemoteCacheFailoverRunner<K, V> extends AbstractAuxiliaryCacheMonitor { /** The facade returned to the composite cache. */ private final RemoteCacheNoWaitFacade<K, V> facade; - /** Factory instance */ - private final RemoteCacheFactory cacheFactory; - /** * Constructor for the RemoteCacheFailoverRunner object. This allows the * FailoverRunner to modify the facade that the CompositeCache references. 
@@ -66,7 +58,6 @@ public class RemoteCacheFailoverRunner<K, V> extends AbstractAuxiliaryCacheMonit { super("JCS-RemoteCacheFailoverRunner"); this.facade = facade; - this.cacheFactory = cacheFactory; setIdlePeriod(20000L); } @@ -107,280 +98,7 @@ public class RemoteCacheFailoverRunner<K, V> extends AbstractAuxiliaryCacheMonit { // start the main work of connecting to a failover and then restoring // the primary. - connectAndRestore(); - - if ( log.isInfoEnabled() ) - { - final int failoverIndex = facade.getAuxiliaryCacheAttributes().getFailoverIndex(); - log.info( "Exiting failover runner. Failover index = {0}", failoverIndex); - - if ( failoverIndex <= 0 ) - { - log.info( "Failover index is <= 0, meaning we are not connected to a failover server." ); - } - else { - log.info( "Failover index is > 0, meaning we are connected to a failover server." ); - } - // log if we are allright or not. - } - } - - /** - * This is the main loop. If there are failovers defined, then this will - * continue until the primary is re-connected. If no failovers are defined, - * this will exit automatically. - */ - private void connectAndRestore() - { - final IRemoteCacheAttributes rca0 = facade.getAuxiliaryCacheAttributes(); - - do - { - log.info( "Remote cache FAILOVER RUNNING." ); - - // there is no active listener - if ( !allright.get() ) - { - // Monitor each RemoteCacheManager instance one after the other. - // Each RemoteCacheManager corresponds to one remote connection. - final List<RemoteLocation> failovers = rca0.getFailovers(); - // we should probably check to see if there are any failovers, - // even though the caller - // should have already. - - if ( failovers == null ) - { - log.warn( "Remote is misconfigured, failovers was null." ); - return; - } - if ( failovers.size() == 1 ) - { - // if there is only the primary, return out of this - log.info( "No failovers defined, exiting failover runner." 
); - return; - } - - final int fidx = rca0.getFailoverIndex(); - log.debug( "fidx = {0} failovers.size = {1}", () -> fidx, - failovers::size); - - // shouldn't we see if the primary is backup? - // If we don't check the primary, if it gets connected in the - // background, - // we will disconnect it only to put it right back - final ListIterator<RemoteLocation> i = failovers.listIterator(fidx); // + 1; // +1 skips the primary - log.debug( "starting at failover i = {0}", i ); - - // try them one at a time until successful - while (i.hasNext() && !allright.get()) - { - final RemoteLocation server = i.next(); - log.debug( "Trying server [{0}] at failover index i = {1}", server, i ); - - final RemoteCacheAttributes rca = (RemoteCacheAttributes) rca0.clone(); - rca.setRemoteLocation(server); - final RemoteCacheManager rcm = cacheFactory.getManager( rca ); - - log.debug( "RemoteCacheAttributes for failover = {0}", rca ); - - if (rcm != null) - { - // add a listener if there are none, need to tell rca - // what number it is at - final ICache<K, V> ic = rcm.getCache( rca ); - if ( ic.getStatus() == CacheStatus.ALIVE ) - { - // may need to do this more gracefully - log.debug( "resetting no wait" ); - facade.restorePrimaryServer((RemoteCacheNoWait<K, V>) ic); - rca0.setFailoverIndex( i.nextIndex() ); - - log.debug( "setting ALLRIGHT to true" ); - if ( i.hasPrevious() ) - { - log.debug( "Moving to Primary Recovery Mode, failover index = {0}", i ); - } - else - { - log.debug( "No need to connect to failover, the primary server is back up." ); - } - - allright.set(true); - - log.info( "CONNECTED to host = [{0}]", - rca::getRemoteLocation); - } - } - } - } - // end if !allright - // get here if while index >0 and allright, meaning that we are - // connected to some backup server. - else - { - log.debug( "ALLRIGHT is true " ); - log.info( "Failover runner is in primary recovery mode. 
" - + "Failover index = {0} Will now try to reconnect to " - + "primary server.", rca0::getFailoverIndex); - } - - boolean primaryRestoredSuccessfully = false; - // if we are not connected to the primary, try. - if ( rca0.getFailoverIndex() > 0 ) - { - primaryRestoredSuccessfully = restorePrimary(); - log.debug( "Primary recovery success state = {0}", - primaryRestoredSuccessfully ); - } - - if ( !primaryRestoredSuccessfully ) - { - // Time driven mode: sleep between each round of recovery - // attempt. - try - { - log.warn( "Failed to reconnect to primary server. " - + "Cache failover runner is going to sleep for " - + "{0} milliseconds.", idlePeriod ); - Thread.sleep( idlePeriod ); - } - catch ( final InterruptedException ex ) - { - // ignore; - } - } - - // try to bring the listener back to the primary - } - while ( rca0.getFailoverIndex() > 0 || !allright.get() ); - // continue if the primary is not restored or if things are not allright. + facade.connectAndRestore(); } - /** - * Try to restore the primary server. - * <p> - * Once primary is restored the failover listener must be deregistered. - * <p> - * The primary server is the first server defines in the FailoverServers - * list. 
- * - * @return boolean value indicating whether the restoration was successful - */ - private boolean restorePrimary() - { - final IRemoteCacheAttributes rca0 = facade.getAuxiliaryCacheAttributes(); - // try to move back to the primary - final RemoteLocation server = rca0.getFailovers().get(0); - - log.info( "Trying to restore connection to primary remote server " - + "[{0}]", server ); - - final RemoteCacheAttributes rca = (RemoteCacheAttributes) rca0.clone(); - rca.setRemoteLocation(server); - final RemoteCacheManager rcm = cacheFactory.getManager( rca ); - - if (rcm != null) - { - // add a listener if there are none, need to tell rca what number it - // is at - final ICache<K, V> ic = rcm.getCache( rca ); - // by default the listener id should be 0, else it will be the - // listener - // Originally associated with the remote cache. either way is fine. - // We just don't want the listener id from a failover being used. - // If the remote server was rebooted this could be a problem if new - // locals were also added. - - if ( ic.getStatus() == CacheStatus.ALIVE ) - { - try - { - // we could have more than one listener registered right - // now. - // this will not result in a loop, only duplication - // stop duplicate listening. - if ( facade.getPrimaryServer() != null && facade.getPrimaryServer().getStatus() == CacheStatus.ALIVE ) - { - final int fidx = rca0.getFailoverIndex(); - - if ( fidx > 0 ) - { - final RemoteLocation serverOld = rca0.getFailovers().get(fidx); - - log.debug( "Failover Index = {0} the server at that " - + "index is [{1}]", fidx, serverOld ); - - if ( serverOld != null ) - { - // create attributes that reflect the - // previous failed over configuration. 
- final RemoteCacheAttributes rcaOld = (RemoteCacheAttributes) rca0.clone(); - rcaOld.setRemoteLocation(serverOld); - final RemoteCacheManager rcmOld = cacheFactory.getManager( rcaOld ); - - if ( rcmOld != null ) - { - // manager can remove by name if - // necessary - rcmOld.removeRemoteCacheListener( rcaOld ); - } - log.info( "Successfully deregistered from " - + "FAILOVER remote server = {0}", serverOld ); - } - } - else if ( fidx == 0 ) - { - // this should never happen. If there are no - // failovers this shouldn't get called. - if ( log.isDebugEnabled() ) - { - log.debug( "No need to restore primary, it is already restored." ); - return true; - } - } - else { - // this should never happen - log.warn( "Failover index is less than 0, this shouldn't happen" ); - } - } - } - catch ( final IOException e ) - { - // TODO, should try again, or somehow stop the listener - log.error("Trouble trying to deregister old failover " - + "listener prior to restoring the primary = {0}", - server, e ); - } - - // Restore primary - // may need to do this more gracefully, letting the failover finish in the background - final RemoteCacheNoWait<K, V> failoverNoWait = facade.getPrimaryServer(); - - // swap in a new one - facade.restorePrimaryServer((RemoteCacheNoWait<K, V>) ic); - rca0.setFailoverIndex( 0 ); - - final String message = "Successfully reconnected to PRIMARY " - + "remote server. Substituted primary for " - + "failoverNoWait [" + failoverNoWait + "]"; - log.info( message ); - - if ( facade.getCacheEventLogger() != null ) - { - facade.getCacheEventLogger().logApplicationEvent( - "RemoteCacheFailoverRunner", "RestoredPrimary", - message ); - } - return true; - } - } - - // else all right - // if the failover index was at 0 here, we would be in a bad - // situation, unless there were just - // no failovers configured. - log.debug( "Primary server status in error, not connected." 
); - - return false; - } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheListener.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheListener.java index 6d6974e..5e094cf 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheListener.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheListener.java @@ -22,6 +22,7 @@ package org.apache.commons.jcs3.auxiliary.remote; import java.io.IOException; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes; import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheConstants; @@ -45,7 +46,7 @@ public class RemoteCacheListener<K, V> private static final Log log = LogManager.getLog( RemoteCacheListener.class ); /** Has this client been shutdown. */ - private boolean disposed; + private AtomicBoolean disposed; /** * Only need one since it does work for all regions, just reference by multiple region names. @@ -57,9 +58,12 @@ public class RemoteCacheListener<K, V> * @param cacheMgr the cache hub * @param elementSerializer a custom serializer */ - public RemoteCacheListener( final IRemoteCacheAttributes irca, final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer ) + public RemoteCacheListener( final IRemoteCacheAttributes irca, + final ICompositeCacheManager cacheMgr, + final IElementSerializer elementSerializer ) { super( irca, cacheMgr, elementSerializer ); + disposed = new AtomicBoolean(false); // Export this remote object to make it available to receive incoming // calls. 
@@ -83,7 +87,7 @@ public class RemoteCacheListener<K, V> public synchronized void dispose() throws IOException { - if ( !disposed ) + if (disposed.compareAndSet(false, true)) { log.info( "Unexporting listener." ); try @@ -95,7 +99,6 @@ public class RemoteCacheListener<K, V> log.error( "Problem unexporting the listener.", ex ); throw new IllegalStateException( ex.getMessage() ); } - disposed = true; } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheManager.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheManager.java index 2a8f8d0..b2f5323 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheManager.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheManager.java @@ -208,11 +208,9 @@ public class RemoteCacheManager private void removeListenerFromCache(final RemoteCacheNoWait<?, ?> cache) throws IOException { final IRemoteCacheClient<?, ?> rc = cache.getRemoteCache(); - log.debug( "Found cache for [{0}], deregistering listener.", - cache::getCacheName); + log.debug( "Found cache for [{0}], deregistering listener.", cache::getCacheName); // could also store the listener for a server in the manager. - final IRemoteCacheListener<?, ?> listener = rc.getListener(); - remoteWatch.removeCacheListener( cache.getCacheName(), listener ); + remoteWatch.removeCacheListener(cache.getCacheName(), rc.getListener()); } /** @@ -228,9 +226,9 @@ public class RemoteCacheManager @SuppressWarnings("unchecked") // Need to cast because of common map for all caches public <K, V> RemoteCacheNoWait<K, V> getCache( final IRemoteCacheAttributes cattr ) { - // might want to do some listener sanity checking here. 
- return (RemoteCacheNoWait<K, V>) caches.computeIfAbsent(cattr.getCacheName(), key -> newRemoteCacheNoWait(cattr)); + return (RemoteCacheNoWait<K, V>) caches.computeIfAbsent(cattr.getCacheName(), + key -> newRemoteCacheNoWait(cattr)); } /** @@ -256,8 +254,9 @@ public class RemoteCacheManager listener, e ); } + @SuppressWarnings("unchecked") final IRemoteCacheClient<K, V> remoteCacheClient = - new RemoteCache<>( cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor ); + new RemoteCache<>(cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor); remoteCacheClient.setCacheEventLogger( cacheEventLogger ); remoteCacheClient.setElementSerializer( elementSerializer ); @@ -271,21 +270,19 @@ public class RemoteCacheManager /** Shutdown all. */ public void release() { - for (final RemoteCacheNoWait<?, ?> c : caches.values()) - { + caches.forEach((name, cache) -> { try { - log.info( "freeCache [{0}]", c::getCacheName); + log.info("freeCache [{0}]", name); - removeListenerFromCache(c); - c.dispose(); + removeListenerFromCache(cache); + cache.dispose(); } catch ( final IOException ex ) { - log.error( "Problem releasing {0}", c.getCacheName(), ex ); + log.error("Problem releasing {0}", name, ex); } - } - + }); caches.clear(); } @@ -302,13 +299,9 @@ public class RemoteCacheManager log.info( "Fixing caches. 
ICacheServiceNonLocal {0} | IRemoteCacheObserver {1}", remoteService, remoteWatch ); - for (final RemoteCacheNoWait<?, ?> c : caches.values()) - { - if (c.getStatus() == CacheStatus.ERROR) - { - c.fixCache( remoteService ); - } - } + caches.values().stream() + .filter(cache -> cache.getStatus() == CacheStatus.ERROR) + .forEach(cache -> cache.fixCache(remoteService)); if ( log.isInfoEnabled() ) { diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheMonitor.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheMonitor.java index f69c7f4..807bebe 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheMonitor.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheMonitor.java @@ -83,8 +83,7 @@ public class RemoteCacheMonitor extends AbstractAuxiliaryCacheMonitor { // Monitor each RemoteCacheManager instance one after the other. // Each RemoteCacheManager corresponds to one remote connection. - for (final RemoteCacheManager mgr : managers.values()) - { + managers.values().forEach(mgr -> { // If we can't fix them, just skip and re-try in // the next round. 
if ( mgr.canFixCaches() ) @@ -95,6 +94,6 @@ public class RemoteCacheMonitor extends AbstractAuxiliaryCacheMonitor { allright.set(false); } - } + }); } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java index a9022fd..e0d7733 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWait.java @@ -98,7 +98,7 @@ public class RemoteCacheNoWait<K, V> */ public RemoteCacheNoWait( final IRemoteCacheClient<K, V> cache ) { - remoteCacheClient = cache; + this.remoteCacheClient = cache; this.cacheEventQueue = createCacheEventQueue(cache); if ( remoteCacheClient.getStatus() == CacheStatus.ERROR ) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWaitFacade.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWaitFacade.java index 8744c1c..c79459b 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWaitFacade.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/remote/RemoteCacheNoWaitFacade.java @@ -1,5 +1,7 @@ package org.apache.commons.jcs3.auxiliary.remote; +import java.io.IOException; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -20,10 +22,13 @@ package org.apache.commons.jcs3.auxiliary.remote; */ import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes; import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType; import org.apache.commons.jcs3.engine.CacheStatus; +import org.apache.commons.jcs3.engine.behavior.ICache; import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger; import org.apache.commons.jcs3.log.Log; @@ -46,6 +51,9 @@ public class RemoteCacheNoWaitFacade<K, V> /** Provide factory instance to RemoteCacheFailoverRunner */ private final RemoteCacheFactory cacheFactory; + /** Time in ms to sleep between failover attempts */ + private static final long idlePeriod = 20000L; + /** * Constructs with the given remote cache, and fires events to any listeners. * <p> @@ -80,10 +88,9 @@ public class RemoteCacheNoWaitFacade<K, V> if ( rcnw.getStatus() == CacheStatus.ERROR ) { // start failover, primary recovery process - final RemoteCacheFailoverRunner<K, V> runner = new RemoteCacheFailoverRunner<>( this, this.cacheFactory ); + final Thread runner = new Thread(this::connectAndRestore); runner.setDaemon( true ); runner.start(); - runner.notifyError(); if ( getCacheEventLogger() != null ) { @@ -98,4 +105,290 @@ public class RemoteCacheNoWaitFacade<K, V> } } + /** + * The thread tries to establish a connection with a failover + * server, if any are defined. Once a failover connection is made, it will + * attempt to replace the failover with the primary remote server. + * <p> + * It works by switching out the RemoteCacheNoWait inside the Facade. + * <p> + * Client (i.e.) the CompositeCache has reference to a RemoteCacheNoWaitFacade. + * This facade is created by the RemoteCacheFactory. 
The factory maintains a set + * of managers, one for each remote server. Typically, there will only be one + * manager. + * <p> + * If you use multiple remote servers, you may want to set one or more as + * failovers. If a local cache cannot connect to the primary server, or loses + * its connection to the primary server, it will attempt to restore that + * Connection in the background. If failovers are defined, the Failover runner + * will try to connect to a failover until the primary is restored. + * If no failovers are defined, this will exit automatically. + */ + protected void connectAndRestore() + { + final IRemoteCacheAttributes rca0 = getAuxiliaryCacheAttributes(); + final AtomicBoolean allright = new AtomicBoolean(false); + + do + { + log.info( "Remote cache FAILOVER RUNNING." ); + + // there is no active listener + if ( !allright.get() ) + { + // Monitor each RemoteCacheManager instance one after the other. + // Each RemoteCacheManager corresponds to one remote connection. + final List<RemoteLocation> failovers = rca0.getFailovers(); + // we should probably check to see if there are any failovers, + // even though the caller + // should have already. + + if ( failovers == null ) + { + log.warn( "Remote is misconfigured, failovers was null." ); + return; + } + if ( failovers.size() == 1 ) + { + // if there is only the primary, return out of this + log.info( "No failovers defined, exiting failover runner." ); + return; + } + + final int fidx = rca0.getFailoverIndex(); + log.debug( "fidx = {0} failovers.size = {1}", () -> fidx, failovers::size); + + // shouldn't we see if the primary is back up?
+ // If we don't check the primary, if it gets connected in the + // background, + // we will disconnect it only to put it right back + final ListIterator<RemoteLocation> i = failovers.listIterator(fidx); // + 1; // +1 skips the primary + log.debug( "starting at failover i = {0}", i ); + + // try them one at a time until successful + while (i.hasNext() && !allright.get()) + { + final RemoteLocation server = i.next(); + log.debug( "Trying server [{0}] at failover index i = {1}", server, i ); + + final RemoteCacheAttributes rca = (RemoteCacheAttributes) rca0.clone(); + rca.setRemoteLocation(server); + final RemoteCacheManager rcm = cacheFactory.getManager( rca ); + + log.debug( "RemoteCacheAttributes for failover = {0}", rca ); + + if (rcm != null) + { + // add a listener if there are none, need to tell rca + // what number it is at + final ICache<K, V> ic = rcm.getCache( rca ); + if ( ic.getStatus() == CacheStatus.ALIVE ) + { + // may need to do this more gracefully + log.debug( "resetting no wait" ); + restorePrimaryServer((RemoteCacheNoWait<K, V>) ic); + rca0.setFailoverIndex( i.nextIndex() ); + + log.debug( "setting ALLRIGHT to true" ); + if ( i.hasPrevious() ) + { + log.debug( "Moving to Primary Recovery Mode, failover index = {0}", i ); + } + else + { + log.debug( "No need to connect to failover, the primary server is back up." ); + } + + allright.set(true); + + log.info( "CONNECTED to host = [{0}]", rca::getRemoteLocation); + } + } + } + } + // end if !allright + // get here if while index >0 and allright, meaning that we are + // connected to some backup server. + else + { + log.debug( "ALLRIGHT is true " ); + log.info( "Failover runner is in primary recovery mode. " + + "Failover index = {0} Will now try to reconnect to " + + "primary server.", rca0::getFailoverIndex); + } + + boolean primaryRestoredSuccessfully = false; + // if we are not connected to the primary, try. 
+ if ( rca0.getFailoverIndex() > 0 ) + { + primaryRestoredSuccessfully = restorePrimary(); + log.debug( "Primary recovery success state = {0}", + primaryRestoredSuccessfully ); + } + + if ( !primaryRestoredSuccessfully ) + { + // Time driven mode: sleep between each round of recovery + // attempt. + try + { + log.warn( "Failed to reconnect to primary server. " + + "Cache failover runner is going to sleep for " + + "{0} milliseconds.", idlePeriod ); + Thread.sleep( idlePeriod ); + } + catch ( final InterruptedException ex ) + { + // ignore; + } + } + + // try to bring the listener back to the primary + } + while ( rca0.getFailoverIndex() > 0 || !allright.get() ); + // continue if the primary is not restored or if things are not allright. + + if ( log.isInfoEnabled() ) + { + final int failoverIndex = getAuxiliaryCacheAttributes().getFailoverIndex(); + log.info( "Exiting failover runner. Failover index = {0}", failoverIndex); + + if ( failoverIndex <= 0 ) + { + log.info( "Failover index is <= 0, meaning we are not connected to a failover server." ); + } + else + { + log.info( "Failover index is > 0, meaning we are connected to a failover server." ); + } + } + } + + /** + * Try to restore the primary server. + * <p> + * Once primary is restored the failover listener must be deregistered. + * <p> + * The primary server is the first server defined in the FailoverServers + * list.
+ * + * @return boolean value indicating whether the restoration was successful + */ + private boolean restorePrimary() + { + final IRemoteCacheAttributes rca0 = getAuxiliaryCacheAttributes(); + // try to move back to the primary + final RemoteLocation server = rca0.getFailovers().get(0); + + log.info( "Trying to restore connection to primary remote server " + + "[{0}]", server ); + + final RemoteCacheAttributes rca = (RemoteCacheAttributes) rca0.clone(); + rca.setRemoteLocation(server); + final RemoteCacheManager rcm = cacheFactory.getManager( rca ); + + if (rcm != null) + { + // add a listener if there are none, need to tell rca what number it + // is at + final ICache<K, V> ic = rcm.getCache( rca ); + // by default the listener id should be 0, else it will be the + // listener + // Originally associated with the remote cache. either way is fine. + // We just don't want the listener id from a failover being used. + // If the remote server was rebooted this could be a problem if new + // locals were also added. + + if ( ic.getStatus() == CacheStatus.ALIVE ) + { + try + { + // we could have more than one listener registered right + // now. + // this will not result in a loop, only duplication + // stop duplicate listening. + if (getPrimaryServer() != null && getPrimaryServer().getStatus() == CacheStatus.ALIVE ) + { + final int fidx = rca0.getFailoverIndex(); + + if ( fidx > 0 ) + { + final RemoteLocation serverOld = rca0.getFailovers().get(fidx); + + log.debug( "Failover Index = {0} the server at that " + + "index is [{1}]", fidx, serverOld ); + + if ( serverOld != null ) + { + // create attributes that reflect the + // previous failed over configuration. 
+ final RemoteCacheAttributes rcaOld = (RemoteCacheAttributes) rca0.clone(); + rcaOld.setRemoteLocation(serverOld); + final RemoteCacheManager rcmOld = cacheFactory.getManager( rcaOld ); + + if ( rcmOld != null ) + { + // manager can remove by name if + // necessary + rcmOld.removeRemoteCacheListener( rcaOld ); + } + log.info( "Successfully deregistered from " + + "FAILOVER remote server = {0}", serverOld ); + } + } + else if ( fidx == 0 ) + { + // this should never happen. If there are no + // failovers this shouldn't get called. + if ( log.isDebugEnabled() ) + { + log.debug( "No need to restore primary, it is already restored." ); + return true; + } + } + else { + // this should never happen + log.warn( "Failover index is less than 0, this shouldn't happen" ); + } + } + } + catch ( final IOException e ) + { + // TODO, should try again, or somehow stop the listener + log.error("Trouble trying to deregister old failover " + + "listener prior to restoring the primary = {0}", + server, e ); + } + + // Restore primary + // may need to do this more gracefully, letting the failover finish in the background + final RemoteCacheNoWait<K, V> failoverNoWait = getPrimaryServer(); + + // swap in a new one + restorePrimaryServer((RemoteCacheNoWait<K, V>) ic); + rca0.setFailoverIndex( 0 ); + + final String message = "Successfully reconnected to PRIMARY " + + "remote server. Substituted primary for " + + "failoverNoWait [" + failoverNoWait + "]"; + log.info( message ); + + if (getCacheEventLogger() != null) + { + getCacheEventLogger().logApplicationEvent( + "RemoteCacheFailoverRunner", "RestoredPrimary", + message ); + } + return true; + } + } + + // else all right + // if the failover index was at 0 here, we would be in a bad + // situation, unless there were just + // no failovers configured. + log.debug( "Primary server status in error, not connected." ); + + return false; + } }