IGNITE-45 - Reshuffling code for reuse in dynamic cache start.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/501bd5c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/501bd5c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/501bd5c3 Branch: refs/heads/ignite-45 Commit: 501bd5c39d7626094dad2e4068f823a4e05dcf42 Parents: fe2985b Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Mar 3 15:20:48 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Mar 3 15:20:48 2015 -0800 ---------------------------------------------------------------------- .../affinity/AffinityTopologyVersion.java | 12 +- .../processors/cache/GridCacheProcessor.java | 964 ++++++++++--------- .../cache/GridCacheSharedContext.java | 14 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 4 +- 4 files changed, 522 insertions(+), 472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java index fc5f193..be6fae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java @@ -35,6 +35,13 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi private long topVer; /** + * Empty constructor required by {@link Externalizable}. + */ + public AffinityTopologyVersion() { + // No-op. + } + + /** * @param ver Version. 
*/ public AffinityTopologyVersion(long ver) { @@ -69,10 +76,7 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi /** {@inheritDoc} */ @Override public boolean equals(Object o) { - if (!(o instanceof AffinityTopologyVersion)) - return false; - - return topVer == ((AffinityTopologyVersion)o).topVer; + return o instanceof AffinityTopologyVersion && topVer == ((AffinityTopologyVersion)o).topVer; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5b4852e..260cab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -111,6 +111,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Dynamic caches. */ private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>(); + /** */ + private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); + /** * @param ctx Kernal context. */ @@ -592,358 +595,598 @@ public class GridCacheProcessor extends GridProcessorAdapter { Collection<GridCacheAdapter<?, ?>> startSeq = new ArrayList<>(cfgs.length); - IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); - for (int i = 0; i < cfgs.length; i++) { CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]); // Initialize defaults. initialize(cfg); - CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? 
cfg.getCacheStoreFactory().create() : null; + cfgs[i] = cfg; // Replace original configuration value. - validate(ctx.config(), cfg, cfgStore); + if (caches.containsKey(cfg.getName())) { + String cacheName = cfg.getName(); - CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null); + if (cacheName != null) + throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + + "assign unique name to each cache): " + cacheName); + else + throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + + "assign unique name to each cache)."); + } - jta.createTmLookup(cfg); + GridCacheContext cacheCtx = createCache(cfg); - // Skip suggestions for system caches. - if (!sysCaches.contains(cfg.getName())) - suggestOptimizations(cfg, cfgStore != null); + sharedCtx.addCacheContext(cacheCtx); - List<Object> toPrepare = new ArrayList<>(); + startSeq.add(cacheCtx.cache()); - toPrepare.add(jta.tmLookup()); - toPrepare.add(cfgStore); + caches.put(cfg.getName(), cacheCtx.cache()); - if (cfgStore instanceof GridCacheLoaderWriterStore) { - toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader()); - toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer()); - } + if (sysCaches.contains(cfg.getName())) + stopSeq.addLast(cacheCtx.cache()); + else + stopSeq.addFirst(cacheCtx.cache()); + } - prepare(cfg, toPrepare.toArray(new Object[toPrepare.size()])); + // Start shared managers. + for (GridCacheSharedManager mgr : sharedCtx.managers()) + mgr.start(sharedCtx); - U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore)); + for (GridCacheAdapter<?, ?> cache : startSeq) + startCache(cache); - // Init default key mapper. 
- CacheAffinityKeyMapper dfltAffMapper; + for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { + GridCacheAdapter cache = e.getValue(); - if (cfg.getAffinityMapper().getClass().equals(GridCacheDefaultAffinityKeyMapper.class)) - dfltAffMapper = cfg.getAffinityMapper(); - else { - dfltAffMapper = new GridCacheDefaultAffinityKeyMapper(); + proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); - prepare(cfg, dfltAffMapper, false); - } + jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); + } - cfgs[i] = cfg; // Replace original configuration value. + // Internal caches which should not be returned to user. + for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { + GridCacheAdapter cache = e.getValue(); - GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); - GridCacheEventManager evtMgr = new GridCacheEventManager(); - GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg)); - GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); - GridCacheQueryManager qryMgr = queryManager(cfg); - CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); - CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); - GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); - GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); - IgniteCacheSerializationManager serMgr = ctx.createComponent(IgniteCacheSerializationManager.class); + if (!sysCaches.contains(e.getKey())) + publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); + } - GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg); + transactions = new IgniteTransactionsImpl(sharedCtx); - GridCacheContext<?, ?> cacheCtx = new GridCacheContext( - ctx, - sharedCtx, - cfg, + if (log.isDebugEnabled()) + log.debug("Started 
cache processor."); + } - /* - * Managers in starting order! - * =========================== - */ - evtMgr, - swapMgr, - serMgr, - storeMgr, - evictMgr, - qryMgr, - contQryMgr, - affMgr, - dataStructuresMgr, - ttlMgr, - drMgr, - jta); + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onKernalStart() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; - cacheCtx.defaultAffMapper(dfltAffMapper); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) + checkCache(n); + } - GridCacheAdapter cache = null; + for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { + GridCacheAdapter cache = e.getValue(); - switch (cfg.getCacheMode()) { - case LOCAL: { - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - cache = new GridLocalCache(cacheCtx); + if (maxPreloadOrder > 0) { + CacheConfiguration cfg = cache.configuration(); - break; - } - case ATOMIC: { - cache = new GridLocalAtomicCache(cacheCtx); + int order = cfg.getPreloadOrder(); - break; - } + if (order > 0 && order != maxPreloadOrder && cfg.getCacheMode() != LOCAL) { + GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts + .get(order); - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } + if (fut == null) { + fut = new GridCompoundFuture<>(ctx); - break; - } - case PARTITIONED: - case REPLICATED: { - if (GridCacheUtils.isNearEnabled(cfg)) { - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - cache = new GridNearTransactionalCache(cacheCtx); - - break; - } - case ATOMIC: { - cache = new GridNearAtomicCache(cacheCtx); - - break; - } - - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } - } - else { - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - cache = GridCacheUtils.isAffinityNode(cfg) ? 
new GridDhtColocatedCache(cacheCtx) : - new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); - - break; - } - case ATOMIC: { - cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) : - new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); - - break; - } - - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } + preloadFuts.put(order, fut); } - break; - } - - default: { - assert false : "Invalid cache mode: " + cfg.getCacheMode(); + fut.add(cache.preloader().syncFuture()); } } + } - cacheCtx.cache(cache); + for (IgniteInternalFuture<?> fut : preloadFuts.values()) + ((GridCompoundFuture<Object, Object>)fut).markInitialized(); - if (caches.containsKey(cfg.getName())) { - String cacheName = cfg.getName(); + for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) + mgr.onKernalStart(); - if (cacheName != null) - throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + - "assign unique name to each cache): " + cacheName); - else - throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + - "assign unique name to each cache)."); + for (GridCacheAdapter<?, ?> cache : caches.values()) + onKernalStart(cache); + + // Wait for caches in SYNC preload mode. 
+ for (GridCacheAdapter<?, ?> cache : caches.values()) { + CacheConfiguration cfg = cache.configuration(); + + if (cfg.getPreloadMode() == SYNC) { + if (cfg.getCacheMode() == REPLICATED || + (cfg.getCacheMode() == PARTITIONED && cfg.getPreloadPartitionedDelay() >= 0)) + cache.preloader().syncFuture().get(); } + } - caches.put(cfg.getName(), cache); + ctx.portable().onCacheProcessorStarted(); + } - if (sysCaches.contains(cfg.getName())) - stopSeq.addLast(cache); - else - stopSeq.addFirst(cache); + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void stop(boolean cancel) throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; - startSeq.add(cache); + for (GridCacheAdapter<?, ?> cache : stopSeq) + stopCache(cache, cancel); - /* - * Create DHT cache. - * ================ - */ - if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { - /* - * Specifically don't create the following managers - * here and reuse the one from Near cache: - * 1. GridCacheVersionManager - * 2. GridCacheIoManager - * 3. GridCacheDeploymentManager - * 4. GridCacheQueryManager (note, that we start it for DHT cache though). - * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). - * 6. GridCacheDgcManager - * 7. GridCacheTtlManager. - * =============================================== - */ - swapMgr = new GridCacheSwapManager(true); - evictMgr = new GridCacheEvictionManager(); - evtMgr = new GridCacheEventManager(); - drMgr = ctx.createComponent(GridCacheDrManager.class); + List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); - cacheCtx = new GridCacheContext( - ctx, - sharedCtx, - cfg, + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); - /* - * Managers in starting order! 
- * =========================== - */ - evtMgr, - swapMgr, - serMgr, - storeMgr, - evictMgr, - qryMgr, - contQryMgr, - affMgr, - dataStructuresMgr, - ttlMgr, - drMgr, - jta); - - cacheCtx.defaultAffMapper(dfltAffMapper); - - GridDhtCacheAdapter dht = null; + mgr.stop(cancel); + } - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - assert cache instanceof GridNearTransactionalCache; + sharedCtx.cleanup(); - GridNearTransactionalCache near = (GridNearTransactionalCache)cache; + if (log.isDebugEnabled()) + log.debug("Stopped cache processor."); + } - GridDhtCache dhtCache = !GridCacheUtils.isAffinityNode(cfg) ? - new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)) : - new GridDhtCache(cacheCtx); + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onKernalStop(boolean cancel) { + if (ctx.config().isDaemon()) + return; - dhtCache.near(near); + for (GridCacheAdapter<?, ?> cache : stopSeq) + onKernalStop(cache, cancel); - near.dht(dhtCache); + List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers(); - dht = dhtCache; + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size()); + it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); - break; - } - case ATOMIC: { - assert cache instanceof GridNearAtomicCache; + mgr.onKernalStop(cancel); + } + } - GridNearAtomicCache near = (GridNearAtomicCache)cache; + /** + * @param cache Cache to start. + * @throws IgniteCheckedException If failed to start cache. + */ + @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) + private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { + GridCacheContext<?, ?> cacheCtx = cache.context(); - GridDhtAtomicCache dhtCache = GridCacheUtils.isAffinityNode(cfg) ? 
new GridDhtAtomicCache(cacheCtx) : - new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + CacheConfiguration cfg = cacheCtx.config(); - dhtCache.near(near); + // Start managers. + for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) + mgr.start(cacheCtx); - near.dht(dhtCache); + cacheCtx.initConflictResolver(); - dht = dhtCache; + if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { + GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); - break; - } + // Start DHT managers. + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.start(dhtCtx); - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } + dhtCtx.initConflictResolver(); - cacheCtx.cache(dht); - } + // Start DHT cache. + dhtCtx.cache().start(); - sharedCtx.addCacheContext(cache.context()); + if (log.isDebugEnabled()) + log.debug("Started DHT cache: " + dhtCtx.cache().name()); } - // Start shared managers. - for (GridCacheSharedManager mgr : sharedCtx.managers()) - mgr.start(sharedCtx); + cacheCtx.cache().start(); - for (GridCacheAdapter<?, ?> cache : startSeq) { - GridCacheContext<?, ?> cacheCtx = cache.context(); + if (log.isInfoEnabled()) + log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']'); + } + + /** + * @param cache Cache to stop. + * @param cancel Cancel flag. + */ + @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) + private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel) { + GridCacheContext ctx = cache.context(); - CacheConfiguration cfg = cacheCtx.config(); + sharedCtx.removeCacheContext(ctx); - // Start managers. 
- for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) - mgr.start(cacheCtx); + cache.stop(); - cacheCtx.initConflictResolver(); + if (isNearEnabled(ctx)) { + GridDhtCacheAdapter dht = ctx.near().dht(); - if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { - GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); + // Check whether dht cache has been started. + if (dht != null) { + dht.stop(); - // Start DHT managers. - for (GridCacheManager mgr : dhtManagers(dhtCtx)) - mgr.start(dhtCtx); + GridCacheContext<?, ?> dhtCtx = dht.context(); - dhtCtx.initConflictResolver(); + List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx); - // Start DHT cache. - dhtCtx.cache().start(); + for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious();) { + GridCacheManager mgr = it.previous(); - if (log.isDebugEnabled()) - log.debug("Started DHT cache: " + dhtCtx.cache().name()); + mgr.stop(cancel); + } } + } + + List<GridCacheManager> mgrs = ctx.managers(); + + Collection<GridCacheManager> excludes = dhtExcludes(ctx); - cacheCtx.cache().start(); + // Reverse order. 
+ for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheManager mgr = it.previous(); - if (log.isInfoEnabled()) - log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']'); + if (!excludes.contains(mgr)) + mgr.stop(cancel); } - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); + U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.jta().tmLookup(), + ctx.store().configuredStore())); - proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); + if (log.isInfoEnabled()) + log.info("Stopped cache: " + cache.name()); - jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); - } + cleanup(ctx); + } - // Internal caches which should not be returned to user. - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); + /** + * @param cache Cache. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { + GridCacheContext<?, ?> ctx = cache.context(); - if (!sysCaches.contains(e.getKey())) - publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); + // Start DHT cache as well. 
+ if (isNearEnabled(ctx)) { + GridDhtCacheAdapter dht = ctx.near().dht(); + + GridCacheContext<?, ?> dhtCtx = dht.context(); + + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.onKernalStart(); + + dht.onKernalStart(); + + if (log.isDebugEnabled()) + log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name()); } - transactions = new IgniteTransactionsImpl(sharedCtx); + for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx)))) + mgr.onKernalStart(); + + cache.onKernalStart(); if (log.isDebugEnabled()) - log.debug("Started cache processor."); + log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" + + cache.configuration().getCacheMode() + ']'); } /** - * Callback invoked when first exchange future for dynamic cache is completed. - * - * @param startDesc Cache start descriptor. + * @param cache Cache to stop. + * @param cancel Cancel flag. */ - public void onCacheStartFinished(DynamicCacheDescriptor startDesc) { - CacheConfiguration ccfg = startDesc.cacheConfiguration(); + @SuppressWarnings("unchecked") + private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) { + GridCacheContext ctx = cache.context(); - DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); + if (isNearEnabled(ctx)) { + GridDhtCacheAdapter dht = ctx.near().dht(); - if (fut != null && fut.startId().equals(startDesc.startId())) { - fut.onDone(); + if (dht != null) { + GridCacheContext<?, ?> dhtCtx = dht.context(); - pendingStarts.remove(ccfg.getName(), fut); + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.onKernalStop(cancel); + + dht.onKernalStop(); + } + } + + List<GridCacheManager> mgrs = ctx.managers(); + + Collection<GridCacheManager> excludes = dhtExcludes(ctx); + + // Reverse order. 
+ for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) { + GridCacheManager mgr = it.previous(); + + if (!excludes.contains(mgr)) + mgr.onKernalStop(cancel); } + + cache.onKernalStop(); } /** - * Creates shared context. - * - * @param kernalCtx Kernal context. + * @param cfg Cache configuration to use to create cache. + * @return Cache context. + * @throws IgniteCheckedException If failed to create cache. + */ + @SuppressWarnings( {"unchecked"}) + private GridCacheContext createCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException { + CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; + + validate(ctx.config(), cfg, cfgStore); + + CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null); + + jta.createTmLookup(cfg); + + // Skip suggestions for system caches. + if (!sysCaches.contains(cfg.getName())) + suggestOptimizations(cfg, cfgStore != null); + + List<Object> toPrepare = new ArrayList<>(); + + toPrepare.add(jta.tmLookup()); + toPrepare.add(cfgStore); + + if (cfgStore instanceof GridCacheLoaderWriterStore) { + toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader()); + toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer()); + } + + prepare(cfg, toPrepare.toArray(new Object[toPrepare.size()])); + + U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore)); + + // Init default key mapper. 
+ CacheAffinityKeyMapper dfltAffMapper; + + if (cfg.getAffinityMapper().getClass().equals(GridCacheDefaultAffinityKeyMapper.class)) + dfltAffMapper = cfg.getAffinityMapper(); + else { + dfltAffMapper = new GridCacheDefaultAffinityKeyMapper(); + + prepare(cfg, dfltAffMapper, false); + } + + GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); + GridCacheEventManager evtMgr = new GridCacheEventManager(); + GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg)); + GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); + GridCacheQueryManager qryMgr = queryManager(cfg); + CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); + CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); + GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); + GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); + IgniteCacheSerializationManager serMgr = ctx.createComponent(IgniteCacheSerializationManager.class); + + GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg); + + GridCacheContext<?, ?> cacheCtx = new GridCacheContext( + ctx, + sharedCtx, + cfg, + + /* + * Managers in starting order! 
+ * =========================== + */ + evtMgr, + swapMgr, + serMgr, + storeMgr, + evictMgr, + qryMgr, + contQryMgr, + affMgr, + dataStructuresMgr, + ttlMgr, + drMgr, + jta); + + cacheCtx.defaultAffMapper(dfltAffMapper); + + GridCacheAdapter cache = null; + + switch (cfg.getCacheMode()) { + case LOCAL: { + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + cache = new GridLocalCache(cacheCtx); + + break; + } + case ATOMIC: { + cache = new GridLocalAtomicCache(cacheCtx); + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + + break; + } + case PARTITIONED: + case REPLICATED: { + if (GridCacheUtils.isNearEnabled(cfg)) { + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + cache = new GridNearTransactionalCache(cacheCtx); + + break; + } + case ATOMIC: { + cache = new GridNearAtomicCache(cacheCtx); + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + } + else { + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtColocatedCache(cacheCtx) : + new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + break; + } + case ATOMIC: { + cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) : + new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + } + + break; + } + + default: { + assert false : "Invalid cache mode: " + cfg.getCacheMode(); + } + } + + cacheCtx.cache(cache); + + GridCacheContext<?, ?> ret = cacheCtx; + + /* + * Create DHT cache. + * ================ + */ + if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { + /* + * Specifically don't create the following managers + * here and reuse the one from Near cache: + * 1. GridCacheVersionManager + * 2. 
GridCacheIoManager + * 3. GridCacheDeploymentManager + * 4. GridCacheQueryManager (note, that we start it for DHT cache though). + * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). + * 6. GridCacheDgcManager + * 7. GridCacheTtlManager. + * =============================================== + */ + swapMgr = new GridCacheSwapManager(true); + evictMgr = new GridCacheEvictionManager(); + evtMgr = new GridCacheEventManager(); + drMgr = ctx.createComponent(GridCacheDrManager.class); + + cacheCtx = new GridCacheContext( + ctx, + sharedCtx, + cfg, + + /* + * Managers in starting order! + * =========================== + */ + evtMgr, + swapMgr, + serMgr, + storeMgr, + evictMgr, + qryMgr, + contQryMgr, + affMgr, + dataStructuresMgr, + ttlMgr, + drMgr, + jta); + + cacheCtx.defaultAffMapper(dfltAffMapper); + + GridDhtCacheAdapter dht = null; + + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + assert cache instanceof GridNearTransactionalCache; + + GridNearTransactionalCache near = (GridNearTransactionalCache)cache; + + GridDhtCache dhtCache = !GridCacheUtils.isAffinityNode(cfg) ? + new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)) : + new GridDhtCache(cacheCtx); + + dhtCache.near(near); + + near.dht(dhtCache); + + dht = dhtCache; + + break; + } + case ATOMIC: { + assert cache instanceof GridNearAtomicCache; + + GridNearAtomicCache near = (GridNearAtomicCache)cache; + + GridDhtAtomicCache dhtCache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) : + new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + dhtCache.near(near); + + near.dht(dhtCache); + + dht = dhtCache; + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + + cacheCtx.cache(dht); + } + + return ret; + } + + /** + * Callback invoked when first exchange future for dynamic cache is completed. + * + * @param startDesc Cache start descriptor. 
+ */ + public void onCacheStartFinished(DynamicCacheDescriptor startDesc) { + CacheConfiguration ccfg = startDesc.cacheConfiguration(); + + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName()); + + if (fut != null && fut.startId().equals(startDesc.startId())) { + fut.onDone(); + + pendingStarts.remove(ccfg.getName(), fut); + } + } + + /** + * Creates shared context. + * + * @param kernalCtx Kernal context. * @return Shared context. */ @SuppressWarnings("unchecked") @@ -1003,7 +1246,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param nodeFilter Node filter to select nodes on which the cache should be deployed. * @return Future that will be completed when cache is deployed. */ - public IgniteInternalFuture<?> startCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) { + public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) { if (nodeFilter == null) nodeFilter = F.alwaysTrue(); @@ -1048,9 +1291,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param startDesc Cache start descriptor. */ private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) { - // TODO IGNITE-45 remove debug - U.debug(log, "Received start notification: " + startDesc); - CacheConfiguration ccfg = startDesc.cacheConfiguration(); // Check if cache with the same name was concurrently started form different node. @@ -1367,208 +1607,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param cache Cache. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { - GridCacheContext<?, ?> ctx = cache.context(); - - // Start DHT cache as well. 
- if (isNearEnabled(ctx)) { - GridDhtCacheAdapter dht = ctx.near().dht(); - - GridCacheContext<?, ?> dhtCtx = dht.context(); - - for (GridCacheManager mgr : dhtManagers(dhtCtx)) - mgr.onKernalStart(); - - dht.onKernalStart(); - - if (log.isDebugEnabled()) - log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name()); - } - - for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx)))) - mgr.onKernalStart(); - - cache.onKernalStart(); - - if (log.isDebugEnabled()) - log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" + - cache.configuration().getCacheMode() + ']'); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) - checkCache(n); - } - - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); - - if (maxPreloadOrder > 0) { - CacheConfiguration cfg = cache.configuration(); - - int order = cfg.getPreloadOrder(); - - if (order > 0 && order != maxPreloadOrder && cfg.getCacheMode() != LOCAL) { - GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts - .get(order); - - if (fut == null) { - fut = new GridCompoundFuture<>(ctx); - - preloadFuts.put(order, fut); - } - - fut.add(cache.preloader().syncFuture()); - } - } - } - - for (IgniteInternalFuture<?> fut : preloadFuts.values()) - ((GridCompoundFuture<Object, Object>)fut).markInitialized(); - - for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) - mgr.onKernalStart(); - - for (GridCacheAdapter<?, ?> cache : caches.values()) - onKernalStart(cache); - - // Wait for caches in SYNC preload mode. 
- for (GridCacheAdapter<?, ?> cache : caches.values()) { - CacheConfiguration cfg = cache.configuration(); - - if (cfg.getPreloadMode() == SYNC) { - if (cfg.getCacheMode() == REPLICATED || - (cfg.getCacheMode() == PARTITIONED && cfg.getPreloadPartitionedDelay() >= 0)) - cache.preloader().syncFuture().get(); - } - } - - ctx.portable().onCacheProcessorStarted(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void onKernalStop(boolean cancel) { - if (ctx.config().isDaemon()) - return; - - for (GridCacheAdapter<?, ?> cache : stopSeq) { - GridCacheContext ctx = cache.context(); - - if (isNearEnabled(ctx)) { - GridDhtCacheAdapter dht = ctx.near().dht(); - - if (dht != null) { - GridCacheContext<?, ?> dhtCtx = dht.context(); - - for (GridCacheManager mgr : dhtManagers(dhtCtx)) - mgr.onKernalStop(cancel); - - dht.onKernalStop(); - } - } - - List<GridCacheManager> mgrs = ctx.managers(); - - Collection<GridCacheManager> excludes = dhtExcludes(ctx); - - // Reverse order. - for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) { - GridCacheManager mgr = it.previous(); - - if (!excludes.contains(mgr)) - mgr.onKernalStop(cancel); - } - - cache.onKernalStop(); - } - - List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers(); - - for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size()); - it.hasPrevious();) { - GridCacheSharedManager<?, ?> mgr = it.previous(); - - mgr.onKernalStop(cancel); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void stop(boolean cancel) throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - for (GridCacheAdapter<?, ?> cache : stopSeq) { - cache.stop(); - - GridCacheContext ctx = cache.context(); - - if (isNearEnabled(ctx)) { - GridDhtCacheAdapter dht = ctx.near().dht(); - - // Check whether dht cache has been started. 
- if (dht != null) { - dht.stop(); - - GridCacheContext<?, ?> dhtCtx = dht.context(); - - List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx); - - for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious();) { - GridCacheManager mgr = it.previous(); - - mgr.stop(cancel); - } - } - } - - List<GridCacheManager> mgrs = ctx.managers(); - - Collection<GridCacheManager> excludes = dhtExcludes(ctx); - - // Reverse order. - for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - GridCacheManager mgr = it.previous(); - - if (!excludes.contains(mgr)) - mgr.stop(cancel); - } - - U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.jta().tmLookup(), - ctx.store().configuredStore())); - - if (log.isInfoEnabled()) - log.info("Stopped cache: " + cache.name()); - - cleanup(ctx); - } - - List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); - - for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { - GridCacheSharedManager<?, ?> mgr = it.previous(); - - mgr.stop(cancel); - } - - sharedCtx.cleanup(); - - if (log.isDebugEnabled()) - log.debug("Stopped cache processor."); - } - - /** * Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future * with maximum order less than {@code order}. 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index de9ec0e..e133a17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.internal.processors.cache.CacheFlag.*; @@ -69,7 +70,7 @@ public class GridCacheSharedContext<K, V> { private GridCacheDeploymentManager<K, V> depMgr; /** Cache contexts map. */ - private Map<Integer, GridCacheContext<K, V>> ctxMap; + private ConcurrentMap<Integer, GridCacheContext<K, V>> ctxMap; /** Tx metrics. */ private volatile TransactionMetricsAdapter txMetrics; @@ -101,7 +102,7 @@ public class GridCacheSharedContext<K, V> { txMetrics = new TransactionMetricsAdapter(); - ctxMap = new HashMap<>(); + ctxMap = new ConcurrentHashMap<>(); } /** @@ -116,7 +117,7 @@ public class GridCacheSharedContext<K, V> { /** * Adds cache context to shared cache context. * - * @param cacheCtx Cache context. + * @param cacheCtx Cache context to add. */ @SuppressWarnings("unchecked") public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException { @@ -132,6 +133,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @param cacheCtx Cache context to remove. 
+ */ + public void removeCacheContext(GridCacheContext cacheCtx) { + ctxMap.remove(cacheCtx.cacheId(), cacheCtx); + } + + /** * @return List of shared context managers in starting order. */ public List<GridCacheSharedManager<K, V>> managers() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index dddf4a2..5d515e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -65,7 +65,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName("TestCacheName"); - futs.add(kernal.context().cache().startCache(ccfg, F.<ClusterNode>alwaysTrue())); + futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue())); return null; } @@ -109,7 +109,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount())); - futs.add(kernal.context().cache().startCache(ccfg, F.<ClusterNode>alwaysTrue())); + futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue())); return null; }