http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 52c71c1,de180ee..9dfb356 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@@ -184,8 -180,11 +183,11 @@@ public class GridCacheContext<K, V> imp /** Cache weak query iterator holder. */ private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder; + /** Affinity node. */ + private boolean affNode; + /** Conflict resolver. */ - private GridCacheVersionAbstractConflictResolver conflictRslvr; + private CacheVersionConflictResolver conflictRslvr; /** */ private CacheObjectContext cacheObjCtx;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 407b8d5,c918ed4..598130a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -924,10 -895,574 +895,577 @@@ public class GridCacheProcessor extend } /** - * Creates shared context. - * - * @param kernalCtx Kernal context. - * @return Shared context. + * Runs the {@code onKernalStart()} callbacks for the given cache. When near cache is enabled, the DHT cache behind it is handled first: the managers returned by {@code dhtManagers()} and then the DHT cache itself. After that the cache's own managers (except those in {@code dhtExcludes()}) and the cache are called, and {@code EVT_CACHE_STARTED} is added if that event is recordable. + * <p> + * NOTE(review): unlike {@code onKernalStop()}, the DHT cache is not null-checked here - confirm it is always set for near-enabled caches. + * + * @param cache Cache. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { + GridCacheContext<?, ?> ctx = cache.context(); + + // Start DHT cache as well. + if (isNearEnabled(ctx)) { + GridDhtCacheAdapter dht = ctx.near().dht(); + + GridCacheContext<?, ?> dhtCtx = dht.context(); + + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.onKernalStart(); + + dht.onKernalStart(); + + if (log.isDebugEnabled()) + log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name()); + } + + for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx)))) + mgr.onKernalStart(); + + cache.onKernalStart(); + + if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED)) + ctx.events().addEvent(EventType.EVT_CACHE_STARTED); + + if (log.isDebugEnabled()) + log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" + + cache.configuration().getCacheMode() + ']'); + } + + /** + * Runs the {@code onKernalStop()} callbacks for the given cache: the DHT cache first (when near cache is enabled and a DHT cache is present), then the cache's own managers in reverse order, then the cache itself. {@code EVT_CACHE_STOPPED} is added afterwards if that event is recordable. + * + * @param cache Cache to stop. + * @param cancel Cancel flag. 
+ */ + @SuppressWarnings("unchecked") + private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) { + GridCacheContext ctx = cache.context(); + + if (isNearEnabled(ctx)) { + GridDhtCacheAdapter dht = ctx.near().dht(); + + if (dht != null) { + GridCacheContext<?, ?> dhtCtx = dht.context(); + + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.onKernalStop(cancel); + + dht.onKernalStop(); + } + } + + List<GridCacheManager> mgrs = ctx.managers(); + + Collection<GridCacheManager> excludes = dhtExcludes(ctx); + + // Reverse order. + for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) { + GridCacheManager mgr = it.previous(); + + if (!excludes.contains(mgr)) + mgr.onKernalStop(cancel); + } + + cache.onKernalStop(); + + if (ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED)) + ctx.events().addEvent(EventType.EVT_CACHE_STOPPED); + } + + /** + * Creates the cache and its context for the given configuration. For a non-local cache with near cache enabled, a second context and a DHT cache are created as well and linked with the near cache. + * + * @param cfg Cache configuration to use to create cache. + * @param cacheObjCtx Cache object context set on every created cache context. + * @return Cache context created first; with near cache enabled this is the near cache context, not the DHT one. + * @throws IgniteCheckedException If failed to create cache. + */ + @SuppressWarnings({"unchecked"}) + private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { + assert cfg != null; + + CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; + + validate(ctx.config(), cfg, cfgStore); + + CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null); + + jta.createTmLookup(cfg); + + // Skip suggestions for system caches.
+ if (!sysCaches.contains(maskNull(cfg.getName()))) + suggestOptimizations(cfg, cfgStore != null); + + Collection<Object> toPrepare = new ArrayList<>(); + + toPrepare.add(jta.tmLookup()); + + if (cfgStore instanceof GridCacheLoaderWriterStore) { + toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader()); + toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer()); + } + else + toPrepare.add(cfgStore); + + prepare(cfg, toPrepare); + + U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore)); + + GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); + GridCacheEventManager evtMgr = new GridCacheEventManager(); + GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg)); + GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); + GridCacheQueryManager qryMgr = queryManager(cfg); + CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); + CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); + GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); + GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); ++ CacheConflictManager rslvrMgr = ctx.createComponent(CacheConflictManager.class); + + GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg); + + GridCacheContext<?, ?> cacheCtx = new GridCacheContext( + ctx, + sharedCtx, + cfg, + ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + + /* + * Managers in starting order!
+ * =========================== + */ + evtMgr, + swapMgr, + storeMgr, + evictMgr, + qryMgr, + contQryMgr, + affMgr, + dataStructuresMgr, + ttlMgr, + drMgr, - jta); ++ jta, ++ rslvrMgr); + + cacheCtx.cacheObjectContext(cacheObjCtx); + + GridCacheAdapter cache = null; + + switch (cfg.getCacheMode()) { + case LOCAL: { + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + cache = new GridLocalCache(cacheCtx); + + break; + } + case ATOMIC: { + cache = new GridLocalAtomicCache(cacheCtx); + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + + break; + } + case PARTITIONED: + case REPLICATED: { + if (GridCacheUtils.isNearEnabled(cfg)) { + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + cache = new GridNearTransactionalCache(cacheCtx); + + break; + } + case ATOMIC: { + cache = new GridNearAtomicCache(cacheCtx); + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + } + else { + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + cache = cacheCtx.affinityNode() ? + new GridDhtColocatedCache(cacheCtx) : + new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + break; + } + case ATOMIC: { + cache = cacheCtx.affinityNode() ? + new GridDhtAtomicCache(cacheCtx) : + new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + } + + break; + } + + default: { + assert false : "Invalid cache mode: " + cfg.getCacheMode(); + } + } + + cacheCtx.cache(cache); + + GridCacheContext<?, ?> ret = cacheCtx; + + /* + * Create DHT cache. + * ================ + */ + if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { + /* + * Specifically don't create the following managers + * here and reuse the one from Near cache: + * 1. GridCacheVersionManager + * 2. GridCacheIoManager + * 3. 
GridCacheDeploymentManager + * 4. GridCacheQueryManager (note, that we start it for DHT cache though). + * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). + * 6. GridCacheDgcManager + * 7. GridCacheTtlManager. + * =============================================== + */ + swapMgr = new GridCacheSwapManager(true); + evictMgr = new GridCacheEvictionManager(); + evtMgr = new GridCacheEventManager(); + drMgr = ctx.createComponent(GridCacheDrManager.class); + + cacheCtx = new GridCacheContext( + ctx, + sharedCtx, + cfg, + ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + + /* + * Managers in starting order! + * =========================== + */ + evtMgr, + swapMgr, + storeMgr, + evictMgr, + qryMgr, + contQryMgr, + affMgr, + dataStructuresMgr, + ttlMgr, + drMgr, - jta); ++ jta, ++ rslvrMgr); + + cacheCtx.cacheObjectContext(cacheObjCtx); + + GridDhtCacheAdapter dht = null; + + switch (cfg.getAtomicityMode()) { + case TRANSACTIONAL: { + assert cache instanceof GridNearTransactionalCache; + + GridNearTransactionalCache near = (GridNearTransactionalCache)cache; + + GridDhtCache dhtCache = cacheCtx.affinityNode() ? + new GridDhtCache(cacheCtx) : + new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + dhtCache.near(near); + + near.dht(dhtCache); + + dht = dhtCache; + + break; + } + case ATOMIC: { + assert cache instanceof GridNearAtomicCache; + + GridNearAtomicCache near = (GridNearAtomicCache)cache; + + GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ? + new GridDhtAtomicCache(cacheCtx) : + new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + + dhtCache.near(near); + + near.dht(dhtCache); + + dht = dhtCache; + + break; + } + + default: { + assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); + } + } + + cacheCtx.cache(dht); + } + + return ret; + } + + /** + * Gets a collection of currently started caches. + * + * @return Collection of started cache names. 
+ */ + public Collection<String> cacheNames() { + return F.viewReadOnly(registeredCaches.keySet(), + new IgniteClosure<String, String>() { + @Override public String apply(String s) { + return unmaskNull(s); + } + }); + } + + /** + * Gets cache mode. + * + * @param cacheName Cache name to check. + * @return Cache mode, or {@code null} if no cache descriptor is registered under the given name. + */ + public CacheMode cacheMode(String cacheName) { + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); + + return desc != null ? desc.cacheConfiguration().getCacheMode() : null; + } + + /** + * @param req Request to check. + * @return {@code True} if change request was registered to apply. + */ + @SuppressWarnings("IfMayBeConditional") + public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + + if (desc != null) { + if (desc.deploymentId().equals(req.deploymentId())) { + if (req.start()) + return !desc.cancelled(); + else + return desc.cancelled(); + } + + // If client requested cache start + if (req.initiatingNodeId() != null) + return true; + } + + return false; + } + + /** + * Prepares start of every cache from the given start requests and then of the registered caches that are statically configured but not configured locally. + * + * @param reqs Requests to start. + * @param topVer Topology version passed on to every prepared cache start. + * @throws IgniteCheckedException If failed to start cache. + */ + @SuppressWarnings("TypeMayBeWeakened") + public void prepareCachesStart( + Collection<DynamicCacheChangeRequest> reqs, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + for (DynamicCacheChangeRequest req : reqs) { + assert req.start(); + + prepareCacheStart( + req.startCacheConfiguration(), + req.nearCacheConfiguration(), + req.clientStartOnly(), + req.initiatingNodeId(), + req.deploymentId(), + topVer + ); + } + + // Start statically configured caches received from remote nodes during exchange.
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (desc.staticallyConfigured() && !desc.locallyConfigured()) { + if (desc.onStart()) { + prepareCacheStart( + desc.cacheConfiguration(), + null, + false, + null, + desc.deploymentId(), + topVer + ); + } + } + } + } + + /** + * Creates, registers and starts the cache locally when the local node passes the node filter (and the request is not client-only) or when the local node initiated the request. Does nothing if a cache context with the same cache ID already exists in the shared context. + * <p> + * NOTE(review): the node filter is applied without a null check - confirm the configuration always provides one. + * + * @param cfg Start configuration. + * @param nearCfg Near configuration. + * @param clientStartOnly Client only start request. + * @param initiatingNodeId Initiating node ID. + * @param deploymentId Deployment ID. + * @param topVer Topology version set as the start topology version of the created cache context. + * @throws IgniteCheckedException If failed. + */ + private void prepareCacheStart( + CacheConfiguration cfg, + NearCacheConfiguration nearCfg, + boolean clientStartOnly, + UUID initiatingNodeId, + IgniteUuid deploymentId, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + CacheConfiguration ccfg = new CacheConfiguration(cfg); + + IgnitePredicate nodeFilter = ccfg.getNodeFilter(); + + ClusterNode locNode = ctx.discovery().localNode(); + + boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode); + boolean clientNodeStart = locNode.id().equals(initiatingNodeId); + + if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) + return; + + if (affNodeStart || clientNodeStart) { + if (clientNodeStart && !affNodeStart) { + if (nearCfg != null) + ccfg.setNearConfiguration(nearCfg); + } + + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg); + + GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx); + + cacheCtx.startTopologyVersion(topVer); + + cacheCtx.dynamicDeploymentId(deploymentId); + + sharedCtx.addCacheContext(cacheCtx); + + caches.put(maskNull(cacheCtx.name()), cacheCtx.cache()); + + startCache(cacheCtx.cache()); + onKernalStart(cacheCtx.cache()); + } + } + + /** + * Blocks the gateway of the proxy registered for the cache being stopped, if there is one. + * + * @param req Stop request. + */ + public void blockGateway(DynamicCacheChangeRequest req) { + assert req.stop(); + + // Break the proxy before exchange future is done.
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); + + if (proxy != null) + proxy.gate().block(); + } + + /** + * Removes the proxy registered for the cache being stopped and, if there was one, notifies its gateway through {@code onStopped()}. + * + * @param req Request. + */ + private void stopGateway(DynamicCacheChangeRequest req) { + assert req.stop(); + + // Break the proxy before exchange future is done. + IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName())); + + if (proxy != null) + proxy.gate().onStopped(); + } + + /** + * Removes the cache from the {@code caches} map and its context from the shared context, then runs the kernal-stop callbacks and stops the cache, both with the cancel flag set. Does nothing if the cache is not in the map. + * + * @param req Stop request. + */ + public void prepareCacheStop(DynamicCacheChangeRequest req) { + assert req.stop(); + + GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); + + if (cache != null) { + GridCacheContext<?, ?> ctx = cache.context(); + + sharedCtx.removeCacheContext(ctx); + + assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req + + ", ctxDepId=" + ctx.dynamicDeploymentId() + ']'; + + onKernalStop(cache, true); + stopCache(cache, true); + } + } + + /** + * Callback invoked when first exchange future for dynamic cache is completed. + * + * @param topVer Completed topology version. + * @param reqs Change requests. + * @param err Error handed to the preloader's initial-exchange callback; change requests are processed only when it is {@code null}. 
+ */ + @SuppressWarnings("unchecked") + public void onExchangeDone( + AffinityTopologyVersion topVer, + Collection<DynamicCacheChangeRequest> reqs, + Throwable err + ) { + for (GridCacheAdapter<?, ?> cache : caches.values()) { + GridCacheContext<?, ?> cacheCtx = cache.context(); + + if (F.eq(cacheCtx.startTopologyVersion(), topVer)) { + cacheCtx.preloader().onInitialExchangeComplete(err); + + String masked = maskNull(cacheCtx.name()); + + jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); + } + } + + if (!F.isEmpty(reqs) && err == null) { + for (DynamicCacheChangeRequest req : reqs) { + String masked = maskNull(req.cacheName()); + + if (req.stop()) { + stopGateway(req); + + prepareCacheStop(req); + + DynamicCacheDescriptor desc = registeredCaches.get(masked); + + if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) + registeredCaches.remove(masked, desc); + } + + completeStartFuture(req); + } + } + } + + /** + * Completes the pending future stored under the request's cache name, but only when the future's deployment ID equals the request's and the request was initiated by the local node. + * + * @param req Request to complete future for. + */ + public void completeStartFuture(DynamicCacheChangeRequest req) { + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + + assert req.deploymentId() != null; + assert fut == null || fut.deploymentId != null; + + if (fut != null && fut.deploymentId().equals(req.deploymentId()) && + F.eq(req.initiatingNodeId(), ctx.localNodeId())) + fut.onDone(); + } + + /** + * Creates shared context. + * + * @param kernalCtx Kernal context. + * @return Shared context.
*/ @SuppressWarnings("unchecked") private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java index 05311db,0000000..a18eedd mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java @@@ -1,85 -1,0 +1,85 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; + +/** + * Cache version conflict resolver. + * <p> + * The default strategy lives in {@code resolve0()}, which is protected and non-final, so a subclass can override it. + */ +public class CacheVersionConflictResolver { + /** + * Resolve the conflict. Creates a new conflict context for the two entries, lets {@code resolve0()} record the decision in it and returns that context. + * + * @param oldEntry Old entry. + * @param newEntry New entry. + * @param atomicVerComparator Whether to use atomic version comparator. + * @return Conflict resolution context. + * @throws IgniteCheckedException If failed. + */ + public <K, V> GridCacheVersionConflictContext<K, V> resolve(GridCacheVersionedEntryEx<K, V> oldEntry, + GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) throws IgniteCheckedException { + GridCacheVersionConflictContext<K, V> ctx = new GridCacheVersionConflictContext<>(oldEntry, newEntry); + + resolve0(ctx, oldEntry, newEntry, atomicVerComparator); + + return ctx; + } + + /** + * Internal conflict resolution routine. + * <p> + * The new entry is used when the data center IDs of the entries differ or when the old entry has the start version. Otherwise, with the atomic comparator requested, the old entry is used unless the comparator ranks its version below the new one. Without it, the entry with the greater topology version is used; on equal topology versions the new entry is used only if its order is strictly greater. + * + * @param ctx Context. + * @param oldEntry Old entry. + * @param newEntry New entry. + * @param atomicVerComparator Whether to use atomic version comparator. + * @throws IgniteCheckedException If failed. + */ + protected <K, V> void resolve0(GridCacheVersionConflictContext<K, V> ctx, + GridCacheVersionedEntryEx<K, V> oldEntry, GridCacheVersionedEntryEx<K, V> newEntry, + boolean atomicVerComparator) throws IgniteCheckedException { + if (newEntry.dataCenterId() != oldEntry.dataCenterId()) + ctx.useNew(); + else { + if (oldEntry.isStartVersion()) + ctx.useNew(); + else { + if (atomicVerComparator) { + // Handle special case when version check using ATOMIC cache comparator is required.
- if (GridCacheMapEntry.ATOMIC_VER_COMPARATOR.compare(oldEntry.version(), newEntry.version()) >= 0) ++ if (GridCacheMapEntry.ATOMIC_VER_COMPARATOR.compare(oldEntry.version(), newEntry.version(), false) >= 0) + ctx.useOld(); + else + ctx.useNew(); + } + else { + long topVerDiff = newEntry.topologyVersion() - oldEntry.topologyVersion(); + + if (topVerDiff > 0) + ctx.useNew(); + else if (topVerDiff < 0) + ctx.useOld(); + else if (newEntry.order() > oldEntry.order()) + ctx.useNew(); + else + ctx.useOld(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ----------------------------------------------------------------------