http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0ecaf97..3236bb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -97,7 +97,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final AtomicReference<AffinityTopologyVersion> readyTopVer = new AtomicReference<>(AffinityTopologyVersion.NONE); - /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -150,8 +149,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else { DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; - if (customEvt.data() instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data(); + if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size()); @@ -554,7 +553,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * Partition refresh callback. */ void refreshPartitions() { - ClusterNode oldest = CU.oldest(cctx); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + + if (oldest == null) { + if (log.isDebugEnabled()) + log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']'); + + return; + } if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); @@ -564,7 +570,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { - rmts = CU.remoteNodes(cctx); + rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); @@ -641,7 +647,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + cctx.kernalContext().clientNode(), + cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { @@ -687,6 +695,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchId Exchange ID. * @param discoEvt Discovery event. + * @param reqs Cache change requests. * @return Exchange future. */ GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @@ -696,9 +705,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture old = exchFuts.addx( fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs)); - if (old != null) + if (old != null) { fut = old; + if (reqs != null) + fut.cacheChangeRequests(reqs); + } + if (discoEvt != null) fut.onEvent(exchId, discoEvt); @@ -827,7 +840,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param node Node ID. * @param msg Message. */ - private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSingleMessage msg) { + private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { if (!enterBusy()) return; @@ -858,8 +871,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (updated) scheduleResendPartitions(); } - else - exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); + else { + if (msg.client()) { + final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), + null, + null); + + exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + // Finished future should reply only to sender client node. + exchFut.onReceive(node.id(), msg); + } + }); + } + else + exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); + } } finally { leaveBusy(); @@ -982,7 +1009,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana busy = true; - Map<Integer, GridDhtPreloaderAssignments<K, V>> assignsMap = new HashMap<>(); + Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; boolean dummyReassign = exchFut.dummyReassign(); boolean forcePreload = exchFut.forcePreload(); @@ -1017,7 +1044,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana changed |= cacheCtx.topology().afterExchange(exchFut); // Preload event notification. - if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) { + if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) { if (!cacheCtx.isReplicated() || !startEvtFired) { DiscoveryEvent discoEvt = exchFut.discoveryEvent(); @@ -1043,16 +1070,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - long delay = cacheCtx.config().getRebalanceDelay(); + if (!exchFut.skipPreload()) { + assignsMap = new HashMap<>(); - GridDhtPreloaderAssignments<K, V> assigns = null; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + long delay = cacheCtx.config().getRebalanceDelay(); + + GridDhtPreloaderAssignments assigns = null; - // Don't delay for dummy reassigns to avoid infinite recursion. - if (delay == 0 || forcePreload) - assigns = cacheCtx.preloader().assign(exchFut); + // Don't delay for dummy reassigns to avoid infinite recursion. + if (delay == 0 || forcePreload) + assigns = cacheCtx.preloader().assign(exchFut); - assignsMap.put(cacheCtx.cacheId(), assigns); + assignsMap.put(cacheCtx.cacheId(), assigns); + } } } finally { @@ -1061,7 +1092,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (assignsMap != null) { - for (Map.Entry<Integer, GridDhtPreloaderAssignments<K, V>> e : assignsMap.entrySet()) { + for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { int cacheId = e.getKey(); GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); @@ -1113,20 +1144,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** {@inheritDoc} */ @Override public void onTimeout() { - if (!busyLock.readLock().tryLock()) - return; + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + if (!busyLock.readLock().tryLock()) + return; - try { - if (started.compareAndSet(false, true)) - refreshPartitions(); - } - finally { - busyLock.readLock().unlock(); + try { + if (started.compareAndSet(false, true)) + refreshPartitions(); + } + finally { + busyLock.readLock().unlock(); - cctx.time().removeTimeoutObject(this); + cctx.time().removeTimeoutObject(ResendTimeoutObject.this); - pendingResend.compareAndSet(this, null); - } + pendingResend.compareAndSet(ResendTimeoutObject.this, null); + } + } + }); } /**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 2e181f9..e0f6181 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -30,7 +30,7 @@ import java.util.*; * Cache preloader that is responsible for loading cache entries either from remote * nodes (for distributed cache) or anywhere else at cache startup. */ -public interface GridCachePreloader<K, V> { +public interface GridCachePreloader { /** * Starts preloading. * @@ -78,7 +78,7 @@ public interface GridCachePreloader<K, V> { * @param exchFut Exchange future to assign. * @return Assignments. */ - public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut); + public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut); /** * Adds assignments to preloader. @@ -86,7 +86,7 @@ public interface GridCachePreloader<K, V> { * @param assignments Assignments to add. * @param forcePreload Force preload flag. */ - public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload); + public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload); /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 80d3d6b..b4f386f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -31,9 +31,9 @@ import java.util.*; /** * Adapter for preloading which always assumes that preloading finished. */ -public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> { +public class GridCachePreloaderAdapter implements GridCachePreloader { /** Cache context. */ - protected final GridCacheContext<K, V> cctx; + protected final GridCacheContext<?, ?> cctx; /** Logger.*/ protected final IgniteLogger log; @@ -50,7 +50,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> /** * @param cctx Cache context. */ - public GridCachePreloaderAdapter(GridCacheContext<K, V> cctx) { + public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) { assert cctx != null; this.cctx = cctx; @@ -126,17 +126,18 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> // No-op. } + /** {@inheritDoc} */ @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { // No-op. } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return null; } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) { + @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { // No-op. } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 7c2dfe9..33b25c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.datastructures.*; @@ -47,7 +48,6 @@ import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -153,7 +153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg.setMemoryMode(DFLT_MEMORY_MODE); if (cfg.getNodeFilter() == null) - cfg.setNodeFilter(CacheConfiguration.SERVER_NODES); + cfg.setNodeFilter(CacheConfiguration.ALL_NODES); if (cfg.getAffinity() == null) { if (cfg.getCacheMode() == PARTITIONED) { @@ -265,7 +265,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Suppress warning if at least one ATOMIC cache found. perf.add("Enable ATOMIC mode if not using transactions (set 'atomicityMode' to ATOMIC)", - cfg.getAtomicityMode() == ATOMIC); + cfg.getAtomicityMode() == ATOMIC); // Suppress warning if at least one non-FULL_SYNC mode found. perf.add("Disable fully synchronous writes (set 'writeSynchronizationMode' to PRIMARY_SYNC or FULL_ASYNC)", @@ -425,7 +425,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cc.getAtomicityMode() == ATOMIC) assertParameter(cc.getTransactionManagerLookupClassName() == null, - "transaction manager can not be used with ATOMIC cache"); + "transaction manager can not be used with ATOMIC cache"); } /** @@ -541,10 +541,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); - ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() { - @Override public void apply(Serializable evt) { - if (evt instanceof DynamicCacheChangeBatch) - onCacheChangeRequested((DynamicCacheChangeBatch)evt); + ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, + new CustomEventListener<DynamicCacheChangeBatch>() { + @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) { + onCacheChangeRequested(msg); } }); @@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx); + sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, + ctx.config().getCacheStoreSessionListenerFactories())); ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", !ctx.config().getTransactionConfiguration().isTxSerializableEnabled()); @@ -622,9 +623,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().setCacheFilter( cfg.getName(), cfg.getNodeFilter(), - cfg.getNearConfiguration() != null, + cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, cfg.getCacheMode() == LOCAL); + ctx.discovery().addClientNode(cfg.getName(), + ctx.localNodeId(), + cfg.getNearConfiguration() != null); + if (!cacheType.userCache()) stopSeq.addLast(cfg.getName()); else @@ -669,6 +674,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { for (ClusterNode n : ctx.discovery().remoteNodes()) { + if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) + continue; + checkTransactionConfiguration(n); DeploymentMode locDepMode = ctx.config().getDeploymentMode(); @@ -683,7 +691,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (rmtCfg != null) { CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n, desc); // Check plugin cache configurations. CachePluginManager pluginMgr = desc.pluginManager(); @@ -706,12 +714,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { + boolean loc = desc.locallyConfigured(); + + if (loc || CU.affinityNode(locNode, filter)) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); CachePluginManager pluginMgr = desc.pluginManager(); - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache( + ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); ctx.dynamicDeploymentId(desc.deploymentId()); @@ -754,8 +765,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { int order = cfg.getRebalanceOrder(); if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) { - GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts - .get(order); + GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order); if (fut == null) { fut = new GridCompoundFuture<>(); @@ -776,20 +786,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Wait for caches in SYNC preload mode. for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { - GridCacheAdapter<?, ?> cache = caches.get(maskNull(cfg.getName())); + GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); - if (cache == null) - continue; + if (cache != null) { + if (cfg.getRebalanceMode() == SYNC) { + if (cfg.getCacheMode() == REPLICATED || + (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) { + cache.preloader().syncFuture().get(); - if (cfg.getRebalanceMode() == SYNC) { - if (cfg.getCacheMode() == REPLICATED || - (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) - cache.preloader().syncFuture().get(); + if (CU.isUtilityCache(cache.name())) + ctx.cacheObjects().onUtilityCacheStarted(); + } + } } - - if (CU.isUtilityCache(cache.name())) - ctx.cacheObjects().onUtilityCacheStarted(); } + + assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started"; + assert caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; } /** {@inheritDoc} */ @@ -816,6 +829,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel); } + CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners()); + sharedCtx.cleanup(); if (log.isDebugEnabled()) @@ -1051,7 +1066,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, @Nullable CachePluginManager pluginMgr, CacheType cacheType, - CacheObjectContext cacheObjCtx) + CacheObjectContext cacheObjCtx, + boolean updatesAllowed) throws IgniteCheckedException { assert cfg != null; @@ -1109,6 +1125,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg, cacheType, ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + updatesAllowed, /* * Managers in starting order! @@ -1238,6 +1255,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg, cacheType, ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + true, /* * Managers in starting order! @@ -1427,7 +1445,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ClusterNode locNode = ctx.discovery().localNode(); - boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode); + boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter); boolean clientNodeStart = locNode.id().equals(initiatingNodeId); if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) @@ -1441,7 +1459,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx); + GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true); cacheCtx.startTopologyVersion(topVer); @@ -1566,10 +1584,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Creates shared context. * * @param kernalCtx Kernal context. + * @param storeSesLsnrs Store session listeners. * @return Shared context. */ @SuppressWarnings("unchecked") - private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) { + private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, + Collection<CacheStoreSessionListener> storeSesLsnrs) { IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); GridCacheVersionManager verMgr = new GridCacheVersionManager(); @@ -1584,7 +1604,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mvccMgr, depMgr, exchMgr, - ioMgr + ioMgr, + storeSesLsnrs ); } @@ -1871,7 +1892,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Check if we were asked to start a near cache. if (nearCfg != null) { - if (descCfg.getNodeFilter().apply(ctx.discovery().localNode())) { + if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) { // If we are on a data node and near cache was enabled, return success, else - fail. if (descCfg.getNearConfiguration() != null) return new GridFinishedFuture<>(); @@ -1918,7 +1939,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(new CacheExistsException("Failed to start near cache " + "(a cache with the given name is not started): " + cacheName)); - if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) { + if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) { if (ccfg.getNearConfiguration() != null) return new GridFinishedFuture<>(); else @@ -2206,11 +2227,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Checks that remote caches has configuration compatible with the local. * + * @param locCfg Local configuration. + * @param rmtCfg Remote configuration. * @param rmtNode Remote node. + * @param desc Cache descriptor. * @throws IgniteCheckedException If check failed. */ - private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) - throws IgniteCheckedException { + private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode, + DynamicCacheDescriptor desc) throws IgniteCheckedException { ClusterNode locNode = ctx.discovery().localNode(); UUID rmt = rmtNode.id(); @@ -2218,6 +2242,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg); GridCacheAttributes locAttr = new GridCacheAttributes(locCfg); + boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter()); + boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter()); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", locAttr.cacheMode(), rmtAttr.cacheMode(), true); @@ -2231,8 +2258,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true); - if (locCfg.getAtomicityMode() == TRANSACTIONAL || - (rmtCfg.getNodeFilter().apply(rmtNode) && locCfg.getNodeFilter().apply(locNode))) + boolean checkStore; + + if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL) { + checkStore = locAttr.storeFactoryClassName() != null; + + if (locAttr.storeFactoryClassName() == null && rmtAttr.storeFactoryClassName() != null) + desc.updatesAllowed(false); + } + else + checkStore = isLocAff && isRmtAff; + + if (checkStore) CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory", locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true); @@ -2551,7 +2588,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ - public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException { + public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException { return publicJCache(cacheName, true); } @@ -2565,7 +2602,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Nullable public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted) + @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -2573,7 +2610,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { String masked = maskNull(cacheName); - IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(masked); + IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2583,7 +2620,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache == null) cache = startJCache(cacheName, failIfNotStarted); - return cache; + return (IgniteCacheProxy<K, V>)cache; } /** @@ -2593,7 +2630,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ - private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { + private IgniteCacheProxy startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { String masked = maskNull(cacheName); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2623,7 +2660,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { F.first(initiateCacheChanges(F.asList(req))).get(); - IgniteCache cache = jCacheProxies.get(masked); + IgniteCacheProxy cache = jCacheProxies.get(masked); if (cache == null && failIfNotStarted) throw new IllegalArgumentException("Cache is not started: " + cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 55d2f84..63ba242 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -329,7 +329,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Nullable @Override public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException { + @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { @@ -341,6 +341,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getAllOutTxAsync(keys); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public boolean isIgfsDataCache() { CacheOperationContext prev = gate.enter(opCtx); @@ -741,6 +753,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public Set<K> keySetx() { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.keySetx(); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 294c2b0..1071ef2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.timeout.*; @@ -76,6 +78,9 @@ public class GridCacheSharedContext<K, V> { /** Preloaders start future. */ private IgniteInternalFuture<Object> preloadersStartFut; + /** Store session listeners. */ + private Collection<CacheStoreSessionListener> storeSesLsnrs; + /** * @param txMgr Transaction manager. * @param verMgr Version manager. @@ -88,7 +93,8 @@ public class GridCacheSharedContext<K, V> { GridCacheMvccManager mvccMgr, GridCacheDeploymentManager<K, V> depMgr, GridCachePartitionExchangeManager<K, V> exchMgr, - GridCacheIoManager ioMgr + GridCacheIoManager ioMgr, + Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; this.mvccMgr = add(mvccMgr); @@ -97,6 +103,7 @@ public class GridCacheSharedContext<K, V> { this.depMgr = add(depMgr); this.exchMgr = add(exchMgr); this.ioMgr = add(ioMgr); + this.storeSesLsnrs = storeSesLsnrs; txMetrics = new TransactionMetricsAdapter(); @@ -427,27 +434,38 @@ public class GridCacheSharedContext<K, V> { * @param tx Transaction to check. * @param activeCacheIds Active cache IDs. * @param cacheCtx Cache context. - * @return {@code True} if cross-cache transaction can include this new cache. + * @return Error message if transactions are incompatible. */ - public boolean txCompatible(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, GridCacheContext<K, V> cacheCtx) { - if (cacheCtx.systemTx() ^ tx.system()) - return false; + @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, + GridCacheContext<K, V> cacheCtx) { + if (cacheCtx.systemTx() && !tx.system()) + return "system cache can be enlisted only in system transaction"; + + if (!cacheCtx.systemTx() && tx.system()) + return "non-system cache can't be enlisted in system transaction"; for (Integer cacheId : activeCacheIds) { GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId); - // System transactions may sap only one cache. if (cacheCtx.systemTx()) { if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) - return false; + return "system transaction can include only one cache"; } - // Check that caches have the same store. - if (activeCacheCtx.store().store() != cacheCtx.store().store()) - return false; + CacheStoreManager store = cacheCtx.store(); + CacheStoreManager activeStore = activeCacheCtx.store(); + + if (store.isLocal() != activeStore.isLocal()) + return "caches with local and non-local stores can't be enlisted in one transaction"; + + if (store.isWriteBehind() != activeStore.isWriteBehind()) + return "caches with different write-behind setting can't be enlisted in one transaction"; + + // If local and write-behind validations passed, this must be true. + assert store.isWriteToStoreFromDht() == activeStore.isWriteToStoreFromDht(); } - return true; + return null; } /** @@ -499,6 +517,7 @@ public class GridCacheSharedContext<K, V> { /** * @param tx Transaction to rollback. * @throws IgniteCheckedException If failed. + * @return Rollback future. */ public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException { Collection<Integer> cacheIds = tx.activeCacheIds(); @@ -512,6 +531,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Store session listeners. + */ + @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() { + return storeSesLsnrs; + } + + /** * @param mgr Manager to add. * @return Added manager. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index eb82218..772e849 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -121,6 +121,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { warnFirstEvict(); writeToSwap(part, cctx.toCacheKeyObject(kb), vb); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapEvict(); } catch (IgniteCheckedException e) { log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e); @@ -395,8 +398,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Reconstituted swap entry or {@code null} if entry is obsolete. * @throws IgniteCheckedException If failed. */ - @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException - { + @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException { assert e != null; checkIteratorQueue(); @@ -425,9 +427,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); // First check off-heap store. - if (offheapEnabled) - if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()))) + if (offheapEnabled) { + boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRead(contains); + + if (contains) return true; + } if (swapEnabled) { assert key != null; @@ -436,6 +444,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())), cctx.deploy().globalLoader()); + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRead(valBytes != null); + return valBytes != null; } @@ -444,7 +455,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param key Key to read. - * @param keyBytes Key bytes. * @param part Key partition. * @param entryLocked {@code True} if cache entry is locked. * @param readOffheap Read offheap flag. @@ -481,6 +491,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (readOffheap && offheapEnabled) { byte[] bytes = offheap.get(spaceName, part, key, keyBytes); + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRead(bytes != null); + if (bytes != null) return swapEntry(unmarshalSwapEntry(bytes)); } @@ -524,6 +537,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + if (cctx.config().isStatisticsEnabled()) { + if (entryBytes != null) + cctx.cache().metrics0().onOffHeapRemove(); + + cctx.cache().metrics0().onOffHeapRead(entryBytes != null); + } + if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -567,8 +587,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Value from swap or {@code null}. * @throws IgniteCheckedException If failed. */ - @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, - final int part) + @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, final int part) throws IgniteCheckedException { if (!swapEnabled) return null; @@ -582,6 +601,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() { @Override public void apply(byte[] rmv) { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRead(rmv != null); + if (rmv != null) { try { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); @@ -611,6 +633,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { null); } + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + // Always fire this event, since preloading depends on it. onUnswapped(part, key, entry); @@ -649,12 +674,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!offheapEnabled && !swapEnabled) return null; - return read(entry.key(), - entry.key().valueBytes(cctx.cacheObjectContext()), - entry.partition(), - locked, - readOffheap, - readSwap); + return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked, + readOffheap, readSwap); } /** @@ -730,6 +751,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { final GridCacheQueryManager qryMgr = cctx.queries(); Collection<SwapKey> unprocessedKeys = null; + final Collection<GridCacheBatchSwapEntry> res = new ArrayList<>(keys.size()); // First try removing from offheap. @@ -737,8 +759,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { for (KeyCacheObject key : keys) { int part = cctx.affinity().partition(key); - byte[] entryBytes = - offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(entryBytes != null && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -848,6 +872,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { null); } + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + // Always fire this event, since preloading depends on it. onUnswapped(swapKey.partition(), key, entry); @@ -880,7 +907,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + + return rmv; } /** @@ -925,6 +957,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return; try { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); if (entry == null) @@ -942,11 +977,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try offheap. if (offheapEnabled) { - byte[] val = offheap.remove(spaceName, - part, - key.value(cctx.cacheObjectContext(), false), + byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false), key.valueBytes(cctx.cacheObjectContext())); + if(val != null && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + if (val != null) { if (c != null) c.apply(val); // Probably we should read value and apply closure before removing... @@ -1007,6 +1043,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal()); + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapWrite(); + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null); @@ -1035,11 +1074,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { for (GridCacheBatchSwapEntry swapEntry : swapped) { - offheap.put(spaceName, - swapEntry.partition(), - swapEntry.key(), - swapEntry.key().valueBytes(cctx.cacheObjectContext()), - swapEntry.marshal()); + offheap.put(spaceName, swapEntry.partition(), swapEntry.key(), + swapEntry.key().valueBytes(cctx.cacheObjectContext()), swapEntry.marshal()); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapWrite(); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(), @@ -1071,6 +1110,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { qryMgr.onSwap(batchSwapEntry.key()); } } + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapWrite(batch.size()); } } @@ -1082,17 +1124,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param entry Entry bytes. * @throws IgniteCheckedException If failed. */ - private void writeToSwap(int part, - KeyCacheObject key, - byte[] entry) - throws IgniteCheckedException - { + private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException { checkIteratorQueue(); swapMgr.write(spaceName, new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())), - entry, - cctx.deploy().globalLoader()); + entry, cctx.deploy().globalLoader()); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapWrite(); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid) null, null, @@ -1274,7 +1314,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); } else it.removeX(); @@ -1432,6 +1475,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return it.hasNext(); } + @SuppressWarnings("unchecked") @Override protected void onRemove() throws IgniteCheckedException { if (cur == null) throw new IllegalStateException("Method next() has not yet been called, or the remove() method " + @@ -1616,7 +1660,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); } @Override protected void onClose() throws IgniteCheckedException { @@ -1646,7 +1693,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 5f9049a..9bd6321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl()) + boolean cleanupDisabled = cctx.kernalContext().isDaemon() || + !cctx.config().isEagerTtl() || + CU.isAtomicsCache(cctx.name()) || + CU.isMarshallerCache(cctx.name()) || + CU.isUtilityCache(cctx.name()) || + (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null); + + if (cleanupDisabled) return; cleanupWorker = new CleanupWorker(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 549f42f..3bd2a45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; import org.apache.ignite.plugin.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.*; +import javax.cache.configuration.*; import javax.cache.expiry.*; import javax.cache.integration.*; import java.io.*; @@ -114,13 +117,6 @@ public class GridCacheUtils { } }; - /** Not evicted partitions. */ - private static final IgnitePredicate PART_NOT_EVICTED = new P1<GridDhtLocalPartition>() { - @Override public boolean apply(GridDhtLocalPartition p) { - return p.state() != GridDhtPartitionState.EVICTED; - } - }; - /** */ private static final IgniteClosure<Integer, GridCacheVersion[]> VER_ARR_FACTORY = new C1<Integer, GridCacheVersion[]>() { @@ -398,30 +394,11 @@ public class GridCacheUtils { * @return Partition to state transformer. */ @SuppressWarnings({"unchecked"}) - public static <K, V> IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() { + public static IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() { return PART2STATE; } /** - * @return Not evicted partitions. - */ - @SuppressWarnings( {"unchecked"}) - public static <K, V> IgnitePredicate<GridDhtLocalPartition> notEvicted() { - return PART_NOT_EVICTED; - } - - /** - * Gets all nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @return All nodes on which cache with the same name is started (including nodes - * that may have already left). - */ - public static Collection<ClusterNode> allNodes(GridCacheContext ctx) { - return allNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** * Gets all nodes on which cache with the same name is started. * * @param ctx Cache context. @@ -446,59 +423,6 @@ public class GridCacheUtils { } /** - * Gets alive nodes. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveCacheNodes(ctx.namex(), topOrder); - } - - /** - * Gets remote nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @return Remote nodes on which cache with the same name is started. - */ - public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx) { - return remoteNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets remote node with at least one cache configured. - * - * @param ctx Shared cache context. - * @return Collection of nodes with at least one cache configured. - */ - public static Collection<ClusterNode> remoteNodes(GridCacheSharedContext ctx) { - return remoteNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets remote nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Remote nodes on which cache with the same name is started. - */ - public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().remoteCacheNodes(ctx.namex(), topOrder); - } - - /** - * Gets alive nodes. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveRemoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteCacheNodes(ctx.namex(), topOrder); - } - - /** * Gets remote nodes with at least one cache configured. * * @param ctx Cache shared context. @@ -510,25 +434,15 @@ public class GridCacheUtils { } /** - * Gets alive nodes with at least one cache configured. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveNodesWithCaches(topOrder); - } - - /** * Gets alive remote nodes with at least one cache configured. * * @param ctx Cache context. * @param topOrder Maximum allowed node order. * @return Affinity nodes. */ - public static Collection<ClusterNode> aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteNodesWithCaches(topOrder); + public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx, + AffinityTopologyVersion topOrder) { + return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder); } /** @@ -577,90 +491,34 @@ public class GridCacheUtils { } /** - * Checks if given node has specified cache started. - * - * @param cacheName Cache name. - * @param node Node to check. - * @return {@code True} if given node has specified cache started. - */ - public static boolean cacheNode(String cacheName, ClusterNode node) { - return cacheNode(cacheName, (GridCacheAttributes[])node.attribute(ATTR_CACHE)); - } - - /** - * Checks if given attributes relate the the node which has (or had) specified cache started. - * - * @param cacheName Cache name. - * @param caches Node cache attributes. - * @return {@code True} if given node has specified cache started. - */ - public static boolean cacheNode(String cacheName, GridCacheAttributes[] caches) { - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return true; - - return false; - } - - /** - * Gets oldest alive node for specified topology version. - * - * @param cctx Cache context. - * @return Oldest node for the current topology version. - */ - public static ClusterNode oldest(GridCacheContext cctx) { - return oldest(cctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets oldest alive node across nodes with at least one cache configured. - * - * @param ctx Cache context. - * @return Oldest node. - */ - public static ClusterNode oldest(GridCacheSharedContext ctx) { - return oldest(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets oldest alive node for specified topology version. + * Gets oldest alive server node with at least one cache configured for specified topology version. * - * @param cctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Oldest node for the given topology version. + * @param ctx Context. + * @param topVer Maximum allowed topology version. + * @return Oldest alive cache server node. */ - public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) { - ClusterNode oldest = null; + @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx, + AffinityTopologyVersion topVer) { + Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer); - for (ClusterNode n : aliveNodes(cctx, topOrder)) - if (oldest == null || n.order() < oldest.order()) - oldest = n; - - assert oldest != null : "Failed to find oldest node for cache context [name=" + cctx.name() + ", topOrder=" + topOrder + ']'; - assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder); + if (nodes.isEmpty()) + return null; - return oldest; + return oldest(nodes); } /** - * Gets oldest alive node with at least one cache configured for specified topology version. - * - * @param cctx Shared cache context. - * @param topOrder Maximum allowed node order. + * @param nodes Nodes. * @return Oldest node for the given topology version. */ - public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) { + @Nullable public static ClusterNode oldest(Collection<ClusterNode> nodes) { ClusterNode oldest = null; - for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) { + for (ClusterNode n : nodes) { if (oldest == null || n.order() < oldest.order()) oldest = n; } - assert oldest != null : "Failed to find oldest node with caches: " + topOrder; - assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder); - return oldest; } @@ -718,30 +576,6 @@ public class GridCacheUtils { } /** - * @return Closure that converts tx entry to key. - */ - @SuppressWarnings({"unchecked"}) - public static <K, V> IgniteClosure<IgniteTxEntry, K> tx2key() { - return (IgniteClosure<IgniteTxEntry, K>)tx2key; - } - - /** - * @return Closure that converts tx entry collection to key collection. - */ - @SuppressWarnings({"unchecked"}) - public static <K, V> IgniteClosure<Collection<IgniteTxEntry>, Collection<K>> txCol2Key() { - return (IgniteClosure<Collection<IgniteTxEntry>, Collection<K>>)txCol2key; - } - - /** - * @return Converts transaction entry to cache entry. - */ - @SuppressWarnings( {"unchecked"}) - public static <K, V> IgniteClosure<IgniteTxEntry, GridCacheEntryEx> tx2entry() { - return (IgniteClosure<IgniteTxEntry, GridCacheEntryEx>)tx2entry; - } - - /** * @return Closure which converts transaction entry xid to XID version. */ @SuppressWarnings( {"unchecked"}) @@ -1451,13 +1285,7 @@ public class GridCacheUtils { } /** - * @return Cache ID for utility cache. - */ - public static int utilityCacheId() { - return cacheId(UTILITY_CACHE_NAME); - } - - /** + * @param cacheName Cache name. * @return Cache ID. */ public static int cacheId(String cacheName) { @@ -1688,7 +1516,7 @@ public class GridCacheUtils { /** * @param aff Affinity. * @param n Node. - * @return Predicate that evaulates to {@code true} if entry is primary for node. + * @return Predicate that evaluates to {@code true} if entry is primary for node. */ public static CacheEntryPredicate cachePrimary( final Affinity aff, @@ -1790,4 +1618,76 @@ public class GridCacheUtils { return res; } + + /** + * @param node Node. + * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). + */ + public static boolean clientNode(ClusterNode node) { + Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + + assert clientModeAttr != null : node; + + return clientModeAttr != null && clientModeAttr; + } + + /** + * @param node Node. + * @param filter Node filter. + * @return {@code True} if node is not client node and pass given filter. + */ + public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) { + return !clientNode(node) && filter.apply(node); + } + + /** + * Creates and starts store session listeners. + * + * @param ctx Kernal context. + * @param factories Factories. + * @return Listeners. + * @throws IgniteCheckedException In case of error. + */ + public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx, + Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException { + if (factories == null) + return null; + + Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length); + + for (Factory<CacheStoreSessionListener> factory : factories) { + CacheStoreSessionListener lsnr = factory.create(); + + if (lsnr != null) { + ctx.resource().injectGeneric(lsnr); + + if (lsnr instanceof LifecycleAware) + ((LifecycleAware)lsnr).start(); + + lsnrs.add(lsnr); + } + } + + return lsnrs; + } + + /** + * Stops store session listeners. + * + * @param ctx Kernal context. + * @param sesLsnrs Session listeners. + * @throws IgniteCheckedException In case of error. + */ + public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs) + throws IgniteCheckedException { + if (sesLsnrs == null) + return; + + for (CacheStoreSessionListener lsnr : sesLsnrs) { + if (lsnr instanceof LifecycleAware) + ((LifecycleAware)lsnr).stop(); + + ctx.resource().cleanupGeneric(lsnr); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f840015..4390993 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -699,6 +699,29 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } + /** {@inheritDoc} */ + @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { + try { + CacheOperationContext prev = onEnter(opCtx); + + try { + if (isAsync()) { + setFuture(delegate.getAllOutTxAsync(keys)); + + return null; + } + else + return delegate.getAllOutTx(keys); + } + finally { + onLeave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + /** * @param keys Keys. * @return Values map. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 5184115..9972f92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -775,6 +775,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public Set<K> keySet(); /** + * @return Set of keys including internal keys. + */ + public Set<K> keySetx(); + + /** * Set of keys for which this node is primary. * This set is dynamic and may change with grid topology changes. * Note that this set will contain mappings for all keys, even if their values are @@ -1130,11 +1135,9 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<Boolean> removeAsync(K key, V val); /** - * Removes given key mappings from cache for entries for which the optionally passed in filters do - * pass. + * Removes given key mappings from cache. * <p> - * If write-through is enabled, the values will be removed from {@link CacheStore} - * via <code>@link CacheStore#removeAll(Transaction, Collection)</code> method. + * If write-through is enabled, the values will be removed from {@link CacheStore} via {@link IgniteDataStreamer}. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -1145,11 +1148,9 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public void removeAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException; /** - * Asynchronously removes given key mappings from cache for entries for which the optionally - * passed in filters do pass. + * Asynchronously removes given key mappings from cache for entries. * <p> - * If write-through is enabled, the values will be removed from {@link CacheStore} - * via <code>@link CacheStore#removeAll(Transaction, Collection)</code> method. + * If write-through is enabled, the values will be removed from {@link CacheStore} via {@link IgniteDataStreamer}. * <h2 class="header">Transactions</h2> * This method is transactional and will enlist the entry into ongoing transaction * if there is one. @@ -1161,20 +1162,13 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys); /** - * Removes mappings from cache for entries for which the optionally passed in filters do - * pass. If passed in filters are {@code null}, then all entries in cache will be enrolled - * into transaction. + * Removes mappings from cache. * <p> - * <b>USE WITH CARE</b> - if your cache has many entries that pass through the filter or if filter - * is empty, then transaction will quickly become very heavy and slow. Also, locks - * are acquired in undefined order, so it may cause a deadlock when used with - * other concurrent transactional updates. + * <b>USE WITH CARE</b> - if your cache has many entries then transaction will quickly become very heavy and slow. * <p> - * If write-through is enabled, the values will be removed from {@link CacheStore} - * via <code>@link CacheStore#removeAll(Transaction, Collection)</code> method. + * If write-through is enabled, the values will be removed from {@link CacheStore} via {@link IgniteDataStreamer}. * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. + * This method is not transactional. * * @throws IgniteCheckedException If remove failed. */ @@ -1618,7 +1612,16 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * @return Value. * @throws IgniteCheckedException If failed. */ - @Nullable public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException; + public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException; + + /** + * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries + * and will not lock any keys if pessimistic transaction is started by thread. + * + * @param keys Keys to get values for. + * @return Future for getAllOutTx operation. + */ + public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys); /** * Checks whether this cache is IGFS data cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 61ca882..e5fa891 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -23,7 +23,7 @@ import org.jetbrains.annotations.*; /** * */ -public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheObject, Comparable<KeyCacheObjectImpl> { +public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheObject { /** */ private static final long serialVersionUID = 0L; @@ -46,15 +46,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public int compareTo(KeyCacheObjectImpl other) { - assert val instanceof Comparable : val; - assert other.val instanceof Comparable : val; - - return ((Comparable)val).compareTo(other.val); - } - - /** {@inheritDoc} */ @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null) valBytes = ctx.processor().marshal(ctx, val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 0186a90..0790052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -84,9 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public int[] primaryPartitions(ClusterNode n) { A.notNull(n, "n"); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - - Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer); + Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion()); return U.toIntArray(parts); } @@ -95,9 +93,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { @Override public int[] backupPartitions(ClusterNode n) { A.notNull(n, "n"); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - - Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer); + Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topologyVersion()); return U.toIntArray(parts); } @@ -108,7 +104,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { Collection<Integer> parts = new HashSet<>(); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion topVer = topologyVersion(); for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index fa8d192..b5c5161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -218,7 +218,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } }, new QueueHeaderPredicate(), - cctx.isLocal() || cctx.isReplicated(), + cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()), true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index b79f9d5..bd72764 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -327,13 +327,6 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { } /** - * - */ - public void onUnlock() { - // No-op. - } - - /** * Unlocks local lock. * * @return Removed candidate, or <tt>null</tt> if thread still holds the lock. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index fded3c9..bd1dedf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable { /** {@code True} if mapping is for near caches, {@code false} otherwise. */ private boolean near; + /** {@code True} if this is first mapping for optimistic tx on client node. */ + private boolean clientFirst; + /** * Empty constructor required for {@link Externalizable}. */ @@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable { } /** + * @return {@code True} if this is first mapping for optimistic tx on client node. + */ + public boolean clientFirst() { + return clientFirst; + } + + /** + * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node. + */ + public void clientFirst(boolean clientFirst) { + this.clientFirst = clientFirst; + } + + /** * @return {@code True} if mapping is for near caches, {@code false} otherwise. */ public boolean near() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 331de4e..c3f3e7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -210,7 +210,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + + assert oldest != null; if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -218,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -665,7 +667,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -715,7 +717,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 303d649..7bae7f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -96,12 +96,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl /** * @param node Node. - * @param res Reponse. + * @param res Response. */ public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) { if (!res.topologyVersion().equals(topVer)) { if (log.isDebugEnabled()) - log.debug("Received affinity assignment for wrong topolgy version (will ignore) " + + log.debug("Received affinity assignment for wrong topology version (will ignore) " + "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']'); return;