ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f36482b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f36482b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f36482b Branch: refs/heads/ignite-1093 Commit: 8f36482b33d05d20a065d2b3684d82ab7559b902 Parents: 50e188d Author: Anton Vinogradov <vinogradov.an...@gmail.com> Authored: Wed Aug 12 12:42:48 2015 +0300 Committer: Anton Vinogradov <vinogradov.an...@gmail.com> Committed: Wed Aug 12 12:42:48 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 168 ++++++++----------- .../dht/preloader/GridDhtPreloader.java | 19 +-- .../GridCacheMassiveRebalancingSelfTest.java | 19 ++- 3 files changed, 90 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index f6a33c3..6c95707 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -305,6 +306,22 @@ public class GridDhtPartitionDemander { ClusterNode node = e.getKey(); + final long start = U.currentTimeMillis(); + + final CacheConfiguration cfg = cctx.config(); + + if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { + U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + "]"); + + syncFut.listen(new CI1<Object>() { + @Override public void apply(Object t) { + U.log(log, "Completed rebalancing [cache=" + cctx.name() + ", mode=" + + cfg.getRebalanceMode() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); + } + }); + } + GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>(); remainings.addAll(d.partitions()); @@ -342,50 +359,6 @@ public class GridDhtPartitionDemander { } } } - - if (log.isInfoEnabled() && !d.partitions().isEmpty()) { - LinkedList<Integer> s = new LinkedList<>(d.partitions()); - - Collections.sort(s); - - StringBuilder sb = new StringBuilder(); - - int start = -1; - - int prev = -1; - - Iterator<Integer> sit = s.iterator(); - - while (sit.hasNext()) { - int p = sit.next(); - if (start == -1) { - start = p; - prev = p; - } - - if (prev < p - 1) { - sb.append(start); - - if (start != prev) - sb.append("-").append(prev); - - sb.append(", "); - - start = p; - } - - if (!sit.hasNext()) { - sb.append(start); - - if (start != p) - sb.append("-").append(p); - } - - prev = p; - } - - log.info("Requested rebalancing [from node=" + node.id() + ", partitions=" + s.size() + " (" + sb.toString() + ")]"); - } } } else if (delay > 0) { @@ -659,82 +632,83 @@ public class GridDhtPartitionDemander { void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { lastExchangeFut = lastFut; } -/** - * - */ -private class SyncFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 1L; - - private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>(); - private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>(); - - public void append(UUID nodeId, Collection<Integer> parts) { - remaining.put(nodeId, parts); - - missed.put(nodeId, new GridConcurrentHashSet<Integer>()); - } + /** + * + */ + private class SyncFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 1L; - void cancel(UUID nodeId) { - if (isDone()) - return; + private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>(); - remaining.remove(nodeId); + private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>(); - checkIsDone(); - } + public void append(UUID nodeId, Collection<Integer> parts) { + remaining.put(nodeId, parts); - void onMissedPartition(UUID nodeId, int p) { - if (missed.get(nodeId) == null) missed.put(nodeId, new GridConcurrentHashSet<Integer>()); + } - missed.get(nodeId).add(p); - } - - void onPartitionDone(UUID nodeId, int p) { - if (isDone()) - return; + void cancel(UUID nodeId) { + if (isDone()) + return; - Collection<Integer> parts = remaining.get(nodeId); + remaining.remove(nodeId); - parts.remove(p); + checkIsDone(); + } - if (parts.isEmpty()) { - remaining.remove(nodeId); + void onMissedPartition(UUID nodeId, int p) { + if (missed.get(nodeId) == null) + missed.put(nodeId, new GridConcurrentHashSet<Integer>()); - if (log.isDebugEnabled()) - log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']'); + missed.get(nodeId).add(p); } - checkIsDone(); - } + void onPartitionDone(UUID nodeId, int p) { + if (isDone()) + return; - private void checkIsDone() { - if (remaining.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Completed sync future."); + Collection<Integer> parts = remaining.get(nodeId); + + parts.remove(p); - Collection<Integer> m = new HashSet<>(); + if (parts.isEmpty()) { + remaining.remove(nodeId); - for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) { - if (e.getValue() != null && !e.getValue().isEmpty()) - m.addAll(e.getValue()); + if (log.isDebugEnabled()) + log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']'); } - if (!m.isEmpty()) { + checkIsDone(); + } + + private void checkIsDone() { + if (remaining.isEmpty()) { if (log.isDebugEnabled()) - log.debug("Reassigning partitions that were missed: " + m); + log.debug("Completed sync future."); - cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture()); - } + Collection<Integer> m = new HashSet<>(); - missed.clear(); + for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) { + if (e.getValue() != null && !e.getValue().isEmpty()) + m.addAll(e.getValue()); + } + + if (!m.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Reassigning partitions that were missed: " + m); + + cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture()); + } + + missed.clear(); - cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary? + cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary? - onDone(); + onDone(); + } } } } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index d994a19..7f99ebf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -220,24 +219,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable err) { - if (err == null) { + if (err == null) startFut.onDone(); - - final long start = U.currentTimeMillis(); - - final CacheConfiguration cfg = cctx.config(); - - if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { - U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); - - demander.syncFuture().listen(new CI1<Object>() { - @Override public void apply(Object t) { - U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + - "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); - } - }); - } - } else startFut.onDone(err); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java index 5148753..ca95905 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -113,12 +115,27 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest long spend = (System.currentTimeMillis() - start) / 1000; + IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + stopGrid(0); - Thread.sleep(20000); + while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() || + f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) + U.sleep(100); + + ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + + f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); stopGrid(1); + while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) + U.sleep(100); + + ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + checkData(grid(2)); log.info("Spend " + spend + " seconds to preload entries.");