Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 8341c6d76 -> 4ed77e375

ignite-1093 Parallel supplyPool

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4ed77e37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4ed77e37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4ed77e37

Branch: refs/heads/ignite-1093
Commit: 4ed77e37529e34795a3b581010be3e0f35c8fe6e
Parents: 8341c6d
Author: Anton Vinogradov <avinogra...@gridgain.com>
Authored: Wed Aug 5 18:31:34 2015 +0300
Committer: Anton Vinogradov <avinogra...@gridgain.com>
Committed: Wed Aug 5 18:31:34 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java   |  78 +--
 .../preloader/GridDhtPartitionSupplyPool.java   | 577 ++++++++++---------
 .../GridCacheMassiveRebalancingSelfTest.java    | 115 +++-
 3 files changed, 432 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index d9d871f..0e0bc01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -424,22 +424,6 @@ public class GridDhtPartitionDemandPool {
     }
 
     /**
-     * @param timeout Timed out value.
-     */
-    private void growTimeout(long timeout) {
-        long newTimeout = (long)(timeout * 1.5D);
-
-        // Account for overflow.
-        if (newTimeout < 0)
-            newTimeout = Long.MAX_VALUE;
-
-        // Grow by 50% only if another thread didn't do it already.
-        if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
-            U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
-                newTimeout + "ms.");
-    }
-
-    /**
      * @param pick Node picked for preloading.
      * @param p Partition.
      * @param entry Preloaded entry.
@@ -554,7 +538,7 @@ public class GridDhtPartitionDemandPool {
                 return missed;
 
             for (int p : d.partitions()) {
-                cctx.io().addOrderedHandler(topic(p, node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
                     @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
                         handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining, exchFut, missed, d);
@@ -565,26 +549,25 @@ public class GridDhtPartitionDemandPool {
             try {
                 Iterator<Integer> it = remaining.keySet().iterator();
 
-                final int maxC = Runtime.getRuntime().availableProcessors() / 2; //Todo: make param
+                final int maxC = cctx.config().getRebalanceThreadPoolSize();
 
                 int sent = 0;
 
                 while (sent < maxC && it.hasNext()) {
                     int p = it.next();
 
-                    Collection<Integer> ps = Collections.singleton(p);
-
                     boolean res = remaining.replace(p, false, true);
 
                     assert res;
 
                     // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, ps);
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
 
-                    initD.topic(topic(p, node.id()));
+                    initD.topic(topic(p, topVer.topologyVersion()));
 
                     // Send initial demand message.
-                    cctx.io().send(node, initD, cctx.ioPolicy());
+                    cctx.io().sendOrderedMessage(node,
+                        GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
 
                     sent++;
                 }
@@ -598,17 +581,17 @@ public class GridDhtPartitionDemandPool {
             }
             finally {
                 for (int p : d.partitions())
-                    cctx.io().removeOrderedHandler(topic(p, node.id()));
+                    cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
             }
         }
 
         /**
-         * @param p Partition.
-         * @param id remote node id.
+         * @param p
+         * @param topVer
          * @return topic
         */
-        private Object topic(int p, UUID id) {
-            return TOPIC_CACHE.topic("Preloading", id, cctx.cacheId(), p);
+        private Object topic(int p, long topVer) {
+            return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
        }
 
        /**
@@ -631,7 +614,7 @@ public class GridDhtPartitionDemandPool {
            Set<Integer> missed,
            GridDhtPartitionDemandMessage d) {
 
-            //Todo: check and remove
+            //Todo: check it still actual and remove
            // Check that message was received from expected node.
            if (!s.senderId().equals(node.id())) {
                U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -640,6 +623,9 @@ public class GridDhtPartitionDemandPool {
                return;
            }
 
+            if (topologyChanged())
+                return;
+
            if (log.isDebugEnabled())
                log.debug("Received supply message: " + s);
 
@@ -715,7 +701,26 @@ public class GridDhtPartitionDemandPool {
 
                        remaining.remove(p);
 
-                        demandNextPartition(node, remaining, d);
+                        demandNextPartition(node, remaining, d, topVer);
+                    }
+                    else {
+                        try {
+                            // Create copy.
+                            GridDhtPartitionDemandMessage nextD =
+                                new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+
+                            nextD.topic(topic(p, topVer.topologyVersion()));
+
+                            // Send demand message.
+                            cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
+                                nextD, cctx.ioPolicy(), d.timeout());
+                        }
+                        catch (IgniteCheckedException ex) {
+                            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                                "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+
+                            cancel();
+                        }
                    }
                }
                finally {
@@ -751,24 +756,25 @@ public class GridDhtPartitionDemandPool {
         * @param node Node.
         * @param remaining Remaining.
         * @param d initial DemandMessage.
+         * @param topVer Topology version.
         */
        private void demandNextPartition(
            final ClusterNode node,
            final ConcurrentHashMap8<Integer, Boolean> remaining,
-            final GridDhtPartitionDemandMessage d
+            final GridDhtPartitionDemandMessage d,
+            final AffinityTopologyVersion topVer
        ) {
            try {
                for (Integer p : remaining.keySet()) {
                    if (remaining.replace(p, false, true)) {
-                        Collection<Integer> nextPs = Collections.singleton(p);
-
                        // Create copy.
-                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, nextPs);
+                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
 
-                        nextD.topic(topic(p, node.id()));
+                        nextD.topic(topic(p, topVer.topologyVersion()));
 
                        // Send demand message.
-                        cctx.io().send(node, nextD, cctx.ioPolicy());
+                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
+                            nextD, cctx.ioPolicy(), d.timeout());
 
                        break;
                    }


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 42d6bb2..f10837a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -19,25 +19,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.locks.*;
 
-import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
@@ -51,23 +48,20 @@ class GridDhtPartitionSupplyPool {
     private final IgniteLogger log;
 
     /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
     private GridDhtPartitionTopology top;
 
     /** */
-    private final Collection<SupplyWorker> workers = new LinkedList<>();
-
-    /** */
-    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
-    /** */
     private final boolean depEnabled;
 
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
+    /** Supply context map. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor
+
     /**
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
@@ -77,43 +71,42 @@ class GridDhtPartitionSupplyPool {
         assert busyLock != null;
 
         this.cctx = cctx;
-        this.busyLock = busyLock;
 
         log = cctx.logger(getClass());
 
         top = cctx.dht().topology();
 
         if (!cctx.kernalContext().clientNode()) {
-            int poolSize = cctx.rebalanceEnabled() ?
cctx.config().getRebalanceThreadPoolSize() : 0; - - for (int i = 0; i < poolSize; i++) - workers.add(new SupplyWorker()); - - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processDemandMessage(id, m); - } - }); + for (int p = 0; p <= cctx.affinity().partitions(); p++) + cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processMessage(m, id); + } + }); } depEnabled = cctx.gridDeploy().enabled(); } /** + * @param p Partition. + * @param id Node id. + * @return topic + */ + static Object topic(int p, int id) { + return TOPIC_CACHE.topic("SupplyPool", id, p); + } + + /** * */ void start() { - for (SupplyWorker w : workers) - new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start(); } /** * */ void stop() { - U.cancel(workers); - U.join(workers, log); - top = null; } @@ -127,164 +120,85 @@ class GridDhtPartitionSupplyPool { } /** - * @return Size of this thread pool. + * @param d Demand message. + * @param id Node uuid. */ - int poolSize() { - return cctx.config().getRebalanceThreadPoolSize(); - } + private void processMessage(GridDhtPartitionDemandMessage d, UUID id) { + assert d != null; + assert id != null; - /** - * @return {@code true} if entered to busy state. - */ - private boolean enterBusy() { - if (busyLock.readLock().tryLock()) - return true; + GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId()); - if (log.isDebugEnabled()) - log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId()); + long preloadThrottle = cctx.config().getRebalanceThrottle(); - return false; - } + long maxBatchesCnt = 3;//Todo: param - /** - * @param nodeId Sender node ID. - * @param d Message. - */ - private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) { - if (!enterBusy()) - return; + ClusterNode node = cctx.discovery().node(id); + + boolean ack = false; + + T2<UUID, Object> scId = new T2<>(id, d.topic()); try { - if (cctx.rebalanceEnabled()) { - if (log.isDebugEnabled()) - log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']'); + SupplyContext sctx = scMap.remove(scId); - queue.offer(new DemandMessage(nodeId, d)); - } - else - U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d); - } - finally { - leaveBusy(); - } - } + if (doneMap.get(scId) != null)//Todo: refactor + return; - /** - * - */ - private void leaveBusy() { - busyLock.readLock().unlock(); - } + long bCnt = 0; - /** - * @param deque Deque to poll from. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. 
- if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(2000, MILLISECONDS); - } + int phase = 0; - /** - * Supply work. - */ - private class SupplyWorker extends GridWorker { - /** Hide worker logger and use cache logger. */ - private IgniteLogger log = GridDhtPartitionSupplyPool.this.log; + if (sctx != null) + phase = sctx.phase; - /** - * Default constructor. - */ - private SupplyWorker() { - super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log); - } + Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator(); - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - DemandMessage msg = poll(queue, this); + while (sctx != null || partIt.hasNext()) { + int part = sctx != null ? sctx.part : partIt.next(); - if (msg == null) - continue; + GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - ClusterNode node = cctx.discovery().node(msg.senderId()); + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. + s.missed(part); - if (node == null) { if (log.isDebugEnabled()) - log.debug("Received message from non-existing node (will ignore): " + msg); + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); continue; } - processMessage(msg, node); - } - } + GridCacheEntryInfoCollectSwapListener swapLsnr = null; - /** - * @param msg Message. - * @param node Demander. - */ - private void processMessage(DemandMessage msg, ClusterNode node) { - assert msg != null; - assert node != null; - - GridDhtPartitionDemandMessage d = msg.message(); - - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId()); - - long preloadThrottle = cctx.config().getRebalanceThrottle(); - - boolean ack = false; + try { + if (phase == 0 && cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); - try { - for (int part : d.partitions()) { - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - - if (loc == null || loc.state() != OWNING || !loc.reserve()) { - // Reply with partition of "-1" to let sender know that - // this node is no longer an owner. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Requested partition is not owned by local node [part=" + part + - ", demander=" + msg.senderId() + ']'); - - continue; + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); } - GridCacheEntryInfoCollectSwapListener swapLsnr = null; - - try { - if (cctx.isSwapOrOffheapEnabled()) { - swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); + boolean partMissing = false; - cctx.swap().addOffHeapListener(part, swapLsnr); - cctx.swap().addSwapListener(part, swapLsnr); - } + if (phase == 0) + phase = 1; - boolean partMissing = false; + if (phase == 1) { + Iterator<GridDhtCacheEntry> entIt = sctx != null ? + (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator(); - for (GridCacheEntryEx e : loc.entries()) { + while (entIt.hasNext()) { if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { // Demander no longer needs this partition, so we send '-1' partition and move on. 
s.missed(part); if (log.isDebugEnabled()) log.debug("Demanding node does not need requested partition [part=" + part + - ", nodeId=" + msg.senderId() + ']'); + ", nodeId=" + id + ']'); partMissing = true; @@ -301,10 +215,21 @@ class GridDhtPartitionSupplyPool { if (preloadThrottle > 0) U.sleep(preloadThrottle); - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr); + + swapLsnr = null; + + return; + } + else { + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), + cctx.cacheId()); + } } + GridCacheEntryEx e = entIt.next(); + GridCacheEntryInfo info = e.info(); if (info != null && !info.isNew()) { @@ -319,188 +244,228 @@ class GridDhtPartitionSupplyPool { if (partMissing) continue; - if (cctx.isSwapOrOffheapEnabled()) { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = - cctx.swap().iterator(part); + } - // Iterator may be null if space does not exist. - if (iter != null) { - try { - boolean prepared = false; + if (phase == 1) + phase = 2; - for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); + if (phase == 2 && cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ? + (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt : + cctx.swap().iterator(part); - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + msg.senderId() + ']'); + // Iterator may be null if space does not exist. + if (iter != null) { + try { + boolean prepared = false; - partMissing = true; + while (iter.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); - break; // For. - } + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; + partMissing = true; - if (!reply(node, d, s)) - return; + break; // For. + } - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId()); - } + if (!reply(node, d, s)) + return; - GridCacheSwapEntry swapEntry = e.getValue(); + // Throttle preloading. 
+ if (preloadThrottle > 0) + U.sleep(preloadThrottle); - GridCacheEntryInfo info = new GridCacheEntryInfo(); + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr); - info.keyBytes(e.getKey()); - info.ttl(swapEntry.ttl()); - info.expireTime(swapEntry.expireTime()); - info.version(swapEntry.version()); - info.value(swapEntry.value()); + swapLsnr = null; - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); + return; + } else { - if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not send " + - "cache entry): " + info); - - continue; + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), + cctx.cacheId()); } + } - // Need to manually prepare cache message. - if (depEnabled && !prepared) { - ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : - swapEntry.valueClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : - null; + Map.Entry<byte[], GridCacheSwapEntry> e = iter.next(); - if (ldr == null) - continue; + GridCacheSwapEntry swapEntry = e.getValue(); - if (ldr instanceof GridDeploymentInfo) { - s.prepare((GridDeploymentInfo)ldr); + GridCacheEntryInfo info = new GridCacheEntryInfo(); - prepared = true; - } - } - } + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx); + else { + if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not send " + + "cache entry): " + info); - if (partMissing) continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } } - finally { - iter.close(); - } + + if (partMissing) + continue; + } + finally { + iter.close(); } } + } - // Stop receiving promote notifications. - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } + if (swapLsnr == null && sctx != null) + swapLsnr = sctx.swapLsnr; - if (swapLsnr != null) { - Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); + // Stop receiving promote notifications. + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } - swapLsnr = null; + if (phase == 2) + phase = 3; - for (GridCacheEntryInfo info : entries) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); + if (phase == 3 && swapLsnr != null) { + Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + msg.senderId() + ']'); + swapLsnr = null; - // No need to continue iteration over swap entries. - break; - } + Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ? 
+ (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator(); + + while (lsnrIt.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; - if (!reply(node, d, s)) - return; + if (!reply(node, d, s)) + return; - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr); + + return; + } + else { + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId()); } - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); } - } - // Mark as last supply message. - s.last(part); + GridCacheEntryInfo info = lsnrIt.next(); -// if (ack) { -// s.markAck(); -// -// break; // Partition for loop. -// } + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } } - finally { - loc.release(); - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } + // Mark as last supply message. + s.last(part); + + if (ack) { + s.markAck(); + + break; // Partition for loop. } } + finally { + loc.release(); - reply(node, d, s); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition supply message to node: " + node.id(), e); + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } } + + reply(node, d, s); + + doneMap.put(scId, true); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + id, e); } + } - /** - * @param n Node. - * @param d Demand message. - * @param s Supply message. - * @return {@code True} if message was sent, {@code false} if recipient left grid. - * @throws IgniteCheckedException If failed. - */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) - throws IgniteCheckedException { - try { - if (log.isDebugEnabled()) - log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + /** + * @param n Node. + * @param d Demand message. + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. 
+ */ + private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) + throws IgniteCheckedException { + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); - return true; - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition supply message because node left grid: " + n.id()); + return true; + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); - return false; - } + return false; } } + /** * Demand message wrapper. */ @@ -542,4 +507,42 @@ class GridDhtPartitionSupplyPool { return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']'; } } + + + /** + * @param t T. + * @param phase Phase. + * @param partIt Partition it. + * @param entryIt Entry it. + * @param swapLsnr Swap listener. + */ + private void saveSupplyContext( + T2 t, + int phase, + Iterator<Integer> partIt, + int part, + Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){ + scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part)); + } + + private static class SupplyContext{ + private int phase; + + private Iterator<Integer> partIt; + + private Iterator<?> entryIt; + + private GridCacheEntryInfoCollectSwapListener swapLsnr; + + int part; + + public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt, + GridCacheEntryInfoCollectSwapListener swapLsnr, int part) { + this.phase = phase; + this.partIt = partIt; + this.entryIt = entryIt; + this.swapLsnr = swapLsnr; + this.part = part; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java index e90b7af..11ea8f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java @@ -20,12 +20,13 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; +import java.util.concurrent.atomic.*; + /** * */ @@ -38,6 +39,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest /** cache name. 
*/
     protected static String CACHE_NAME_DHT = "cache";
 
+    /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return Long.MAX_VALUE;
     }
@@ -51,10 +53,14 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
+        if (getTestGridName(3).equals(gridName))
+            iCfg.setClientMode(true);
+
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        //cacheCfg.setRebalanceBatchSize(1024);
+        cacheCfg.setRebalanceBatchSize(100 * 1024);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setRebalanceThreadPoolSize(4);
         //cacheCfg.setRebalanceTimeout(1000000);
         cacheCfg.setBackups(1);
 
@@ -63,11 +69,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     }
 
     /**
-     * @throws Exception
+     * @param ignite Ignite.
     */
-    public void testMassiveRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
-
+    private void generateData(Ignite ignite) {
        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
            for (int i = 0; i < TEST_SIZE; i++) {
                if (i % 1_000_000 == 0)
@@ -76,12 +80,34 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
                stmr.addData(i, i);
            }
        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException
+     */
+    private void checkData(Ignite ignite) throws IgniteCheckedException {
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+        }
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testMassiveRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
 
        log.info("Preloading started.");
 
        long start = System.currentTimeMillis();
 
-        // startGrid(1);
+        //startGrid(1);
 
        startGrid(2);
 
@@ -89,19 +115,78 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
        stopGrid(0);
 
-        // Thread.sleep(10000);
+        //Thread.sleep(20000);
 
-        // stopGrid(1);
+        //stopGrid(1);
 
-        for (int i = 0; i < TEST_SIZE; i++) {
-            if (i % 1_000_000 == 0)
-                log.info("Checked " + i / 1_000_000 + "m entries.");
-
-            assert grid(2).cachex(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
-        }
+        checkData(grid(2));
 
        log.info("Spend " + spend + " seconds to preload entries.");
 
        stopAllGrids();
    }
+
+    /**
+     * @throws Exception
+     */
+    public void testOpPerSecRebalancingTest() throws Exception {
+        startGrid(0);
+
+        final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+        generateData(grid(0));
+
+        startGrid(1);
+        startGrid(2);
+        startGrid(3);
+
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+
+                long spend = 0;
+
+                long ops = 0;
+
+                while (!cancelled.get()) {
+                    try {
+                        long start = System.currentTimeMillis();
+
+                        int size = 1000;
+
+                        for (int i = 0; i < size; i++)
+                            grid(3).cachex(CACHE_NAME_DHT).remove(i);
+
+                        for (int i = 0; i < size; i++)
+                            grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+
+                        spend += System.currentTimeMillis() - start;
+
+                        ops += size * 2;
+                    }
+                    catch (IgniteCheckedException e) {
+                        e.printStackTrace();
+                    }
+
+                    log.info("Ops. per ms: " + ops / spend);
+                }
+            }
+        });
+        t.start();
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(0);
+        startGrid(0);
+
+        cancelled.set(true);
+        t.join();
+
+        checkData(grid(3));
+
+        //stopAllGrids();
+    }
 }
\ No newline at end of file
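
----------------------------------------------------------------------

For reference, the cache-level rebalancing knobs that the updated test exercises (rebalance mode, batch size, and the per-cache rebalance thread pool that drives the parallel demand/supply introduced by this commit) can be set from user code roughly as follows. This is a minimal sketch against the CacheConfiguration API shown in the test above; the class name, cache name, entry count, and the standalone Ignition.start() bootstrap are illustrative assumptions, not part of the commit.

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class RebalanceConfigSketch {
    public static void main(String[] args) {
        // Same cache-level rebalance settings as GridCacheMassiveRebalancingSelfTest above.
        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();

        cacheCfg.setName("cache");
        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
        cacheCfg.setBackups(1);
        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
        cacheCfg.setRebalanceBatchSize(100 * 1024); // Upper bound on a single supply message batch.
        cacheCfg.setRebalanceThreadPoolSize(4);     // Partitions demanded/supplied concurrently per cache.

        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setCacheConfiguration(cacheCfg);

        try (Ignite ignite = Ignition.start(cfg)) {
            // Preload some data through a streamer, as the test does (entry count is arbitrary here).
            try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer("cache")) {
                for (int i = 0; i < 100_000; i++)
                    stmr.addData(i, i);
            }

            IgniteCache<Integer, Integer> cache = ignite.cache("cache");

            System.out.println("Loaded entries: " + cache.size());
        }
    }
}

With these settings, when new nodes join, each cache demands up to getRebalanceThreadPoolSize() partitions at a time over the per-partition ordered topics added in this change, instead of queuing all demand through a single supply worker pool.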