http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 237371e,953d15a..06f2982 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@@ -129,9 -127,9 +128,9 @@@ public class GridCacheEvictionManager e /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - CacheConfiguration<K, V> cfg = cctx.config(); + CacheConfiguration cfg = cctx.config(); - plc = cctx.isNear() ? cfg.getNearEvictionPolicy() : cfg.getEvictionPolicy(); + plc = cctx.isNear() ? cfg.getNearConfiguration().getNearEvictionPolicy() : cfg.getEvictionPolicy(); memoryMode = cctx.config().getMemoryMode(); @@@ -499,8 -495,7 +496,8 @@@ if (!cctx.isNear()) { try { - GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, - GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, -1, false); ++ GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, + AffinityTopologyVersion.NONE, false); assert part != null; @@@ -527,8 -522,7 +524,8 @@@ if (!cctx.isNear()) { try { - GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, - GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, -1, false); ++ GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, + false); if (part != null && part.reserve()) { part.lock(); @@@ -564,8 -558,7 +561,8 @@@ if (!cctx.isNear()) { try { - GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, - GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, -1, false); ++ GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, + false); if (part != null) { part.unlock(); @@@ -742,7 -739,7 +743,7 @@@ * @param e Entry for eviction policy notification. * @param topVer Topology version. */ - public void touch(GridCacheEntryEx<K, V> e, AffinityTopologyVersion topVer) { - public void touch(GridCacheEntryEx e, long topVer) { ++ public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) { if (e.detached() || e.isInternal()) return; @@@ -1243,16 -1239,21 +1243,21 @@@ * @param info Eviction info. * @return Version aware filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] versionFilter(final EvictionInfo info) { + private CacheEntryPredicate[] versionFilter(final EvictionInfo info) { // If version has changed since we started the whole process // then we should not evict entry. - return cctx.vararg(new P1<Cache.Entry<K, V>>() { - @Override public boolean apply(Cache.Entry<K, V> e) { - GridCacheVersion ver = (GridCacheVersion)((CacheVersionedEntryImpl)e).version(); + return new CacheEntryPredicate[]{new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + try { + GridCacheVersion ver = e.version(); - return info.version().equals(ver) && F.isAll(info.filter()); + return info.version().equals(ver) && F.isAll(info.filter()); + } - catch (GridCacheEntryRemovedException err) { ++ catch (GridCacheEntryRemovedException ignored) { + return false; + } } - }); + }}; } /** @@@ -1267,8 -1268,8 +1272,8 @@@ * execution. */ @SuppressWarnings( {"IfMayBeConditional"}) - private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> remoteNodes(GridCacheEntryEx<K, V> entry, + private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> remoteNodes(GridCacheEntryEx entry, - long topVer) + AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { assert entry != null; @@@ -1567,23 -1567,9 +1572,9 @@@ private GridTimeoutObject timeoutObj; /** Topology version future is processed on. */ - private long topVer; + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; /** - * @param ctx Context. - */ - EvictionFuture(GridKernalContext ctx) { - super(ctx); - } - - /** - * Required by {@code Externalizable}. - */ - public EvictionFuture() { - assert false : "This should never happen."; - } - - /** * @return {@code True} if prepare lock was acquired. */ boolean prepareLock() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java index 1344495,17d9457..fba0771 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java @@@ -19,14 -19,10 +19,12 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.version.*; - import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; - import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@@ -44,15 -40,11 +42,11 @@@ public class GridCacheEvictionRequest e /** Entries to clear from near and backup nodes. */ @GridToStringInclude - @GridDirectTransient - private Collection<GridTuple3<K, GridCacheVersion, Boolean>> entries; - - /** Serialized entries. */ - @GridToStringExclude - private byte[] entriesBytes; + @GridDirectCollection(CacheEvictionEntry.class) + private Collection<CacheEvictionEntry> entries; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** * Required by {@link Externalizable}. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 70f184e,04b2745..dd6e741 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -977,8 -949,8 +950,8 @@@ public abstract class GridCacheMapEntr long ttl, boolean evt, boolean metrics, - long topVer, + AffinityTopologyVersion topVer, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, GridDrType drType, long drExpireTime, @Nullable GridCacheVersion explicitVer, @@@ -1115,8 -1109,8 +1110,8 @@@ boolean retval, boolean evt, boolean metrics, - long topVer, + AffinityTopologyVersion topVer, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, GridDrType drType, @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, @@@ -2726,10 -2776,10 +2777,10 @@@ } /** {@inheritDoc} */ - @Nullable @Override public V peek(boolean heap, + @Nullable @Override public CacheObject peek(boolean heap, boolean offheap, boolean swap, - long topVer, + AffinityTopologyVersion topVer, @Nullable IgniteCacheExpiryPolicy expiryPlc) throws GridCacheEntryRemovedException, IgniteCheckedException { @@@ -2958,11 -2997,11 +2998,10 @@@ * @throws IgniteCheckedException If unexpected cache failure occurred. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable private GridTuple<V> peekGlobal(boolean failFast, + @Nullable private GridTuple<CacheObject> peekGlobal(boolean failFast, - long topVer, + AffinityTopologyVersion topVer, - IgnitePredicate<Cache.Entry<K, V>>[] filter, - @Nullable IgniteCacheExpiryPolicy expiryPlc - ) + CacheEntryPredicate[] filter, - @Nullable IgniteCacheExpiryPolicy expiryPlc - ) ++ @Nullable IgniteCacheExpiryPolicy expiryPlc) throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException { if (!valid(topVer)) return null; @@@ -3148,12 -3188,9 +3188,9 @@@ long ttl, long expireTime, boolean preload, - long topVer, + AffinityTopologyVersion topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException { - if (cctx.isUnmarshalValues() && valBytes != null && val == null && isNewLocked()) - val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); - synchronized (this) { checkObsolete(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java index 73a0996,9437a84..1e56674 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java @@@ -35,6 -34,12 +35,12 @@@ public interface GridCacheMapEntryFacto * @param hdrId Header id. * @return New cache entry. */ - public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, K key, int hash, - V val, @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId); + public GridCacheMapEntry create(GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val, + @Nullable GridCacheMapEntry next, + long ttl, + int hdrId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java index a96bce5,f2f513b..c5f26cb --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java @@@ -71,10 -70,10 +71,10 @@@ public class GridCacheMvccCandidate imp /** Topology version. */ @SuppressWarnings( {"TransientFieldNotInitialized"}) @GridToStringInclude - private transient volatile long topVer = -1; + private transient volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** Linked reentry. */ - private GridCacheMvccCandidate<K> reentry; + private GridCacheMvccCandidate reentry; /** Previous lock for the thread. */ @GridToStringExclude http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c947592,57e6ccc..5725d6c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@@ -556,11 -554,12 +555,11 @@@ public class GridCacheMvccManager exten * * @return Remote candidates. */ - public Collection<GridCacheMvccCandidate<K>> remoteCandidates() { - Collection<GridCacheMvccCandidate<K>> rmtCands = new LinkedList<>(); + public Collection<GridCacheMvccCandidate> remoteCandidates() { + Collection<GridCacheMvccCandidate> rmtCands = new LinkedList<>(); - for (GridDistributedCacheEntry<K, V> entry : locked()) - for (GridDistributedCacheEntry entry : locked()) { ++ for (GridDistributedCacheEntry entry : locked()) rmtCands.addAll(entry.remoteMvccSnapshot()); - } return rmtCands; } @@@ -931,13 -940,13 +940,13 @@@ * @param topVer Topology version to wait for. * @return Explicit locks release future. */ - public IgniteInternalFuture<?> finishExplicitLocks(long topVer) { + public IgniteInternalFuture<?> finishExplicitLocks(AffinityTopologyVersion topVer) { - GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(cctx.kernalContext()); + GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); - for (GridCacheExplicitLockSpan<K> span : pendingExplicit.values()) { + for (GridCacheExplicitLockSpan span : pendingExplicit.values()) { GridDiscoveryTopologySnapshot snapshot = span.topologySnapshot(); - if (snapshot != null && snapshot.topologyVersion() < topVer) + if (snapshot != null && snapshot.topologyVersion() < topVer.topologyVersion()) res.add(span.releaseFuture()); } @@@ -951,13 -960,13 +960,13 @@@ * * @return Finish update future. */ - public IgniteInternalFuture<?> finishAtomicUpdates(long topVer) { + public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) { - GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(cctx.kernalContext()); + GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); - for (GridCacheAtomicFuture<K, ?> fut : atomicFuts.values()) { + for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) { - if (fut.waitForPartitionExchange() && fut.topologyVersion() < topVer) + if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0) res.add((IgniteInternalFuture<Object>)fut); } @@@ -972,7 -981,7 +981,7 @@@ * @return Future that signals when all locks for given keys are released. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> finishKeys(Collection<K> keys, AffinityTopologyVersion topVer) { - public IgniteInternalFuture<?> finishKeys(Collection<KeyCacheObject> keys, long topVer) { ++ public IgniteInternalFuture<?> finishKeys(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { if (!(keys instanceof Set)) keys = new HashSet<>(keys); @@@ -990,11 -999,11 +999,11 @@@ * @param topVer Topology version. * @return Future that signals when all locks for given partitions will be released. */ - private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<K> keyFilter, AffinityTopologyVersion topVer) { - private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<KeyCacheObject> keyFilter, long topVer) { - assert topVer != 0; ++ private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<KeyCacheObject> keyFilter, AffinityTopologyVersion topVer) { + assert topVer.topologyVersion() != 0; - if (topVer < 0) + if (topVer.equals(AffinityTopologyVersion.NONE)) - return new GridFinishedFuture(context().kernalContext()); + return new GridFinishedFuture(); final FinishLockFuture finishFut = new FinishLockFuture( keyFilter == null ? @@@ -1056,31 -1066,22 +1066,20 @@@ * @param topVer Topology version. * @param entries Entries. */ - FinishLockFuture(Iterable<GridDistributedCacheEntry<K, V>> entries, AffinityTopologyVersion topVer) { - super(cctx.kernalContext(), true); - - FinishLockFuture(Iterable<GridDistributedCacheEntry> entries, long topVer) { - assert topVer > 0; ++ FinishLockFuture(Iterable<GridDistributedCacheEntry> entries, AffinityTopologyVersion topVer) { + assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; this.topVer = topVer; - for (GridCacheEntryEx<K, V> entry : entries) { + for (GridCacheEntryEx entry : entries) { // Either local or near local candidates. try { - Collection<GridCacheMvccCandidate<K>> locs = entry.localCandidates(); + Collection<GridCacheMvccCandidate> locs = entry.localCandidates(); if (!F.isEmpty(locs)) { - Collection<GridCacheMvccCandidate<K>> cands = new ConcurrentLinkedQueue<>(); - Collection<GridCacheMvccCandidate> cands = - new ConcurrentLinkedQueue<>(); ++ Collection<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<>(); - if (locs != null) - cands.addAll(F.view(locs, versionFilter())); + cands.addAll(F.view(locs, versionFilter())); if (!F.isEmpty(cands)) pendingLocks.put(entry.txKey(), cands); @@@ -1099,11 -1100,11 +1098,11 @@@ /** * @return Filter. */ - private IgnitePredicate<GridCacheMvccCandidate<K>> versionFilter() { + private IgnitePredicate<GridCacheMvccCandidate> versionFilter() { - assert topVer > 0; + assert topVer.topologyVersion() > 0; - return new P1<GridCacheMvccCandidate<K>>() { - @Override public boolean apply(GridCacheMvccCandidate<K> c) { + return new P1<GridCacheMvccCandidate>() { + @Override public boolean apply(GridCacheMvccCandidate c) { assert c.nearLocal() || c.dhtLocal(); // Wait for explicit locks. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 571c1a7,b7e3473..a7b91f9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -82,13 -79,10 +82,13 @@@ public class GridCachePartitionExchange /** */ @GridToStringExclude - private final ConcurrentMap<Integer, GridClientPartitionTopology<K, V>> clientTops = new ConcurrentHashMap8<>(); + private final ConcurrentMap<Integer, GridClientPartitionTopology> clientTops = new ConcurrentHashMap8<>(); + /** Minor topology version incremented each time a new dynamic cache is started. */ + private volatile int minorTopVer; + /** */ - private volatile GridDhtPartitionsExchangeFuture<K, V> lastInitializedFuture; + private volatile GridDhtPartitionsExchangeFuture lastInitializedFuture; /** * Partition map futures. @@@ -117,81 -108,47 +117,81 @@@ final ClusterNode n = e.eventNode(); - assert !loc.id().equals(n.id()); + GridDhtPartitionExchangeId exchId = null; + GridDhtPartitionsExchangeFuture<K, V> exchFut = null; - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { - assert cctx.discovery().node(n.id()) == null; + if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) { + assert !loc.id().equals(n.id()); - for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) - f.onNodeLeft(n.id()); - } + if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { + assert cctx.discovery().node(n.id()) == null; + - for (GridDhtPartitionsExchangeFuture<K, V> f : exchFuts.values()) ++ for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) + f.onNodeLeft(n.id()); + } - assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; + assert + e.type() != EVT_NODE_JOINED || n.order() > loc.order() : + "Node joined with smaller-than-local " + + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - GridDhtPartitionExchangeId exchId = exchangeId(n.id(), e.topologyVersion(), e.type()); + exchId = exchangeId(n.id(), + new AffinityTopologyVersion(e.topologyVersion(), minorTopVer = 0), + e.type()); - GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(exchId, e); + exchFut = exchangeFuture(exchId, e, null); + } + else { + DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; - // Start exchange process. - pendingExchangeFuts.add(exchFut); + if (customEvt.data() instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data(); - // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, e); + Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size()); - if (log.isDebugEnabled()) - log.debug("Discovery event (will start exchange): " + exchId); - - locExchFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - if (!enterBusy()) - return; - - try { - // Unwind in the order of discovery events. - for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null; - f = pendingExchangeFuts.poll()) - addFuture(f); + // Validate requests to check if event should trigger partition exchange. + for (DynamicCacheChangeRequest req : batch.requests()) { + if (cctx.cache().dynamicCacheRegistered(req)) + valid.add(req); } - finally { - leaveBusy(); + + if (!F.isEmpty(valid)) { + exchId = exchangeId(n.id(), + new AffinityTopologyVersion(e.topologyVersion(), ++minorTopVer), + e.type()); + + exchFut = exchangeFuture(exchId, e, valid); } } - }); + } + + if (exchId != null) { + // Start exchange process. + pendingExchangeFuts.add(exchFut); + + // Event callback - without this callback future will never complete. + exchFut.onEvent(exchId, e); + + if (log.isDebugEnabled()) + log.debug("Discovery event (will start exchange): " + exchId); + - locExchFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { ++ locExchFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + if (!enterBusy()) + return; + + try { + // Unwind in the order of discovery events. - for (GridDhtPartitionsExchangeFuture<K, V> f = pendingExchangeFuts.poll(); f != null; ++ for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null; + f = pendingExchangeFuts.poll()) + addFuture(f); + } + finally { + leaveBusy(); + } + } + }); + } } finally { leaveBusy(); @@@ -207,12 -164,11 +207,12 @@@ exchWorker = new ExchangeWorker(); - cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, + EVT_DISCOVERY_CUSTOM_EVT); cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, - new MessageHandler<GridDhtPartitionsSingleMessage<K, V>>() { - @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage<K, V> msg) { + new MessageHandler<GridDhtPartitionsSingleMessage>() { + @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { processSinglePartitionUpdate(node, msg); } }); @@@ -251,9 -207,9 +251,9 @@@ assert discoEvt != null; - assert discoEvt.topologyVersion() == startTopVer; + assert discoEvt.topologyVersion() == startTopVer.topologyVersion(); - GridDhtPartitionsExchangeFuture<K, V> fut = exchangeFuture(exchId, discoEvt, null); - GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt); ++ GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null); new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); @@@ -531,7 -487,7 +531,7 @@@ */ private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage<K, V> m = new GridDhtPartitionsFullMessage<>(null, null, AffinityTopologyVersion.NONE); - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, -1); ++ GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@@ -596,12 -552,12 +596,12 @@@ * @param discoEvt Discovery event. * @return Exchange future. */ - GridDhtPartitionsExchangeFuture<K, V> exchangeFuture(GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, - @Nullable DiscoveryEvent discoEvt) { + @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs) { - GridDhtPartitionsExchangeFuture<K, V> fut; + GridDhtPartitionsExchangeFuture fut; - GridDhtPartitionsExchangeFuture<K, V> old = exchFuts.addx( - fut = new GridDhtPartitionsExchangeFuture<>(cctx, busyLock, exchId, reqs)); + GridDhtPartitionsExchangeFuture old = exchFuts.addx( - fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId)); ++ fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs)); if (old != null) fut = old; @@@ -619,8 -575,8 +619,8 @@@ ExchangeFutureSet exchFuts0 = exchFuts; if (exchFuts0 != null) { - for (GridDhtPartitionsExchangeFuture<K, V> fut : exchFuts0.values()) { + for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) { - if (fut.exchangeId().topologyVersion() < exchFut.exchangeId().topologyVersion() - 10) + if (fut.exchangeId().topologyVersion().topologyVersion() < exchFut.exchangeId().topologyVersion().topologyVersion() - 10) fut.cleanUp(); } } @@@ -999,20 -955,18 +999,21 @@@ * Creates ordered, not strict list set. */ private ExchangeFutureSet() { - super(new Comparator<GridDhtPartitionsExchangeFuture<K, V>>() { + super(new Comparator<GridDhtPartitionsExchangeFuture>() { @Override public int compare( - GridDhtPartitionsExchangeFuture<K, V> f1, - GridDhtPartitionsExchangeFuture<K, V> f2) { + GridDhtPartitionsExchangeFuture f1, - GridDhtPartitionsExchangeFuture f2) { - long t1 = f1.exchangeId().topologyVersion(); - long t2 = f2.exchangeId().topologyVersion(); ++ GridDhtPartitionsExchangeFuture f2 ++ ) { + AffinityTopologyVersion t1 = f1.exchangeId().topologyVersion(); + AffinityTopologyVersion t2 = f2.exchangeId().topologyVersion(); - assert t1 > 0; - assert t2 > 0; + assert t1.topologyVersion() > 0; + assert t2.topologyVersion() > 0; // Reverse order. - return t1 < t2 ? 1 : t1 == t2 ? 0 : -1; + int cmp = t1.compareTo(t2); + + return cmp < 0 ? 1 : cmp == 0 ? 0 : -1; } }, /*not strict*/false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index c2b2b88,6ed7dcc..2e181f9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@@ -116,7 -115,7 +116,7 @@@ public interface GridCachePreloader<K, * @param topVer Topology version, {@code -1} if not required. * @return Future to complete when all keys are preloaded. */ - public IgniteInternalFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer); - public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, long topVer); ++ public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer); /** * Force preload process. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index fd334f7,18f0283..f253fb8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@@ -112,8 -111,8 +112,8 @@@ public class GridCachePreloaderAdapter< } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(Collection<? extends K> keys, AffinityTopologyVersion topVer) { - return new GridFinishedFuture<>(cctx.kernalContext()); - @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, long topVer) { ++ @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + return new GridFinishedFuture<>(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5d94d2f,4c50546..26f61f6 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -61,7 -55,7 +60,8 @@@ import java.util.concurrent.* import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@@ -596,302 -577,356 +597,313 @@@ public class GridCacheProcessor extend for (int i = 0; i < cfgs.length; i++) { CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]); + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, cfg.getName()); + // Initialize defaults. - initialize(cfg); + initialize(cfg, cacheObjCtx); - CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null; + cfgs[i] = cfg; // Replace original configuration value. - validate(ctx.config(), cfg, cfgStore); + if (caches.containsKey(cfg.getName())) { + String cacheName = cfg.getName(); - CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null); + if (cacheName != null) + throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + + "assign unique name to each cache): " + cacheName); + else + throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + + "assign unique name to each cache)."); + } - GridCacheContext cacheCtx = createCache(cfg); - jta.createTmLookup(cfg); ++ GridCacheContext cacheCtx = createCache(cfg, cacheObjCtx); // TODO IGNITE-45 - // Skip suggestions for system caches. - if (!sysCaches.contains(cfg.getName())) - suggestOptimizations(cfg, cfgStore != null); + DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cfg, IgniteUuid.randomUuid()); - List<Object> toPrepare = new ArrayList<>(); + desc.locallyConfigured(true); - toPrepare.add(jta.tmLookup()); - toPrepare.add(cfgStore); - toPrepare.add(cfg.getAffinityMapper()); + registeredCaches.put(cfg.getName(), desc); - if (cfg.getAffinityMapper() != cacheObjCtx.defaultAffMapper()) - toPrepare.add(cacheObjCtx.defaultAffMapper()); + ctx.discovery().setCacheFilter( + cfg.getName(), + cfg.getNodeFilter(), + cfg.getNearConfiguration() != null, + cfg.getCacheMode() == LOCAL); - if (cfgStore instanceof GridCacheLoaderWriterStore) { - toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader()); - toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer()); - } + sharedCtx.addCacheContext(cacheCtx); - prepare(cfg, toPrepare.toArray(new Object[toPrepare.size()])); + startSeq.add(cacheCtx.cache()); - U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore)); + caches.put(cfg.getName(), cacheCtx.cache()); - cfgs[i] = cfg; // Replace original configuration value. + if (sysCaches.contains(cfg.getName())) + stopSeq.addLast(cacheCtx.cache()); + else + stopSeq.addFirst(cacheCtx.cache()); + } - GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); - GridCacheEventManager evtMgr = new GridCacheEventManager(); - GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg)); - GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); - GridCacheQueryManager qryMgr = queryManager(cfg); - CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); - CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); - GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); - GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); + // Start shared managers. + for (GridCacheSharedManager mgr : sharedCtx.managers()) + mgr.start(sharedCtx); - GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg); + for (GridCacheAdapter<?, ?> cache : startSeq) + startCache(cache); - GridCacheContext<?, ?> cacheCtx = new GridCacheContext( - ctx, - sharedCtx, - cfg, + for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { + GridCacheAdapter cache = e.getValue(); - /* - * Managers in starting order! - * =========================== - */ - evtMgr, - swapMgr, - storeMgr, - evictMgr, - qryMgr, - contQryMgr, - affMgr, - dataStructuresMgr, - ttlMgr, - drMgr, - jta); + proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); - cacheCtx.cacheObjectContext(cacheObjCtx); + jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); + } - GridCacheAdapter cache = null; + // Internal caches which should not be returned to user. + for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { + GridCacheAdapter cache = e.getValue(); - switch (cfg.getCacheMode()) { - case LOCAL: { - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - cache = new GridLocalCache(cacheCtx); + if (!sysCaches.contains(e.getKey())) + publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); + } - break; - } - case ATOMIC: { - cache = new GridLocalAtomicCache(cacheCtx); + transactions = new IgniteTransactionsImpl(sharedCtx); - break; - } ++ marshallerCache().context().preloader().syncFuture().listen(new CI1<IgniteInternalFuture<?>>() { ++ @Override public void apply(IgniteInternalFuture<?> f) { ++ ctx.marshallerContext().onMarshallerCacheReady(ctx); ++ } ++ }); + - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } ++ if (log.isDebugEnabled()) ++ log.debug("Started cache processor."); + - break; - } - case PARTITIONED: - case REPLICATED: { - if (GridCacheUtils.isNearEnabled(cfg)) { - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - cache = new GridNearTransactionalCache(cacheCtx); - - break; - } - case ATOMIC: { - cache = new GridNearAtomicCache(cacheCtx); + if (log.isDebugEnabled()) + log.debug("Started cache processor."); + } - break; - } + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onKernalStart() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } - } - else { - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtColocatedCache(cacheCtx) : - new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); - break; - } - case ATOMIC: { - cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) : - new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - break; - } + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); + } + } - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } - } + validateCfg = false; - break; - } + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + // Check if validation failed on node start. + desc.checkValid(); - default: { - assert false : "Invalid cache mode: " + cfg.getCacheMode(); - } - } + IgnitePredicate filter = desc.cacheConfiguration().getNodeFilter(); - cacheCtx.cache(cache); + if (filter.apply(ctx.discovery().localNode()) && !desc.locallyConfigured()) { + GridCacheContext ctx = createCache(desc.cacheConfiguration()); - if (caches.containsKey(cfg.getName())) { - String cacheName = cfg.getName(); + ctx.dynamicDeploymentId(desc.deploymentId()); - if (cacheName != null) - throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + - "assign unique name to each cache): " + cacheName); - else - throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + - "assign unique name to each cache)."); - } + sharedCtx.addCacheContext(ctx); - caches.put(cfg.getName(), cache); + GridCacheAdapter cache = ctx.cache(); - if (sysCaches.contains(cfg.getName())) - stopSeq.addLast(cache); - else - stopSeq.addFirst(cache); + String name = desc.cacheConfiguration().getName(); - startSeq.add(cache); + caches.put(name, cache); - /* - * Create DHT cache. - * ================ - */ - if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { - /* - * Specifically don't create the following managers - * here and reuse the one from Near cache: - * 1. GridCacheVersionManager - * 2. GridCacheIoManager - * 3. GridCacheDeploymentManager - * 4. GridCacheQueryManager (note, that we start it for DHT cache though). - * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). - * 6. GridCacheDgcManager - * 7. GridCacheTtlManager. - * =============================================== - */ - swapMgr = new GridCacheSwapManager(true); - evictMgr = new GridCacheEvictionManager(); - evtMgr = new GridCacheEventManager(); - drMgr = ctx.createComponent(GridCacheDrManager.class); - - cacheCtx = new GridCacheContext( - ctx, - sharedCtx, - cfg, - - /* - * Managers in starting order! - * =========================== - */ - evtMgr, - swapMgr, - storeMgr, - evictMgr, - qryMgr, - contQryMgr, - affMgr, - dataStructuresMgr, - ttlMgr, - drMgr, - jta); - - cacheCtx.cacheObjectContext(cacheObjCtx); - - GridDhtCacheAdapter dht = null; + startCache(cache); - switch (cfg.getAtomicityMode()) { - case TRANSACTIONAL: { - assert cache instanceof GridNearTransactionalCache; + proxies.put(name, new GridCacheProxyImpl(ctx, cache, null)); + + jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); + } + } + + // Must call onKernalStart on shared managers after creation of fetched caches. + for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) + mgr.onKernalStart(); - GridNearTransactionalCache near = (GridNearTransactionalCache)cache; + for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { + GridCacheAdapter cache = e.getValue(); - GridDhtCache dhtCache = !GridCacheUtils.isAffinityNode(cfg) ? - new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)) : - new GridDhtCache(cacheCtx); + if (maxPreloadOrder > 0) { + CacheConfiguration cfg = cache.configuration(); - dhtCache.near(near); + int order = cfg.getPreloadOrder(); - near.dht(dhtCache); + if (order > 0 && order != maxPreloadOrder && cfg.getCacheMode() != LOCAL) { + GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts + .get(order); - dht = dhtCache; + if (fut == null) { + fut = new GridCompoundFuture<>(ctx); - break; + preloadFuts.put(order, fut); } - case ATOMIC: { - assert cache instanceof GridNearAtomicCache; - GridNearAtomicCache near = (GridNearAtomicCache)cache; + fut.add(cache.preloader().syncFuture()); + } + } + } + + for (IgniteInternalFuture<?> fut : preloadFuts.values()) + ((GridCompoundFuture<Object, Object>)fut).markInitialized(); + + for (GridCacheAdapter<?, ?> cache : caches.values()) + onKernalStart(cache); - GridDhtAtomicCache dhtCache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) : - new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + // Wait for caches in SYNC preload mode. + for (GridCacheAdapter<?, ?> cache : caches.values()) { + CacheConfiguration cfg = cache.configuration(); - dhtCache.near(near); + if (cfg.getPreloadMode() == SYNC) { + if (cfg.getCacheMode() == REPLICATED || + (cfg.getCacheMode() == PARTITIONED && cfg.getPreloadPartitionedDelay() >= 0)) + cache.preloader().syncFuture().get(); + } + } - near.dht(dhtCache); + ctx.portable().onCacheProcessorStarted(); + } - dht = dhtCache; + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void stop(boolean cancel) throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; - break; - } + for (GridCacheAdapter<?, ?> cache : stopSeq) + stopCache(cache, cancel); - default: { - assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode(); - } - } + List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); - cacheCtx.cache(dht); - } + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); - sharedCtx.addCacheContext(cache.context()); + mgr.stop(cancel); } - // Start shared managers. - for (GridCacheSharedManager mgr : sharedCtx.managers()) - mgr.start(sharedCtx); + sharedCtx.cleanup(); - for (GridCacheAdapter<?, ?> cache : startSeq) { - GridCacheContext<?, ?> cacheCtx = cache.context(); + if (log.isDebugEnabled()) + log.debug("Stopped cache processor."); + } - CacheConfiguration cfg = cacheCtx.config(); + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onKernalStop(boolean cancel) { + if (ctx.config().isDaemon()) + return; - // Start managers. - for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) - mgr.start(cacheCtx); + for (GridCacheAdapter<?, ?> cache : stopSeq) + onKernalStop(cache, cancel); - cacheCtx.initConflictResolver(); + List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers(); - if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { - GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size()); + it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); - // Start DHT managers. - for (GridCacheManager mgr : dhtManagers(dhtCtx)) - mgr.start(dhtCtx); + mgr.onKernalStop(cancel); + } + } - dhtCtx.initConflictResolver(); + /** + * @param cache Cache to start. + * @throws IgniteCheckedException If failed to start cache. + */ + @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) + private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException { + GridCacheContext<?, ?> cacheCtx = cache.context(); - // Start DHT cache. - dhtCtx.cache().start(); + CacheConfiguration cfg = cacheCtx.config(); - if (log.isDebugEnabled()) - log.debug("Started DHT cache: " + dhtCtx.cache().name()); - } + // Start managers. + for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) + mgr.start(cacheCtx); - cacheCtx.cache().start(); + cacheCtx.initConflictResolver(); - if (log.isInfoEnabled()) - log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']'); - } + if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { + GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); + // Start DHT managers. + for (GridCacheManager mgr : dhtManagers(dhtCtx)) + mgr.start(dhtCtx); - proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); + dhtCtx.initConflictResolver(); - jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); + // Start DHT cache. + dhtCtx.cache().start(); + + if (log.isDebugEnabled()) + log.debug("Started DHT cache: " + dhtCtx.cache().name()); } - // Internal caches which should not be returned to user. - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); + cacheCtx.cache().start(); - if (!sysCaches.contains(e.getKey())) - publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); - } + if (log.isInfoEnabled()) + log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']'); + } - transactions = new IgniteTransactionsImpl(sharedCtx); + /** + * @param cache Cache to stop. + * @param cancel Cancel flag. + */ + @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) + private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel) { + GridCacheContext ctx = cache.context(); - if (!(ctx.isDaemon() || F.isEmpty(ctx.config().getCacheConfiguration()))) { - GridCacheAttributes[] attrVals = new GridCacheAttributes[ctx.config().getCacheConfiguration().length]; + sharedCtx.removeCacheContext(ctx); - Map<String, String> interceptors = new HashMap<>(); + cache.stop(); + + if (isNearEnabled(ctx)) { + GridDhtCacheAdapter dht = ctx.near().dht(); - int i = 0; + // Check whether dht cache has been started. + if (dht != null) { + dht.stop(); - for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { - assert caches.containsKey(cfg.getName()) : cfg.getName(); + GridCacheContext<?, ?> dhtCtx = dht.context(); - GridCacheContext ctx = caches.get(cfg.getName()).context(); + List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx); - attrVals[i++] = new GridCacheAttributes(cfg, ctx.store().configuredStore()); + for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious();) { + GridCacheManager mgr = it.previous(); - if (cfg.getInterceptor() != null) - interceptors.put(cfg.getName(), cfg.getInterceptor().getClass().getName()); + mgr.stop(cancel); + } } + } - ctx.addNodeAttribute(ATTR_CACHE, attrVals); + List<GridCacheManager> mgrs = ctx.managers(); - ctx.addNodeAttribute(ATTR_TX_CONFIG, ctx.config().getTransactionConfiguration()); + Collection<GridCacheManager> excludes = dhtExcludes(ctx); - if (!interceptors.isEmpty()) - ctx.addNodeAttribute(ATTR_CACHE_INTERCEPTORS, interceptors); + // Reverse order. + for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheManager mgr = it.previous(); + + if (!excludes.contains(mgr)) + mgr.stop(cancel); } - marshallerCache().context().preloader().syncFuture().listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - ctx.marshallerContext().onMarshallerCacheReady(ctx); - } - }); + U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.jta().tmLookup(), + ctx.store().configuredStore())); - if (log.isDebugEnabled()) - log.debug("Started cache processor."); + if (log.isInfoEnabled()) + log.info("Stopped cache: " + cache.name()); + + cleanup(ctx); } /** @@@ -2288,47 -1842,6 +2311,43 @@@ /** * */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + public class DynamicCacheStartFuture extends GridFutureAdapter<Object> { + /** Start ID. */ + private IgniteUuid deploymentId; + + /** Cache name. */ + private String cacheName; + + /** - * @param ctx Kernal context. + */ - private DynamicCacheStartFuture(GridKernalContext ctx, String cacheName, IgniteUuid deploymentId) { - // Start future can be completed from discovery thread, notification must NOT be sync. - super(ctx, false); - ++ private DynamicCacheStartFuture(String cacheName, IgniteUuid deploymentId) { + this.deploymentId = deploymentId; + this.cacheName = cacheName; + } + + /** + * @return Start ID. + */ + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + pendingFuts.remove(cacheName, this); + + return true; + } + + return false; + } + } + + /** + * + */ private static class LocalAffinityFunction implements CacheAffinityFunction { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index ef00131,82568fb..4be4ce7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@@ -403,8 -382,8 +403,8 @@@ public class GridCacheSharedContext<K, * @return {@code true} if waiting was successful. */ @SuppressWarnings({"unchecked"}) - public IgniteInternalFuture<?> partitionReleaseFuture(long topVer) { + public IgniteInternalFuture<?> partitionReleaseFuture(AffinityTopologyVersion topVer) { - GridCompoundFuture f = new GridCompoundFuture(kernalCtx); + GridCompoundFuture f = new GridCompoundFuture(); f.add(mvcc().finishExplicitLocks(topVer)); f.add(tm().finishTxs(topVer)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 1b27c69,d6aef65..67b2a0a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@@ -1516,7 -1648,7 +1649,7 @@@ public class GridCacheSwapManager exten * @return Swap entries iterator. * @throws IgniteCheckedException If failed. */ - public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) - public <K, V> Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, long topVer) ++ public <K, V> Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException { assert primary || backup; @@@ -1546,7 -1678,7 +1679,7 @@@ * @return Offheap entries iterator. * @throws IgniteCheckedException If failed. */ - public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) - public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, long topVer) ++ public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException { assert primary || backup; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index 89f46ec,028ab12..3ff7aa8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@@ -80,9 -71,10 +73,10 @@@ public class GridCacheTtlUpdateRequest * @param topVer Topology version. * @param ttl TTL. */ - public GridCacheTtlUpdateRequest(@NotNull AffinityTopologyVersion topVer, long ttl) { - public GridCacheTtlUpdateRequest(int cacheId, long topVer, long ttl) { ++ public GridCacheTtlUpdateRequest(int cacheId, AffinityTopologyVersion topVer, long ttl) { assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl; + this.cacheId = cacheId; this.topVer = topVer; this.ttl = ttl; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 63f1a62,22d3278..98d39fd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@@ -161,9 -167,9 +168,9 @@@ public abstract class GridDistributedCa /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync() { - GridFutureAdapter<Void> opFut = new GridFutureAdapter<>(ctx.kernalContext()); + GridFutureAdapter<Void> opFut = new GridFutureAdapter<>(); - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); removeAllAsync(opFut, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index d72c9b6,9a364ea..ceb8a7c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@@ -19,7 -19,7 +19,8 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; + import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@@ -468,11 -473,11 +474,11 @@@ public class GridDistributedTxRemoteAda // ensure proper lock ordering for removed entries. cctx.tm().addCommittedTx(this); - long topVer = topologyVersion(); + AffinityTopologyVersion topVer = topologyVersion(); // Node that for near transactions we grab all entries. - for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) { - GridCacheContext<K, V> cacheCtx = txEntry.context(); + for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { + GridCacheContext cacheCtx = txEntry.context(); boolean replicate = cacheCtx.isDrEnabled(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index bfda92d,bd50ae7..d72ee3d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -255,7 -254,7 +255,7 @@@ public class GridClientPartitionTopolog } /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, AffinityTopologyVersion topVer, boolean create) - @Nullable @Override public GridDhtLocalPartition localPartition(int p, long topVer, boolean create) ++ @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException { if (!create) return null; @@@ -265,8 -264,8 +265,8 @@@ } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { + @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(1, -1, create); + return localPartition(1, AffinityTopologyVersion.NONE, create); } /** {@inheritDoc} */ @@@ -280,7 -279,7 +280,7 @@@ } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition<K, V> onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry<K, V> e) { - @Override public GridDhtLocalPartition onAdded(long topVer, GridDhtCacheEntry e) { ++ @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) { assert false : "Entry should not be added to client topology: " + e; return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index d74a62e,3e70756..645d590 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@@ -58,11 -64,12 +65,12 @@@ public class GridDhtAssignmentFetchFutu * @param ctx Cache context. * @param availableNodes Available nodes. */ - public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, Collection<ClusterNode> availableNodes) { - super(ctx.kernalContext()); - + public GridDhtAssignmentFetchFuture( + GridCacheContext ctx, - long topVer, ++ AffinityTopologyVersion topVer, + Collection<ClusterNode> availableNodes + ) { this.ctx = ctx; - this.topVer = topVer; LinkedList<ClusterNode> tmp = new LinkedList<>(); @@@ -85,8 -95,8 +96,8 @@@ * @param node Node. * @param res Reponse. */ - public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) { + public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) { - if (res.topologyVersion() != topVer) { + if (!res.topologyVersion().equals(topVer)) { if (log.isDebugEnabled()) log.debug("Received affinity assignment for wrong topolgy version (will ignore) " + "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00f5d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 517920e,40a4987..09466a8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@@ -95,11 -94,11 +95,11 @@@ public abstract class GridDhtCacheAdapt /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() { + map.setEntryFactory(new GridCacheMapEntryFactory() { /** {@inheritDoc} */ - @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, AffinityTopologyVersion topVer, - K key, int hash, V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) { - return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId); - @Override public GridCacheMapEntry create(GridCacheContext ctx, long topVer, KeyCacheObject key, int hash, ++ @Override public GridCacheMapEntry create(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key, int hash, + CacheObject val, GridCacheMapEntry next, long ttl, int hdrId) { + return new GridDhtCacheEntry(ctx, topVer, key, hash, val, next, ttl, hdrId); } }); } @@@ -265,9 -264,9 +265,9 @@@ GridCompoundFuture<IgniteUuid, Object> fut = null; for (MultiUpdateFuture multiFut : multiTxFuts.values()) { - if (multiFut.topologyVersion() <= topVer) { + if (multiFut.topologyVersion().compareTo(topVer) <= 0) { if (fut == null) - fut = new GridCompoundFuture<>(ctx.kernalContext()); + fut = new GridCompoundFuture<>(); fut.add(multiFut); } @@@ -310,7 -310,7 +311,7 @@@ * * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - @Override public GridCacheEntryEx<K, V> entryEx(K key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { - @Override public GridCacheEntryEx entryEx(KeyCacheObject key, long topVer) throws GridDhtInvalidPartitionException { ++ @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { return super.entryEx(key, topVer); } @@@ -329,8 -329,8 +330,8 @@@ * @return DHT entry. * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - public GridDhtCacheEntry<K, V> entryExx(K key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { - return (GridDhtCacheEntry<K, V>)entryEx(key, topVer); - public GridDhtCacheEntry entryExx(KeyCacheObject key, long topVer) throws GridDhtInvalidPartitionException { ++ public GridDhtCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { + return (GridDhtCacheEntry)entryEx(key, topVer); } /** @@@ -345,10 -345,10 +346,10 @@@ * @throws GridDhtInvalidPartitionException if entry does not belong to this node and * {@code allowDetached} is {@code false}. */ - public GridCacheEntryEx<K, V> entryExx(K key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) { - public GridCacheEntryEx entryExx(KeyCacheObject key, long topVer, boolean allowDetached, boolean touch) { ++ public GridCacheEntryEx entryExx(KeyCacheObject key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) { try { return allowDetached && !ctx.affinity().localNode(key, topVer) ? - new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0) : + new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0, 0) : entryEx(key, touch); } catch (GridDhtInvalidPartitionException e) { @@@ -423,19 -425,18 +426,19 @@@ * @param replicate Replication flag. * @param plc Expiry policy. */ - private void loadEntry(K key, - V val, + private void loadEntry(KeyCacheObject key, + Object val, GridCacheVersion ver, @Nullable IgniteBiPredicate<K, V> p, - long topVer, + AffinityTopologyVersion topVer, boolean replicate, @Nullable ExpiryPolicy plc) { - if (p != null && !p.apply(key, val)) + if (p != null && !p.apply(key.<K>value(ctx.cacheObjectContext(), false), (V)val)) return; try { - GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), - GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), -1, true); ++ GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), + AffinityTopologyVersion.NONE, true); // Reserve to make sure that partition does not get unloaded. if (part.reserve()) { @@@ -485,9 -483,9 +485,9 @@@ @Override public int primarySize() { int sum = 0; - long topVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - for (GridDhtLocalPartition<K, V> p : topology().currentLocalPartitions()) { + for (GridDhtLocalPartition p : topology().currentLocalPartitions()) { if (p.primary(topVer)) sum += p.publicSize(); } @@@ -579,16 -576,14 +578,14 @@@ * @param expiry Expiry policy. * @return DHT future. */ - public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, + public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, long msgId, - LinkedHashMap<? extends K, Boolean> keys, + LinkedHashMap<KeyCacheObject, Boolean> keys, boolean readThrough, boolean reload, - long topVer, + AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, - boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, @@@ -614,8 -608,8 +610,8 @@@ * @param nodeId Node ID. * @param req Get request. */ - protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) { + protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { - assert isAffinityNode(cacheCfg); + assert ctx.affinityNode(); long ttl = req.accessTtl(); @@@ -683,11 -675,11 +678,11 @@@ assert entries != null && !entries.isEmpty(); - Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>(); + Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); - long topVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) { + for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) { List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer); for (int i = 0; i < nodes.size(); i++) { @@@ -837,10 -827,10 +830,10 @@@ /** {@inheritDoc} */ @NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() { - final GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, + final GridDhtLocalPartition part = ctx.topology().localPartition(partId, - ctx.discovery().topologyVersion(), false); + new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false); - Iterator<GridDhtCacheEntry<K, V>> partIt = part == null ? null : part.entries().iterator(); + Iterator<GridDhtCacheEntry> partIt = part == null ? null : part.entries().iterator(); return new PartitionEntryIterator<>(partIt); } @@@ -890,8 -880,8 +883,8 @@@ /** {@inheritDoc} */ @Override public int size() { - GridDhtLocalPartition<K, V> part = ctx.topology().localPartition(partId, + GridDhtLocalPartition part = ctx.topology().localPartition(partId, - ctx.discovery().topologyVersion(), false); + new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false); return part != null ? part.publicSize() : 0; } @@@ -909,11 -901,10 +902,11 @@@ } /** {@inheritDoc} */ - @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) { + @Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) { assert entry.isDht(); - GridDhtLocalPartition<K, V> part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, - GridDhtLocalPartition part = topology().localPartition(entry.partition(), -1, false); ++ GridDhtLocalPartition part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, + false); // Do not remove entry on replica topology. Instead, add entry to removal queue. // It will be cleared eventually. @@@ -938,14 -929,14 +931,14 @@@ if (primary && backup) return iterator(map.entries0().iterator(), !ctx.keepPortable()); else { - final long topVer = ctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - final Iterator<GridDhtLocalPartition<K, V>> partIt = topology().currentLocalPartitions().iterator(); + final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator(); - Iterator<GridCacheEntryEx<K, V>> it = new Iterator<GridCacheEntryEx<K, V>>() { - private GridCacheEntryEx<K, V> next; + Iterator<GridCacheEntryEx> it = new Iterator<GridCacheEntryEx>() { + private GridCacheEntryEx next; - private Iterator<GridDhtCacheEntry<K, V>> curIt; + private Iterator<GridDhtCacheEntry> curIt; { advance(); @@@ -1083,22 -1074,20 +1076,12 @@@ private static final long serialVersionUID = 0L; /** Topology version. */ - private long topVer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public MultiUpdateFuture() { - // No-op. - } + private AffinityTopologyVersion topVer; /** - * Empty constructor required by {@link Externalizable}. - */ - public MultiUpdateFuture() { - // No-op. - } - - /** -- * @param ctx Kernal context. * @param topVer Topology version. */ - private MultiUpdateFuture(GridKernalContext ctx, @NotNull AffinityTopologyVersion topVer) { - super(ctx); - - private MultiUpdateFuture(GridKernalContext ctx, long topVer) { ++ private MultiUpdateFuture(@NotNull AffinityTopologyVersion topVer) { this.topVer = topVer; }