http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 78966d0..1d57ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec private IgniteUuid futId = IgniteUuid.randomUuid(); /** Preloader. */ - private GridDhtPreloader<K, V> preloader; + private GridDhtPreloader preloader; /** Trackable flag. */ private boolean trackable; @@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec GridCacheContext<K, V> cctx, AffinityTopologyVersion topVer, Collection<KeyCacheObject> keys, - GridDhtPreloader<K, V> preloader + GridDhtPreloader preloader ) { assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; @@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @return {@code True} if some mapping was added. */ private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) { - Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>(); - - ClusterNode loc = cctx.localNode(); - - int curTopVer = topCntr.get(); + Map<ClusterNode, Set<KeyCacheObject>> mappings = null; for (KeyCacheObject key : keys) - map(key, mappings, exc); + mappings = map(key, mappings, exc); if (isDone()) return false; boolean ret = false; - if (!mappings.isEmpty()) { + if (mappings != null) { + ClusterNode loc = cctx.localNode(); + + int curTopVer = topCntr.get(); + preloader.addFuture(this); trackable = true; @@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param key Key. * @param exc Exclude nodes. * @param mappings Mappings. + * @return Mappings. */ - private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) { + private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key, + @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings, + Collection<ClusterNode> exc) + { ClusterNode loc = cctx.localNode(); - int part = cctx.affinity().partition(key); - GridCacheEntryEx e = cctx.dht().peekEx(key); try { if (e != null && !e.isNewLocked()) { - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { + int part = cctx.affinity().partition(key); + log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); + } // Key has been rebalanced or retrieved already. - return; + return mappings; } } catch (GridCacheEntryRemovedException ignore) { @@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec ", locId=" + cctx.nodeId() + ']'); } + int part = cctx.affinity().partition(key); + List<ClusterNode> owners = F.isEmpty(exc) ? 
top.owners(part, topVer) : new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc))); @@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec "topVer=" + topVer + ", locId=" + cctx.nodeId() + ']'); // Key is already rebalanced. - return; + return mappings; } // Create partition. @@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); - return; + return mappings; } + if (mappings == null) + mappings = U.newHashMap(keys.size()); + Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet()); assert mappedKeys != null; @@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']'); } + + return mappings; } /**
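Note: the GridDhtForceKeysFuture hunk above switches from an eagerly created mappings map to one that map() allocates lazily, sized by the number of keys, and returns to its caller; the caller then registers the future and sends requests only when at least one key actually had to be mapped. Below is a minimal standalone sketch of that lazy-allocation pattern; the class name, node names and the even/odd "already available locally" filter are illustrative assumptions, not Ignite code.

import java.util.*;

/** Sketch only: allocate the per-node mapping map on first use and return it to the caller. */
public class LazyMappingSketch {
    /** Maps one key; skips keys that are already available locally (hypothetical filter). */
    static Map<String, Set<Integer>> map(Integer key, Map<String, Set<Integer>> mappings, int totalKeys) {
        if (key % 2 == 0)
            return mappings; // Nothing to request for this key; the map may still be null.

        if (mappings == null)
            mappings = new HashMap<>(totalKeys); // Lazy allocation with a capacity hint.

        mappings.computeIfAbsent("node-" + (key % 3), n -> new HashSet<>()).add(key);

        return mappings;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);

        Map<String, Set<Integer>> mappings = null;

        for (Integer key : keys)
            mappings = map(key, mappings, keys.size());

        if (mappings != null) // Requests are sent only if something was actually mapped.
            System.out.println("Keys to request per node: " + mappings);
    }
}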
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 633f237..a6e6c4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; * and populating local cache. */ @SuppressWarnings("NonConstantFieldWithUpperCaseName") -public class GridDhtPartitionDemandPool<K, V> { +public class GridDhtPartitionDemandPool { /** Dummy message to wake up a blocking queue if a node leaves. */ private final SupplyMessage DUMMY_TOP = new SupplyMessage(); /** */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private final IgniteLogger log; @@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool<K, V> { log = cctx.logger(getClass()); - poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - if (poolSize > 0) { + poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0; + + if (enabled) { barrier = new CyclicBarrier(poolSize); dmdWorkers = new ArrayList<>(poolSize); @@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param assigns Assignments. * @param force {@code True} if dummy reassign. */ - void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) { + void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> { private int id; /** Partition-to-node assignments. */ - private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>(); /** Message queue. */ private final LinkedBlockingDeque<SupplyMessage> msgQ = @@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** * @param assigns Assignments. */ - void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) { + void addAssignments(GridDhtPreloaderAssignments assigns) { assert assigns != null; assignQ.offer(assigns); @@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool<K, V> { } // Sync up all demand threads at this step. 
- GridDhtPreloaderAssignments<K, V> assigns = null; + GridDhtPreloaderAssignments assigns = null; while (assigns == null) assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); @@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool<K, V> { * @param exchFut Exchange future. * @return Assignments of partitions to nodes. */ - GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. GridDhtPartitionTopology top = cctx.dht().topology(); if (!cctx.rebalanceEnabled()) - return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); int partCnt = cctx.affinity().partitions(); @@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool<K, V> { "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; - GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index facf7e3..faa6cf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext * @return Full string representation. */ public String toFullString() { - return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString()); + return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 5d9677d..13cfef3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Thread pool for supplying partitions to demanding nodes. 
*/ -class GridDhtPartitionSupplyPool<K, V> { +class GridDhtPartitionSupplyPool { /** */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private final IgniteLogger log; @@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool<K, V> { top = cctx.dht().topology(); - int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + if (!cctx.kernalContext().clientNode()) { + int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; - for (int i = 0; i < poolSize; i++) - workers.add(new SupplyWorker()); + for (int i = 0; i < poolSize; i++) + workers.add(new SupplyWorker()); - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processDemandMessage(id, m); - } - }); + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processDemandMessage(id, m); + } + }); + } depEnabled = cctx.gridDeploy().enabled(); } @@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool<K, V> { boolean ack = false; try { - // Partition map exchange is finished which means that all near transactions with given - // topology version are committed. We can wait for local locks here as it will not take - // much time. - cctx.mvcc().finishLocks(d.topologyVersion()).get(); - for (int part : d.partitions()) { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4b8db00..9f18c98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -44,6 +44,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** @@ -117,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private GridFutureAdapter<Boolean> initFut; /** Topology snapshot. 
*/ - private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = - new AtomicReference<>(); + private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); /** Last committed cache version before next topology version use. */ private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); @@ -146,8 +147,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Dynamic cache change requests. */ private Collection<DynamicCacheChangeRequest> reqs; + /** Cache validation results. */ private volatile Map<Integer, Boolean> cacheValidRes; + /** Skip preload flag. */ + private boolean skipPreload; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -200,6 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param cctx Cache context. * @param busyLock Busy lock. * @param exchId Exchange ID. + * @param reqs Cache change requests. */ public GridDhtPartitionsExchangeFuture( GridCacheSharedContext cctx, @@ -221,16 +227,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT log = cctx.logger(getClass()); - // Grab all nodes with order of equal or less than last joined node. - oldestNode.set(CU.oldest(cctx, exchId.topologyVersion())); - - assert oldestNode.get() != null; - initFut = new GridFutureAdapter<>(); if (log.isDebugEnabled()) - log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + - ", fut=" + this + ']'); + log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); + } + + /** + * @param reqs Cache change requests. + */ + public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) { + this.reqs = reqs; } /** {@inheritDoc} */ @@ -250,6 +257,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return Skip preload flag. + */ + public boolean skipPreload() { + return skipPreload; + } + + /** * @return Dummy flag. */ public boolean dummy() { @@ -279,9 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param cacheId Cache ID to check. + * @param topVer Topology version. * @return {@code True} if cache was added during this exchange. */ - public boolean isCacheAdded(int cacheId) { + public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) { if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { if (req.start() && !req.clientStartOnly()) { @@ -291,7 +306,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } - return false; + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); } /** @@ -312,7 +329,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * Rechecks topology. + * @param cacheCtx Cache context. + * @throws IgniteCheckedException If failed. */ private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { if (stopping(cacheCtx.cacheId())) @@ -330,8 +348,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId + ']'); // Fetch affinity assignment from remote node. 
- GridDhtAssignmentFetchFuture fetchFut = - new GridDhtAssignmentFetchFuture(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx)); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx, + exchId.topologyVersion(), + CU.affinityNodes(cacheCtx, exchId.topologyVersion())); fetchFut.init(); @@ -341,11 +360,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" + cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']'); + if (affAssignment == null) { + affAssignment = new ArrayList<>(cacheCtx.affinity().partitions()); + + List<ClusterNode> empty = Collections.emptyList(); + + for (int i = 0; i < cacheCtx.affinity().partitions(); i++) + affAssignment.add(empty); + } + cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment); } } /** + * @param cacheCtx Cache context. * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange. */ private boolean canCalculateAffinity(GridCacheContext cacheCtx) { @@ -391,20 +420,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Exchange id. - */ - GridDhtPartitionExchangeId key() { - return exchId; - } - - /** - * @return Oldest node. - */ - ClusterNode oldestNode() { - return oldestNode.get(); - } - - /** * @return Exchange ID. */ public GridDhtPartitionExchangeId exchangeId() { @@ -412,13 +427,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Init future. - */ - IgniteInternalFuture<?> initFuture() { - return initFut; - } - - /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -444,7 +452,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteInterruptedCheckedException If interrupted. */ public void init() throws IgniteInterruptedCheckedException { - assert oldestNode.get() != null; + if (isDone()) + return; if (init.compareAndSet(false, true)) { if (isDone()) @@ -455,10 +464,118 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // will return corresponding nodes. U.await(evtLatch); + assert discoEvt != null : this; + assert !dummy && !forcePreload : this; + + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); + + oldestNode.set(oldest); + startCaches(); + // True if client node joined or failed. + boolean clientNodeEvt; + + if (F.isEmpty(reqs)) { + int type = discoEvt.type(); + + assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt; + + clientNodeEvt = CU.clientNode(discoEvt.eventNode()); + } + else { + assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; + + boolean clientOnlyStart = true; + + for (DynamicCacheChangeRequest req : reqs) { + if (!req.clientStartOnly()) { + clientOnlyStart = false; + + break; + } + } + + clientNodeEvt = clientOnlyStart; + } + + if (clientNodeEvt) { + ClusterNode node = discoEvt.eventNode(); + + // Client need to initialize affinity for local join event or for stated client caches. 
+ if (!node.isLocal()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + GridDhtPartitionTopology top = cacheCtx.topology(); + + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + + if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) { + initTopology(cacheCtx); + + top.beforeExchange(this); + } + else + cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion()); + } + + if (exchId.isLeft()) + cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); + + onDone(exchId.topologyVersion()); + + skipPreload = cctx.kernalContext().clientNode(); + + return; + } + } + + if (cctx.kernalContext().clientNode()) { + skipPreload = true; + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + GridDhtPartitionTopology top = cacheCtx.topology(); + + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + } + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + initTopology(cacheCtx); + } + + if (oldestNode.get() != null) { + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, + exchId.topologyVersion())); + + rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); + + ready.set(true); + + initFut.onDone(true); + + if (log.isDebugEnabled()) + log.debug("Initialized future: " + this); + + sendPartitions(); + } + else + onDone(exchId.topologyVersion()); + + return; + } + + assert oldestNode.get() != null; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (isCacheAdded(cacheCtx.cacheId())) { + if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) { if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex()); } @@ -468,8 +585,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<String> cachesWithoutNodes = null; - for (String name : cctx.cache().cacheNames()) { - if (exchId.isLeft()) { + if (exchId.isLeft()) { + for (String name : cctx.cache().cacheNames()) { if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -505,7 +622,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (cachesWithoutNodes != null) { - StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: "); + StringBuilder sb = + new StringBuilder("All server nodes for the following caches have left the cluster: "); for (int i = 0; i < cachesWithoutNodes.size(); i++) { String cache = cachesWithoutNodes.get(i); @@ -537,7 +655,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } // Grab all alive remote nodes with order of equal or less than last joined node. 
- rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, exchId.topologyVersion())); rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); @@ -591,6 +709,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (exchId.isLeft()) cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); + IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion()); + + while (true) { + try { + locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + U.warn(log, "Failed to wait for locks release future. " + + "Dumping pending objects that might be the cause: " + cctx.localNodeId()); + + U.warn(log, "Locked entries:"); + + Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks = + cctx.mvcc().unfinishedLocks(exchId.topologyVersion()); + + for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) + U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); + } + } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -650,36 +790,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("Initialized future: " + this); - if (canSkipExchange()) - onDone(exchId.topologyVersion()); + // If this node is not oldest. + if (!oldestNode.get().id().equals(cctx.localNodeId())) + sendPartitions(); else { - // If this node is not oldest. - if (!oldestNode.get().id().equals(cctx.localNodeId())) - sendPartitions(); - else { - boolean allReceived = allReceived(); + boolean allReceived = allReceived(); - if (allReceived && replied.compareAndSet(false, true)) { - if (spreadPartitions()) - onDone(exchId.topologyVersion()); - } + if (allReceived && replied.compareAndSet(false, true)) { + if (spreadPartitions()) + onDone(exchId.topologyVersion()); } - - scheduleRecheck(); } + + scheduleRecheck(); } else assert false : "Skipped init future: " + this; } /** - * @return {@code True} if no distributed exchange is needed. - */ - private boolean canSkipExchange() { - return false; // TODO ignite-23; - } - - /** * */ private void dumpPendingObjects() { @@ -755,7 +884,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. 
*/ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + cctx.kernalContext().clientNode(), + cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@ -780,8 +911,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT id.topologyVersion()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + if (!cacheCtx.isLocal()) { + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); + + boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0; + + if (ready) + m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + } } // It is important that client topologies be added after contexts. @@ -839,14 +976,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** {@inheritDoc} */ @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { - Map<Integer, Boolean> m = new HashMap<>(); + Map<Integer, Boolean> m = null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) + if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) { + if (m == null) + m = new HashMap<>(); + m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes())); + } } - cacheValidRes = m; + cacheValidRes = m != null ? 
m : Collections.<Integer, Boolean>emptyMap(); cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err); @@ -864,8 +1005,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (timeoutObj != null) cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (exchId.event() == EventType.EVT_NODE_FAILED || exchId.event() == EventType.EVT_NODE_LEFT) + if (exchId.isLeft()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); } @@ -1018,39 +1159,39 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return; } - ClusterNode curOldest = oldestNode.get(); + if (log.isDebugEnabled()) + log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); - if (!nodeId.equals(curOldest.id())) { - if (log.isDebugEnabled()) - log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() + - ", unexpectedNodeId=" + nodeId + ']'); + assert exchId.topologyVersion().equals(msg.topologyVersion()); - ClusterNode sender = cctx.discovery().node(nodeId); + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> t) { + ClusterNode curOldest = oldestNode.get(); - if (sender == null) { - if (log.isDebugEnabled()) - log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId + - ", exchId=" + msg.exchangeId() + ']'); + if (!nodeId.equals(curOldest.id())) { + if (log.isDebugEnabled()) + log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() + + ", unexpectedNodeId=" + nodeId + ']'); - return; - } + ClusterNode snd = cctx.discovery().node(nodeId); - // Will process message later if sender node becomes oldest node. - if (sender.order() > curOldest.order()) - fullMsgs.put(nodeId, msg); + if (snd == null) { + if (log.isDebugEnabled()) + log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId + + ", exchId=" + msg.exchangeId() + ']'); - return; - } + return; + } - assert msg.exchangeId().equals(exchId); + // Will process message later if sender node becomes oldest node. 
+ if (snd.order() > curOldest.order()) + fullMsgs.put(nodeId, msg); - if (log.isDebugEnabled()) - log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); + return; + } - assert exchId.topologyVersion().equals(msg.topologyVersion()); + assert msg.exchangeId().equals(exchId); - initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> t) { assert msg.lastVersion() != null; cctx.versions().onReceived(nodeId, msg.lastVersion()); @@ -1075,8 +1216,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue()); - else if (CU.oldest(cctx).isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + else { + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + + if (oldest != null && oldest.isLocal()) + cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + } } } @@ -1135,40 +1280,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean set = false; - ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion()); - - // If local node is now oldest. - if (newOldest.id().equals(cctx.localNodeId())) { - synchronized (mux) { - if (oldestNode.compareAndSet(oldest, newOldest)) { - // If local node is just joining. - if (exchId.nodeId().equals(cctx.localNodeId())) { - try { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange( - GridDhtPartitionsExchangeFuture.this); + ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); + + if (newOldest != null) { + // If local node is now oldest. + if (newOldest.id().equals(cctx.localNodeId())) { + synchronized (mux) { + if (oldestNode.compareAndSet(oldest, newOldest)) { + // If local node is just joining. 
+ if (exchId.nodeId().equals(cctx.localNodeId())) { + try { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) + cacheCtx.topology().beforeExchange( + GridDhtPartitionsExchangeFuture.this); + } } - } - catch (IgniteCheckedException e) { - onDone(e); + catch (IgniteCheckedException e) { + onDone(e); - return; + return; + } } - } - set = true; + set = true; + } } } - } - else { - synchronized (mux) { - set = oldestNode.compareAndSet(oldest, newOldest); - } + else { + synchronized (mux) { + set = oldestNode.compareAndSet(oldest, newOldest); + } - if (set && log.isDebugEnabled()) - log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + - ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); + if (set && log.isDebugEnabled()) + log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + + ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); + } } if (set) { @@ -1190,9 +1337,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert rmtNodes != null; - for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) + for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) { if (it.next().id().equals(nodeId)) it.remove(); + } if (allReceived() && ready.get() && replied.compareAndSet(false, true)) if (spreadPartitions()) @@ -1254,30 +1402,34 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter( cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) { @Override public void onTimeout() { - if (isDone()) - return; - - if (!enterBusy()) - return; + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + if (isDone()) + return; + + if (!enterBusy()) + return; + + try { + U.warn(log, + "Retrying preload partition exchange due to timeout [done=" + isDone() + + ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) + + ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) + + ", init=" + init + ", initFut=" + initFut.isDone() + + ", ready=" + ready + ", replied=" + replied + ", added=" + added + + ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" + + oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() + + ", locNodeOrder=" + cctx.localNode().order() + + ", locNodeId=" + cctx.localNode().id() + ']', + "Retrying preload partition exchange due to timeout."); - try { - U.warn(log, - "Retrying preload partition exchange due to timeout [done=" + isDone() + - ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) + - ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) + - ", init=" + init + ", initFut=" + initFut.isDone() + - ", ready=" + ready + ", replied=" + replied + ", added=" + added + - ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" + - oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() + - ", locNodeOrder=" + cctx.localNode().order() + - ", locNodeId=" + cctx.localNode().id() + ']', - "Retrying preload partition exchange due to timeout."); - - recheck(); - } - finally { - leaveBusy(); - } + recheck(); + } + finally { + leaveBusy(); + } + } + }); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 8256274..73794ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param id Exchange ID. * @param lastVer Last version. + * @param topVer Topology version. */ - public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, + public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, + @Nullable GridCacheVersion lastVer, @NotNull AffinityTopologyVersion topVer) { super(id, lastVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 66140cd..713a80b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions. */ private byte[] partsBytes; + /** */ + private boolean client; + /** * Required by {@link Externalizable}. */ @@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** * @param exchId Exchange ID. + * @param client Client message flag. * @param lastVer Last version. */ - public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) { + public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, + boolean client, + @Nullable GridCacheVersion lastVer) { super(exchId, lastVer); + + this.client = client; + } + + /** + * @return {@code True} if sent from client node. 
+ */ + public boolean client() { + return client; } /** @@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (writer.state()) { case 5: + if (!writer.writeBoolean("client", client)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (reader.state()) { case 5: + client = reader.readBoolean("client"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index d6373f0..51010ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -46,7 +47,7 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.*; /** * DHT cache preloader. */ -public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { +public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Default preload resend timeout. */ public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; @@ -57,13 +58,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { private final GridAtomicLong topVer = new GridAtomicLong(); /** Force key futures. */ - private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap(); + private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); /** Partition suppliers. */ - private GridDhtPartitionSupplyPool<K, V> supplyPool; + private GridDhtPartitionSupplyPool supplyPool; /** Partition demanders. */ - private GridDhtPartitionDemandPool<K, V> demandPool; + private GridDhtPartitionDemandPool demandPool; /** Start future. 
*/ private final GridFutureAdapter<Object> startFut; @@ -92,7 +93,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { assert !loc.id().equals(n.id()); - for (GridDhtForceKeysFuture<K, V> f : forceKeyFuts.values()) + for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) f.onDiscoveryEvent(e); assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + @@ -117,7 +118,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { /** * @param cctx Cache context. */ - public GridDhtPreloader(GridCacheContext<K, V> cctx) { + public GridDhtPreloader(GridCacheContext<?, ?> cctx) { super(cctx); top = cctx.dht().topology(); @@ -158,8 +159,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } }); - supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock); - demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock); + supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock); + demandPool = new GridDhtPartitionDemandPool(cctx, busyLock); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -227,12 +228,14 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { final long start = U.currentTimeMillis(); - if (cctx.config().getRebalanceDelay() >= 0) { - U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name()); + final CacheConfiguration cfg = cctx.config(); + + if (cfg.getRebalanceDelay() >= 0) { + U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); demandPool.syncFuture().listen(new CI1<Object>() { @Override public void apply(Object t) { - U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " + + U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); } }); @@ -253,12 +256,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return demandPool.assign(exchFut); } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) { + @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { demandPool.addAssignments(assignments, forcePreload); } @@ -271,7 +274,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? 
startFut : demandPool.syncFuture(); } /** @@ -406,7 +409,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { return; try { - GridDhtForceKeysFuture<K, V> f = forceKeyFuts.get(msg.futureId()); + GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); if (f != null) f.onResult(node.id(), msg); @@ -491,7 +494,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { */ @SuppressWarnings( {"unchecked", "RedundantCast"}) @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { - final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); + final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); @@ -543,7 +546,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * * @param fut Future to add. */ - void addFuture(GridDhtForceKeysFuture<K, V> fut) { + void addFuture(GridDhtForceKeysFuture<?, ?> fut) { forceKeyFuts.put(fut.futureId(), fut); } @@ -552,7 +555,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * * @param fut Future to remove. */ - void remoteFuture(GridDhtForceKeysFuture<K, V> fut) { + void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) { forceKeyFuts.remove(fut.futureId(), fut); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 369fc68..2f6ef6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -27,8 +27,7 @@ import java.util.concurrent.*; /** * Partition to node assignments. 
*/ -public class GridDhtPreloaderAssignments<K, V> extends - ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { +public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index ba3357d..041f83a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -433,6 +433,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + return dht.tryPutIfAbsent(key, val); + } + + /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) throws IgniteCheckedException { return dht.getAndReplace(key, val); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 8258b14..351d6cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -95,7 +95,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCachePreloader<K, V> preloader() { + @Override public GridCachePreloader preloader() { return dht().preloader(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index fc178e3..74438bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -274,7 +274,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (affNodes.isEmpty()) { assert !cctx.affinityNode(); - onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all partition " + + onDone(new 
ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all partition " + "nodes left the grid).")); return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 0ffb4e5..3d28018 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*; /** * Cache lock future. */ -public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheMvccFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param skipStore skipStore */ public GridNearLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, @@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); - return cctx.discovery().localNode(); - } - }); + return cctx.discovery().localNode(); + } + }); } /** {@inheritDoc} */ @@ -350,13 +349,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * Undoes all locks. * * @param dist If {@code true}, then remove locks from remote nodes as well. + * @param rollback {@code True} if should rollback tx. */ - private void undoLocks(boolean dist) { + private void undoLocks(boolean dist, boolean rollback) { // Transactions will undo during rollback. if (dist && tx == null) cctx.nearTx().removeLocks(lockVer, keys); else { - if (tx != null) { + if (rollback && tx != null) { if (tx.setRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); @@ -397,7 +397,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param dist {@code True} if need to distribute lock release. 
*/ private void onFailed(boolean dist) { - undoLocks(dist); + undoLocks(dist, true); complete(false); } @@ -607,7 +607,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B ", fut=" + this + ']'); if (!success) - undoLocks(distribute); + undoLocks(distribute, true); if (tx != null) cctx.tm().txContext(tx); @@ -682,7 +682,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys); + map(keys, false); markInitialized(); @@ -690,14 +690,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B } // Must get topology snapshot and map on that version. - mapOnTopology(); + mapOnTopology(false); } /** * Acquires topology future and checks it completeness under the read lock. If it is not complete, * will asynchronously wait for it's completeness and then try again. + * + * @param remap Remap flag. */ - void mapOnTopology() { + void mapOnTopology(final boolean remap) { // We must acquire topology snapshot from the topology version future. cctx.topology().readLock(); @@ -721,19 +723,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B AffinityTopologyVersion topVer = fut.topologyVersion(); - if (tx != null) - tx.topologyVersion(topVer); + if (remap) { + if (tx != null) + tx.onRemap(topVer); - this.topVer.compareAndSet(null, topVer); + this.topVer.set(topVer); + } + else { + if (tx != null) + tx.topologyVersion(topVer); + + this.topVer.compareAndSet(null, topVer); + } - map(keys); + map(keys, remap); markInitialized(); } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(); + mapOnTopology(remap); } }); } @@ -749,14 +759,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. + * @param remap Remap flag. */ - private void map(Iterable<KeyCacheObject> keys) { + private void map(Iterable<KeyCacheObject> keys, boolean remap) { try { AffinityTopologyVersion topVer = this.topVer.get(); assert topVer != null; - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0 : topVer; if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " + @@ -765,8 +776,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - ConcurrentLinkedDeque8<GridNearLockMapping> mappings = - new ConcurrentLinkedDeque8<>(); + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + + ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. GridNearLockMapping map = null; @@ -795,6 +809,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (log.isDebugEnabled()) log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + boolean first = true; + // Create mini futures. 
             for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
                 GridNearLockMapping mapping = iter.next();
@@ -872,6 +888,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                             if (!cand.reentry()) {
                                 if (req == null) {
+                                    boolean clientFirst = false;
+
+                                    if (first) {
+                                        clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+                                        first = false;
+                                    }
+
                                     req = new GridNearLockRequest(
                                         cctx.cacheId(),
                                         topVer,
@@ -893,7 +917,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                                         inTx() ? tx.subjectId() : null,
                                         inTx() ? tx.taskNameHash() : 0,
                                         read ? accessTtl : -1L,
-                                        skipStore);
+                                        skipStore,
+                                        clientFirst);
 
                                     mapping.request(req);
                                 }
@@ -1197,7 +1222,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
     /**
      * @return DHT cache.
      */
-    private GridDhtTransactionalCacheAdapter<K, V> dht() {
+    private GridDhtTransactionalCacheAdapter<?, ?> dht() {
         return cctx.nearTx().dht();
     }
 
@@ -1356,110 +1381,146 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                 return;
             }
 
-            int i = 0;
+            if (res.clientRemapVersion() != null) {
+                assert cctx.kernalContext().clientNode();
 
-            AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+                IgniteInternalFuture<?> affFut =
+                    cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
 
-            for (KeyCacheObject k : keys) {
-                while (true) {
-                    GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+                if (affFut != null && !affFut.isDone()) {
+                    affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                            remap();
+                        }
+                    });
+                }
+                else
+                    remap();
+            }
+            else {
+                int i = 0;
 
-                    try {
-                        if (res.dhtVersion(i) == null) {
-                            onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                "(will fail the lock): " + res));
+                AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
 
-                            return;
-                        }
+                for (KeyCacheObject k : keys) {
+                    while (true) {
+                        GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
 
-                        IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+                        try {
+                            if (res.dhtVersion(i) == null) {
+                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                    "(will fail the lock): " + res));
 
-                        CacheObject oldVal = entry.rawGet();
-                        boolean hasOldVal = false;
-                        CacheObject newVal = res.value(i);
+                                return;
+                            }
 
-                        boolean readRecordable = false;
+                            IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
 
-                        if (retval) {
-                            readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+                            CacheObject oldVal = entry.rawGet();
+                            boolean hasOldVal = false;
+                            CacheObject newVal = res.value(i);
 
-                            if (readRecordable)
-                                hasOldVal = entry.hasValue();
-                        }
+                            boolean readRecordable = false;
 
-                        GridCacheVersion dhtVer = res.dhtVersion(i);
-                        GridCacheVersion mappedVer = res.mappedVersion(i);
+                            if (retval) {
+                                readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+
+                                if (readRecordable)
+                                    hasOldVal = entry.hasValue();
+                            }
 
-                        if (newVal == null) {
-                            if (oldValTup != null) {
-                                if (oldValTup.get1().equals(dhtVer))
-                                    newVal = oldValTup.get2();
+                            GridCacheVersion dhtVer = res.dhtVersion(i);
+                            GridCacheVersion mappedVer = res.mappedVersion(i);
 
-                                oldVal = oldValTup.get2();
+                            if (newVal == null) {
+                                if (oldValTup != null) {
+                                    if (oldValTup.get1().equals(dhtVer))
+                                        newVal = oldValTup.get2();
+
+                                    oldVal = oldValTup.get2();
+                                }
                             }
-                        }
 
-                        // Lock is held at this point, so we can set the
-                        // returned value if any.
-                        entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+                            // Lock is held at this point, so we can set the
+                            // returned value if any.
+                            entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
 
-                        if (inTx() && implicitTx() && tx.onePhaseCommit()) {
-                            boolean pass = res.filterResult(i);
+                            if (inTx()) {
+                                tx.hasRemoteLocks(true);
 
-                            tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
-                        }
+                                if (implicitTx() && tx.onePhaseCommit()) {
+                                    boolean pass = res.filterResult(i);
 
-                        entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
-                            res.pending());
-
-                        if (retval) {
-                            if (readRecordable)
-                                cctx.events().addEvent(
-                                    entry.partition(),
-                                    entry.key(),
-                                    tx,
-                                    null,
-                                    EVT_CACHE_OBJECT_READ,
-                                    newVal,
-                                    newVal != null,
-                                    oldVal,
-                                    hasOldVal,
-                                    CU.subjectId(tx, cctx.shared()),
-                                    null,
-                                    inTx() ? tx.resolveTaskName() : null);
-
-                            if (cctx.cache().configuration().isStatisticsEnabled())
-                                cctx.cache().metrics0().onRead(false);
-                        }
+                                    tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+                                }
+                            }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                            entry.readyNearLock(lockVer,
+                                mappedVer,
+                                res.committedVersions(),
+                                res.rolledbackVersions(),
+                                res.pending());
+
+                            if (retval) {
+                                if (readRecordable)
+                                    cctx.events().addEvent(
+                                        entry.partition(),
+                                        entry.key(),
+                                        tx,
+                                        null,
+                                        EVT_CACHE_OBJECT_READ,
+                                        newVal,
+                                        newVal != null,
+                                        oldVal,
+                                        hasOldVal,
+                                        CU.subjectId(tx, cctx.shared()),
+                                        null,
+                                        inTx() ? tx.resolveTaskName() : null);
+
+                                if (cctx.cache().configuration().isStatisticsEnabled())
+                                    cctx.cache().metrics0().onRead(false);
+                            }
 
-                        break; // Inner while loop.
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to add candidates because entry was removed (will renew).");
+                            if (log.isDebugEnabled())
+                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
 
-                        // Replace old entry with new one.
-                        entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                            break; // Inner while loop.
+                        }
+                        catch (GridCacheEntryRemovedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to add candidates because entry was removed (will renew).");
+
+                            // Replace old entry with new one.
+                            entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                        }
                     }
+
+                    i++;
                 }
 
-                i++;
-            }
+                try {
+                    proceedMapping(mappings);
+                }
+                catch (IgniteCheckedException e) {
+                    onDone(e);
+                }
 
-            try {
-                proceedMapping(mappings);
-            }
-            catch (IgniteCheckedException e) {
-                onDone(e);
+                onDone(true);
             }
-
-            onDone(true);
         }
     }
 
+    /**
+     *
+     */
+    private void remap() {
+        undoLocks(false, false);
+
+        mapOnTopology(true);
+
+        onDone(true);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
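
The functional core of the GridNearLockFuture change is the new client remap path in MiniFuture.onResult(): when the response carries clientRemapVersion(), the client node no longer fails the lock; it waits (via a listener rather than blocking) for the affinity-ready future of that topology version, and remap() then releases the local locks without marking the transaction rollback-only (undoLocks(false, false)) and re-runs mapOnTopology(true) against the new version. Below is a minimal, self-contained sketch of that control flow only; LockClient, affinityReadyFuture, currentTopologyVersion and the other stubs are illustrative stand-ins, not the actual Ignite API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sketch of the client-side remap flow described above (illustrative, not Ignite code):
 * on a remap request the client releases its local locks without rolling back the tx,
 * waits for the requested affinity version to become ready, and re-maps the keys on it.
 */
public class ClientRemapSketch {
    static class LockClient {
        /** Topology version the keys are currently mapped on. */
        private final AtomicReference<Long> topVer = new AtomicReference<>();

        /** Models MiniFuture.onResult(): either process the granted locks or remap. */
        void onResult(Long clientRemapVersion) {
            if (clientRemapVersion != null) {
                CompletableFuture<Void> affFut = affinityReadyFuture(clientRemapVersion);

                // Do not block the response thread: remap once affinity is ready.
                if (!affFut.isDone())
                    affFut.thenRun(this::remap);
                else
                    remap();
            }
            else
                processLockedEntries();
        }

        /** Models MiniFuture.remap(): undo local locks (no tx rollback), then map again. */
        private void remap() {
            undoLocks(false, false);

            mapOnTopology(true);
        }

        /** Models mapOnTopology(remap): a remap overwrites the version, a first map keeps an earlier one. */
        private void mapOnTopology(boolean remap) {
            long ver = currentTopologyVersion();

            if (remap)
                topVer.set(ver);
            else
                topVer.compareAndSet(null, ver);

            map(ver, remap);
        }

        // --- Stubs standing in for the real cache machinery. ---

        private CompletableFuture<Void> affinityReadyFuture(long ver) {
            return CompletableFuture.completedFuture(null); // pretend affinity is already ready
        }

        private void undoLocks(boolean dist, boolean rollbackTx) { /* release local lock candidates */ }

        private long currentTopologyVersion() { return 2L; }

        private void map(long topVer, boolean remap) {
            System.out.println("mapping keys [topVer=" + topVer + ", remap=" + remap + ']');
        }

        private void processLockedEntries() { System.out.println("locks acquired"); }
    }

    public static void main(String[] args) {
        new LockClient().onResult(2L); // server asked the client to remap to topology version 2
    }
}

The same constraint is visible in the diff itself: a remap is only attempted on client nodes that hold no remote locks yet (the new assert in map() and the clientFirst flag sent with the first GridNearLockRequest), so a remap never has to relocate locks that were already granted remotely.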