# ignite-709_3
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6d110437 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6d110437 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6d110437 Branch: refs/heads/ignite-709_3 Commit: 6d1104372142898f7d46fa72265473d06e1eb959 Parents: c1913c4 Author: sboikov <sboi...@gridgain.com> Authored: Fri May 15 13:37:24 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri May 15 15:14:10 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgniteEx.java | 3 +- .../apache/ignite/internal/IgniteKernal.java | 24 ++- .../discovery/GridDiscoveryManager.java | 37 ++-- .../cache/DynamicCacheDescriptor.java | 2 + .../processors/cache/GridCacheProcessor.java | 183 ++++------------ .../processors/cache/GridCacheUtils.java | 12 +- .../cache/affinity/GridCacheAffinityImpl.java | 10 +- .../preloader/GridDhtPartitionDemandPool.java | 4 - .../datastructures/DataStructuresProcessor.java | 12 +- .../affinity/IgniteClientNodeAffinityTest.java | 3 - .../IgniteDynamicClientCacheStartSelfTest.java | 213 +++++++++++++++++++ .../cache/IgniteSystemCacheOnClientTest.java | 2 +- .../testsuites/IgniteCacheTestSuite4.java | 1 + 13 files changed, 313 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java index bc7e722..4845d51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java @@ -35,9 +35,8 @@ public interface IgniteEx extends Ignite { * Gets utility cache. * * @return Utility cache. - * @throws IgniteCheckedException If failed. */ - public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() throws IgniteCheckedException; + public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache(); /** * Gets the cache instance for the given name if one is configured or http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 1c68a82..49b5f22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2362,7 +2362,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get(); - return ctx.cache().publicJCache(cacheName); + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); + + checkNearCacheStarted(cache); + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2382,7 +2386,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get(); - return ctx.cache().publicJCache(cacheName); + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); + + checkNearCacheStarted(cache); + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2392,6 +2400,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** + * @param cache Cache. + */ + private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) { + if (!cache.context().isNear()) + throw new IgniteException("Failed to start near cache " + + "(a cache with the same name without near cache is already started)"); + } + /** {@inheritDoc} */ @Override public void destroyCache(String cacheName) { guard(); @@ -2462,8 +2479,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ - @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() - throws IgniteCheckedException { + @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() { guard(); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 59240b8..eac96b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -214,6 +214,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param filter Cache filter. + * @param nearEnabled Near enabled flag. * @param loc {@code True} if cache is local. */ public void setCacheFilter( @@ -240,12 +241,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param clientNodeId Near node ID. + * @param nearEnabled Near enabled flag. */ public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - if (predicate != null) - predicate.addClientNode(clientNodeId, nearEnabled); + if (pred != null) + pred.addClientNode(clientNodeId, nearEnabled); } /** @@ -1256,9 +1258,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node is a cache data node. */ public boolean cacheAffinityNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.dataNode(node); + return pred != null && pred.dataNode(node); } /** @@ -1267,9 +1269,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node has near cache enabled. */ public boolean cacheNearNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.nearNode(node); + return pred != null && pred.nearNode(node); } /** @@ -1278,9 +1280,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node has client cache (without near cache). */ public boolean cacheClientNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.clientNode(node); + return pred != null && pred.clientNode(node); } /** @@ -1289,9 +1291,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return If cache with the given name is accessible on the given node. */ public boolean cacheNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.cacheNode(node); + return pred != null && pred.cacheNode(node); } /** @@ -2480,11 +2482,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private boolean loc; /** Collection of client near nodes. */ - private Map<UUID, Boolean> clientNodes; + private ConcurrentHashMap<UUID, Boolean> clientNodes; /** * @param cacheFilter Cache filter. * @param nearEnabled Near enabled flag. + * @param loc {@code True} if cache is local. */ private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) { assert cacheFilter != null; @@ -2498,9 +2501,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param nodeId Near node ID to add. + * @param nearEnabled Near enabled flag. */ public void addClientNode(UUID nodeId, boolean nearEnabled) { - clientNodes.put(nodeId, nearEnabled); + clientNodes.putIfAbsent(nodeId, nearEnabled); } /** @@ -2515,7 +2519,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if this node is a data node for given cache. */ public boolean dataNode(ClusterNode node) { - return !node.isDaemon() && !CU.clientModeNode(node) && cacheFilter.apply(node); + return !node.isDaemon() && CU.affinityNode(node, cacheFilter); } /** @@ -2523,8 +2527,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if cache is accessible on the given node. */ public boolean cacheNode(ClusterNode node) { - return !node.isDaemon() && - ((!CU.clientModeNode(node) && cacheFilter.apply(node)) || clientNodes.containsKey(node.id())); + return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id())); } /** @@ -2535,7 +2538,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (node.isDaemon()) return false; - if (nearEnabled && cacheFilter.apply(node)) + if (nearEnabled && CU.affinityNode(node, cacheFilter)) return true; Boolean near = clientNodes.get(node.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 6f6f422..a27ebd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -62,6 +62,7 @@ public class DynamicCacheDescriptor { private final CachePluginManager pluginMgr; /** + * @param ctx Context. * @param cacheCfg Cache configuration. * @param cacheType Cache type. * @param template {@code True} if this is template configuration. @@ -76,6 +77,7 @@ public class DynamicCacheDescriptor { this.cacheType = cacheType; this.template = template; this.deploymentId = deploymentId; + pluginMgr = new CachePluginManager(ctx, cacheCfg); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 6ac5afc..9469e65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -127,9 +127,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Count down latch for caches. */ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); - /** */ - private final GridFutureAdapter<Object> sysCacheStartFut = new GridFutureAdapter<>(); - /** * @param ctx Kernal context. */ @@ -628,6 +625,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg.getNearConfiguration() != null, cfg.getCacheMode() == LOCAL); + ctx.discovery().addClientNode(cfg.getName(), + ctx.localNodeId(), + cfg.getNearConfiguration() != null); + if (!cacheType.userCache()) stopSeq.addLast(cfg.getName()); else @@ -664,9 +665,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { - DynamicCacheDescriptor marshCacheDesc = null; - DynamicCacheDescriptor utilityCacheDesc = null; - DynamicCacheDescriptor atomicsCacheDesc = null; + List<GridCacheAdapter<?, ?>> locCaches = new ArrayList<>(registeredCaches.size()); try { if (ctx.config().isDaemon()) @@ -713,7 +712,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgnitePredicate filter = ccfg.getNodeFilter(); - if (!CU.clientModeNode(locNode) && filter.apply(locNode)) { + boolean loc = desc.locallyConfigured(); + + if (loc || CU.affinityNode(locNode, filter)) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); CachePluginManager pluginMgr = desc.pluginManager(); @@ -733,49 +734,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cache); jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); - } - if (CU.MARSH_CACHE_NAME.equals(ccfg.getName())) - marshCacheDesc = desc; - else if (CU.UTILITY_CACHE_NAME.equals(ccfg.getName())) - utilityCacheDesc = desc; - else if (CU.ATOMICS_CACHE_NAME.equals(ccfg.getName())) - atomicsCacheDesc = desc; + if (loc) + locCaches.add(cache); + } } } finally { cacheStartedLatch.countDown(); } - if (ctx.config().isClientMode()) { - assert marshCacheDesc != null; - assert utilityCacheDesc != null; - assert atomicsCacheDesc != null; - - Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(); - - reqs.add(clientSystemCacheRequest(marshCacheDesc, new NearCacheConfiguration())); - reqs.add(clientSystemCacheRequest(utilityCacheDesc, null)); - reqs.add(clientSystemCacheRequest(atomicsCacheDesc, new NearCacheConfiguration())); - - startClientSystemCaches(reqs); - - sysCacheStartFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> fut) { - try { - marshallerCacheCallbacks(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize marshaller context.", e); - } - } - }); - } - else { - sysCacheStartFut.onDone(); + ctx.marshallerContext().onMarshallerCacheStarted(ctx); - marshallerCacheCallbacks(); - } + marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() { + @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { + ctx.marshallerContext().onMarshallerCachePreloaded(ctx); + } + }); // Must call onKernalStart on shared managers after creation of fetched caches. for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) @@ -811,80 +786,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { onKernalStart(cache); // Wait for caches in SYNC preload mode. - for (GridCacheAdapter<?, ?> cache : caches.values()) { - if (cache.context().started()) { - CacheConfiguration cfg = cache.configuration(); + for (GridCacheAdapter<?, ?> cache : locCaches) { + CacheConfiguration cfg = cache.configuration(); - if (cfg.getRebalanceMode() == SYNC) { - if (cfg.getCacheMode() == REPLICATED || - (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) - cache.preloader().syncFuture().get(); - } + if (cfg.getRebalanceMode() == SYNC) { + if (cfg.getCacheMode() == REPLICATED || + (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) + cache.preloader().syncFuture().get(); } } ctx.cacheObjects().onCacheProcessorStarted(); } - /** - * @param reqs Start requests. - */ - private void startClientSystemCaches(Collection<DynamicCacheChangeRequest> reqs) { - assert !F.isEmpty(reqs) : reqs; - - GridCompoundFuture<Object, Object> fut = new GridCompoundFuture<>(); - - for (DynamicCacheStartFuture startFut : initiateCacheChanges(reqs)) - fut.add(startFut); - - fut.markInitialized(); - - fut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> fut) { - sysCacheStartFut.onDone(); - } - }); - } - - /** - * @param cacheDesc Cache descriptor. - * @return Cache change request. - */ - private DynamicCacheChangeRequest clientSystemCacheRequest( - DynamicCacheDescriptor cacheDesc, - @Nullable NearCacheConfiguration nearCfg) - { - DynamicCacheChangeRequest desc = new DynamicCacheChangeRequest( - cacheDesc.cacheConfiguration().getName(), - ctx.localNodeId()); - - desc.clientStartOnly(true); - - desc.nearCacheConfiguration(nearCfg); - - desc.deploymentId(cacheDesc.deploymentId()); - - desc.startCacheConfiguration(cacheDesc.cacheConfiguration()); - - desc.cacheType(cacheDesc.cacheType()); - - return desc; - } - - /** - * @throws IgniteCheckedException If failed. - */ - private void marshallerCacheCallbacks() throws IgniteCheckedException { - ctx.marshallerContext().onMarshallerCacheStarted(ctx); - - marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() { - @Override - public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { - ctx.marshallerContext().onMarshallerCachePreloaded(ctx); - } - }); - } - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void stop(boolean cancel) throws IgniteCheckedException { @@ -928,8 +842,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { @Override public void onKernalStop(boolean cancel) { cacheStartedLatch.countDown(); - sysCacheStartFut.onDone(); - if (ctx.config().isDaemon()) return; @@ -1522,7 +1434,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ClusterNode locNode = ctx.discovery().localNode(); - boolean affNodeStart = !clientStartOnly && !CU.clientModeNode(locNode) && nodeFilter.apply(locNode); + boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter); boolean clientNodeStart = locNode.id().equals(initiatingNodeId); if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) @@ -1968,7 +1880,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (nearCfg != null) { ClusterNode locNode = ctx.discovery().localNode(); - if (!CU.clientModeNode(locNode) && descCfg.getNodeFilter().apply(locNode)) { + if (CU.affinityNode(locNode, descCfg.getNodeFilter())) { // If we are on a data node and near cache was enabled, return success, else - fail. if (descCfg.getNearConfiguration() != null) return new GridFinishedFuture<>(); @@ -2017,7 +1929,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ClusterNode locNode = ctx.discovery().localNode(); - if (!CU.clientModeNode(locNode) && ccfg.getNodeFilter().apply(locNode)) { + if (CU.affinityNode(locNode, ccfg.getNodeFilter())) { if (ccfg.getNearConfiguration() != null) return new GridFinishedFuture<>(); else @@ -2305,6 +2217,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Checks that remote caches has configuration compatible with the local. * + * @param locCfg Local configuration. + * @param rmtCfg Remote configuration. * @param rmtNode Remote node. * @throws IgniteCheckedException If check failed. */ @@ -2330,13 +2244,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true); - boolean checkStore = locCfg.getAtomicityMode() == TRANSACTIONAL || - (!CU.clientModeNode(rmtNode) && - !CU.clientModeNode(locNode) && - rmtCfg.getNodeFilter().apply(rmtNode) && - locCfg.getNodeFilter().apply(locNode)); - - if (checkStore) + if (locCfg.getAtomicityMode() == TRANSACTIONAL || + (CU.affinityNode(rmtNode, rmtCfg.getNodeFilter()) && CU.affinityNode(locNode, locCfg.getNodeFilter()))) CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory", locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true); @@ -2607,26 +2516,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Gets utility cache. * * @return Utility cache. - * @throws IgniteCheckedException If failed. */ - public <K, V> GridCacheAdapter<K, V> utilityCache() throws IgniteCheckedException { - GridCacheAdapter<K, V> cache = internalCache(CU.UTILITY_CACHE_NAME); - - if (cache != null) - return cache; - - assert ctx.config().isClientMode() : "Utility cache is missed on server node."; - - sysCacheStartFut.get(); - + public <K, V> GridCacheAdapter<K, V> utilityCache() { return internalCache(CU.UTILITY_CACHE_NAME); } /** - * @return Utility cache start future. + * Gets utility cache for atomic data structures. + * + * @return Utility cache for atomic data structures. */ - public IgniteInternalFuture<?> systemCachesStartFuture() { - return sysCacheStartFut; + public <K, V> IgniteInternalCache<K, V> atomicsCache() { + return cache(CU.ATOMICS_CACHE_NAME); } /** @@ -2663,7 +2564,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ - public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException { + public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException { return publicJCache(cacheName, true); } @@ -2677,7 +2578,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Nullable public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted) + @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -2685,7 +2586,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { String masked = maskNull(cacheName); - IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(masked); + IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2695,7 +2596,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache == null) cache = startJCache(cacheName, failIfNotStarted); - return cache; + return (IgniteCacheProxy<K, V>)cache; } /** @@ -2705,7 +2606,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ - private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { + private IgniteCacheProxy startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { String masked = maskNull(cacheName); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2735,7 +2636,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { F.first(initiateCacheChanges(F.asList(req))).get(); - IgniteCache cache = jCacheProxies.get(masked); + IgniteCacheProxy cache = jCacheProxies.get(masked); if (cache == null && failIfNotStarted) throw new IllegalArgumentException("Cache is not started: " + cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 163e09a..ef04ff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1697,7 +1697,7 @@ public class GridCacheUtils { /** * @param aff Affinity. * @param n Node. - * @return Predicate that evaulates to {@code true} if entry is primary for node. + * @return Predicate that evaluates to {@code true} if entry is primary for node. */ public static CacheEntryPredicate cachePrimary( final Affinity aff, @@ -1799,15 +1799,19 @@ public class GridCacheUtils { return res; } + /** * @param node Node. - * @return {@code True} if flag {@link IgniteConfiguration#isClientMode()} is set given node. + * @param filter Node filter. + * @return {@code True} if node is not client node and pass given filter. */ - public static boolean clientModeNode(ClusterNode node) { + public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) { Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); assert clientModeAttr != null : node; - return clientModeAttr != null && clientModeAttr; + boolean clientMode = clientModeAttr != null && clientModeAttr; + + return !clientMode && filter.apply(node); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 0186a90..0790052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -84,9 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public int[] primaryPartitions(ClusterNode n) { A.notNull(n, "n"); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - - Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer); + Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion()); return U.toIntArray(parts); } @@ -95,9 +93,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public int[] backupPartitions(ClusterNode n) { A.notNull(n, "n"); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - - Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer); + Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topologyVersion()); return U.toIntArray(parts); } @@ -108,7 +104,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { Collection<Integer> parts = new HashSet<>(); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion topVer = topologyVersion(); for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 4153c5f..633f237 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -824,10 +824,6 @@ public class GridDhtPartitionDemandPool<K, V> { log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); try { - cctx.kernalContext().cache().systemCachesStartFuture().get(); - - cctx.kernalContext().cache().marshallerCache().context().awaitStarted(); - cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get(); } catch (IgniteInterruptedCheckedException ignored) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index b6d4b40..72911af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -112,7 +112,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart() { if (ctx.config().isDaemon()) return; @@ -123,15 +123,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert utilityCache != null; if (atomicCfg != null) { - IgniteInternalCache atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME); - - if (atomicsCache == null) { - assert ctx.config().isClientMode() : "Atomics cache is missed on server node."; - - ctx.cache().systemCachesStartFuture().get(); - - atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME); - } + IgniteInternalCache atomicsCache = ctx.cache().atomicsCache(); assert atomicsCache != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java index f0af2c1..467349f 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java @@ -55,9 +55,6 @@ public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest { /** */ private static final String CACHE4 = "cache4"; - /** */ - private static final String CACHE5 = "cache5"; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java new file mode 100644 index 0000000..9745ad8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; + +/** + * Tests that cache specified in configuration start on client nodes. + */ +public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheConfiguration ccfg; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + if (ccfg != null) + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testConfiguredCacheOnClientNode() throws Exception { + ccfg = new CacheConfiguration(); + + final String cacheName = null; + + Ignite ignite0 = startGrid(0); + + checkCache(ignite0, cacheName, true, false); + + client = true; + + Ignite ignite1 = startGrid(1); + + checkCache(ignite1, cacheName, false, false); + + ccfg = new CacheConfiguration(); + + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + Ignite ignite2 = startGrid(2); + + checkCache(ignite2, cacheName, false, true); + + ccfg = null; + + Ignite ignite3 = startGrid(3); + + checkNoCache(ignite3, cacheName); + + assertNotNull(ignite3.cache(cacheName)); + + checkCache(ignite3, cacheName, false, false); + + Ignite ignite4 = startGrid(4); + + checkNoCache(ignite4, cacheName); + + assertNotNull(ignite4.createNearCache(cacheName, new NearCacheConfiguration<>())); + + checkCache(ignite4, cacheName, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheStartError() throws Exception { + ccfg = new CacheConfiguration(); + + final String cacheName = null; + + Ignite ignite0 = startGrid(0); + + checkCache(ignite0, cacheName, true, false); + + client = true; + + final Ignite ignite1 = startGrid(1); + + checkCache(ignite1, cacheName, false, false); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ignite1.getOrCreateNearCache(cacheName, new NearCacheConfiguration<>()); + + return null; + } + }, IgniteException.class, null); + + checkCache(ignite1, cacheName, false, false); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ignite1.createNearCache(cacheName, new NearCacheConfiguration<>()); + + return null; + } + }, IgniteException.class, null); + + checkCache(ignite1, cacheName, false, false); + } + + /** + * @param ignite Node. + * @param cacheName Cache name + * @param srv {@code True} if server cache is expected. + * @param near {@code True} if near cache is expected. + */ + private void checkCache(Ignite ignite, String cacheName, boolean srv, boolean near) { + GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + + assertNotNull("No cache on node " + ignite.name(), cache); + + assertEquals(near, cache.context().isNear()); + + if (near) + cache = ((GridNearCacheAdapter)cache).dht(); + + if (srv) + assertSame(GridCacheConcurrentMap.class, cache.map().getClass()); + else + assertSame(GridNoStorageCacheMap.class, cache.map().getClass()); + + ClusterNode node = ((IgniteKernal)ignite).localNode(); + + for (Ignite ignite0 : Ignition.allGrids()) { + GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery(); + + assertTrue(disco.cacheNode(node, cacheName)); + assertEquals(srv, disco.cacheAffinityNode(node, cacheName)); + assertEquals(near, disco.cacheNearNode(node, cacheName)); + + if (srv) + assertTrue(ignite0.affinity(null).primaryPartitions(node).length > 0); + else + assertEquals(0, ignite0.affinity(null).primaryPartitions(node).length); + } + + assertNotNull(ignite.cache(cacheName)); + } + + /** + * @param ignite Node. + * @param cacheName Cache name. + */ + private void checkNoCache(Ignite ignite, String cacheName) { + GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + + assertNull("Unexpected cache on node " + ignite.name(), cache); + + ClusterNode node = ((IgniteKernal)ignite).localNode(); + + for (Ignite ignite0 : Ignition.allGrids()) { + GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery(); + + assertFalse(disco.cacheNode(node, cacheName)); + assertFalse(disco.cacheAffinityNode(node, cacheName)); + assertFalse(disco.cacheNearNode(node, cacheName)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java index 52ef7e2..a7b2df6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java @@ -76,7 +76,7 @@ public class IgniteSystemCacheOnClientTest extends GridCommonAbstractTest { assertNotNull(marshCache); - assertTrue("Marshaller cache on client should have near cache", marshCache.context().isNear()); + assertFalse(marshCache.context().isNear()); marshCache = ((IgniteKernal)ignite(0)).internalCache(CU.MARSH_CACHE_NAME); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index b3eb899..18cc453 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -101,6 +101,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class); suite.addTestSuite(IgniteCacheConfigurationTemplateTest.class); suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class); + suite.addTestSuite(IgniteDynamicClientCacheStartSelfTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);