Repository: incubator-ignite

Updated Branches:
  refs/heads/ignite-1093 c92cd899a -> 50e188df2
ignite-1093 Non stop rebalancing


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4776feca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4776feca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4776feca

Branch: refs/heads/ignite-1093
Commit: 4776fecaf059d11d4a4f3ff57634ebad9e41f451
Parents: c92cd89
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Fri Aug 7 18:45:07 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Fri Aug 7 18:45:07 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java  | 176 +++++++++----------
 .../preloader/GridDhtPartitionSupplyPool.java  |  80 ++++++---
 .../GridCacheMassiveRebalancingSelfTest.java   |  10 +-
 3 files changed, 144 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 0e0bc01..11645e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -225,6 +225,14 @@ public class GridDhtPartitionDemandPool {
     }
 
     /**
+     * @param idx
+     * @return topic
+     */
+    static Object topic(int idx, int cacheId, UUID nodeId) {
+        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+    }
+
+    /**
      *
      */
     private void leaveBusy() {
@@ -537,39 +545,58 @@ public class GridDhtPartitionDemandPool {
             if (isCancelled() || topologyChanged())
                 return missed;
 
-            for (int p : d.partitions()) {
-                cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                        handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
-                            exchFut, missed, d);
+            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+
+            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+
+            int cnt = 0;
+
+            while (cnt < threadCnt) {
+                sParts.add(new HashSet<Integer>());
+
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
+                        handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+                            exchFut, missed, d, remaining);
                     }
                 });
+
+                cnt++;
             }
 
-            try {
-                Iterator<Integer> it = remaining.keySet().iterator();
+            Iterator<Integer> it = d.partitions().iterator();
 
-                final int maxC = cctx.config().getRebalanceThreadPoolSize();
+            cnt = 0;
 
-                int sent = 0;
+            while (it.hasNext()) {
+                sParts.get(cnt % threadCnt).add(it.next());
 
-                while (sent < maxC && it.hasNext()) {
-                    int p = it.next();
+                cnt++;
+            }
 
-                    boolean res = remaining.replace(p, false, true);
+            try {
+                cnt = 0;
 
-                    assert res;
+                while (cnt < threadCnt) {
                     // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                    initD.topic(topic(p, topVer.topologyVersion()));
+                    initD.topic(topic(cnt, cctx.cacheId(),node.id()));
 
-                    // Send initial demand message.
-                    cctx.io().sendOrderedMessage(node,
-                        GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    try {
+                        if (logg && cctx.name().equals("cache"))
+                            System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
 
-                    sent++;
+                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send partition demand message to local node", e);
+                    }
+
+                    cnt++;
                 }
 
                 do {
@@ -580,41 +607,41 @@ public class GridDhtPartitionDemandPool {
 
             return missed;
         }
         finally {
-            for (int p : d.partitions())
-                cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
+            cnt = 0;
+
+            while (cnt < threadCnt) {
+                cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
+
+                cnt++;
+            }
         }
     }
 
-    /**
-     * @param p
-     * @param topVer
-     * @return topic
-     */
-    private Object topic(int p, long topVer) {
-        return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
-    }
+    boolean logg = false;
 
     /**
      * @param s Supply message.
      * @param node Node.
     * @param topVer Topology version.
      * @param top Topology.
-     * @param remaining Remaining.
      * @param exchFut Exchange future.
      * @param missed Missed.
      * @param d initial DemandMessage.
      */
     private void handleSupplyMessage(
+        int idx,
         SupplyMessage s,
         ClusterNode node,
         AffinityTopologyVersion topVer,
         GridDhtPartitionTopology top,
-        ConcurrentHashMap8<Integer, Boolean> remaining,
         GridDhtPartitionsExchangeFuture exchFut,
         Set<Integer> missed,
-        GridDhtPartitionDemandMessage d) {
+        GridDhtPartitionDemandMessage d,
+        ConcurrentHashMap8 remaining) {
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
 
-        //Todo: check it still actual and remove
         // Check that message was received from expected node.
         if (!s.senderId().equals(node.id())) {
             U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -639,10 +666,8 @@ public class GridDhtPartitionDemandPool {
                 return;
             }
 
-            assert supply.infos().entrySet().size() == 1;//Todo: remove after supply message refactoring
-
             // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {//todo:only one partition (supply refactoring)
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
 
                 if (cctx.affinity().localNode(p, topVer)) {
@@ -685,12 +710,17 @@ public class GridDhtPartitionDemandPool {
                         }
                     }
 
-                    boolean last = supply.last().contains(p);//Todo: refactor as boolean "last"
+                    boolean last = supply.last().contains(p);
 
                     // If message was last for this partition,
                     // then we take ownership.
                     if (last) {
-                        top.own(part);
+                        top.own(part);//todo: close future?
+
+//                        if (logg && cctx.name().equals("cache"))
+//                            System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+
+                        remaining.remove(p);
 
                         if (log.isDebugEnabled())
                             log.debug("Finished rebalancing partition: " + part);
@@ -698,29 +728,6 @@ public class GridDhtPartitionDemandPool {
 
                         if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
                             preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent());
-
-                        remaining.remove(p);
-
-                        demandNextPartition(node, remaining, d, topVer);
-                    }
-                    else {
-                        try {
-                            // Create copy.
-                            GridDhtPartitionDemandMessage nextD =
-                                new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
-
-                            nextD.topic(topic(p, topVer.topologyVersion()));
-
-                            // Send demand message.
-                            cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
-                                nextD, cctx.ioPolicy(), d.timeout());
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
-
-                            cancel();
-                        }
                     }
                 }
                 finally {
@@ -743,48 +750,35 @@ public class GridDhtPartitionDemandPool {
             }
         }
 
-        for (Integer miss : s.supply().missed()) // Todo: miss as param, not collection
+        for (Integer miss : s.supply().missed())
            remaining.remove(miss);
 
        // Only request partitions based on latest topology version.
        for (Integer miss : s.supply().missed())
            if (cctx.affinity().localNode(miss, topVer))
                missed.add(miss);
-    }
 
-    /**
-     * @param node Node.
-     * @param remaining Remaining.
-     * @param d initial DemandMessage.
-     * @param topVer Topology version.
-     */
-    private void demandNextPartition(
-        final ClusterNode node,
-        final ConcurrentHashMap8<Integer, Boolean> remaining,
-        final GridDhtPartitionDemandMessage d,
-        final AffinityTopologyVersion topVer
-    ) {
-        try {
-            for (Integer p : remaining.keySet()) {
-                if (remaining.replace(p, false, true)) {
-                    // Create copy.
-                    GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+        if (!remaining.isEmpty()) {
+            try {
+                // Create copy.
+                GridDhtPartitionDemandMessage nextD =
+                    new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                    nextD.topic(topic(p, topVer.topologyVersion()));
+                nextD.topic(topic(idx, cctx.cacheId(), node.id()));
 
-                    // Send demand message.
-                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
-                        nextD, cctx.ioPolicy(), d.timeout());
+                // Send demand message.
+                cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(idx, cctx.cacheId()),
+                    nextD, cctx.ioPolicy(), d.timeout());
 
-                    break;
-                }
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("D " + idx + " ack " + cctx.localNode().id());
             }
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+            catch (IgniteCheckedException ex) {
+                U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                    "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
 
-            cancel();
+                cancel();
+            }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index f10837a..c1c9941 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -76,25 +76,32 @@ class GridDhtPartitionSupplyPool {
 
         top = cctx.dht().topology();
 
+        int cnt = 0;
+
         if (!cctx.kernalContext().clientNode()) {
-            for (int p = 0; p <= cctx.affinity().partitions(); p++)
-                cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+            while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
                     @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        processMessage(m, id);
+                        processMessage(m, id, idx);
                     }
                 });
+
+                cnt++;
+            }
         }
 
         depEnabled = cctx.gridDeploy().enabled();
     }
 
     /**
-     * @param p Partition.
+     * @param idx Index.
      * @param id Node id.
      * @return topic
      */
-    static Object topic(int p, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", id, p);
+    static Object topic(int idx, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", idx, id);
     }
 
     /**
@@ -119,44 +126,65 @@ class GridDhtPartitionSupplyPool {
         this.preloadPred = preloadPred;
     }
 
+    boolean logg = false;
+
     /**
     * @param d Demand message.
     * @param id Node uuid.
     */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
        assert d != null;
        assert id != null;
 
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId());
 
        long preloadThrottle = cctx.config().getRebalanceThrottle();
 
-        long maxBatchesCnt = 3;//Todo: param
-
        ClusterNode node = cctx.discovery().node(id);
 
-        boolean ack = false;
-
        T2<UUID, Object> scId = new T2<>(id, d.topic());
 
        try {
            SupplyContext sctx = scMap.remove(scId);
 
-            if (doneMap.get(scId) != null)//Todo: refactor
+            if (!d.partitions().isEmpty()) {//Only first request contains partitions.
+                doneMap.remove(scId);
+            }
+
+            if (doneMap.get(scId) != null) {
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
                return;
+            }
 
            long bCnt = 0;
 
            int phase = 0;
 
-            if (sctx != null)
+            boolean newReq = true;
+
+            long maxBatchesCnt = 3;//Todo: param
+
+            if (sctx != null) {
                phase = sctx.phase;
 
+                maxBatchesCnt = 1;
+            }
+
            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
 
-            while (sctx != null || partIt.hasNext()) {
-                int part = sctx != null ? sctx.part : partIt.next();
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
 
                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
@@ -206,8 +234,6 @@ class GridDhtPartitionSupplyPool {
                        }
 
                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                            ack = true;
-
                            if (!reply(node, d, s))
                                return;
@@ -223,6 +249,9 @@ class GridDhtPartitionSupplyPool {
                                return;
                            }
                            else {
+                                if (logg && cctx.name().equals("cache"))
+                                    System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId());
                            }
@@ -275,8 +304,6 @@ class GridDhtPartitionSupplyPool {
                        }
 
                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                            ack = true;
-
                            if (!reply(node, d, s))
                                return;
@@ -382,8 +409,6 @@ class GridDhtPartitionSupplyPool {
                        }
 
                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                            ack = true;
-
                            if (!reply(node, d, s))
                                return;
@@ -415,11 +440,12 @@ class GridDhtPartitionSupplyPool {
                    // Mark as last supply message.
                    s.last(part);
 
-                    if (ack) {
-                        s.markAck();
+//                    if (logg && cctx.name().equals("cache"))
+//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
 
-                        break; // Partition for loop.
-                    }
+                    phase = 0;
+
+                    sctx = null;
                }
                finally {
                    loc.release();
@@ -442,13 +468,15 @@ class GridDhtPartitionSupplyPool {
 
     /**
     * @param n Node.
-     * @param d Demand message.
     * @param s Supply message.
     * @return {@code True} if message was sent, {@code false} if recipient left grid.
     * @throws IgniteCheckedException If failed.
     */
    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
        throws IgniteCheckedException {
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S sent "+ cctx.localNode().id());
+
        try {
            if (log.isDebugEnabled())
                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 11ea8f6..4992d19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_024_000;
+    private static int TEST_SIZE = 10_024_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
@@ -58,7 +58,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        cacheCfg.setRebalanceBatchSize(100 * 1024);
+        //cacheCfg.setRebalanceBatchSize(1024);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheCfg.setRebalanceThreadPoolSize(4);
         //cacheCfg.setRebalanceTimeout(1000000);
@@ -107,7 +107,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         long start = System.currentTimeMillis();
 
-        //startGrid(1);
+        startGrid(1);
 
         startGrid(2);
 
@@ -115,9 +115,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         stopGrid(0);
 
-        //Thread.sleep(20000);
+        Thread.sleep(20000);
 
-        //stopGrid(1);
+        stopGrid(1);
 
         checkData(grid(2));
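
For reference, a minimal standalone sketch (not part of the patch; the class and method names below are hypothetical) of the round-robin partition-to-thread split that the demand side now performs before sending one initial demand message per rebalance-thread topic, mirroring how sParts is built with cnt % threadCnt above:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /** Illustrative only: spread partition ids over N rebalance threads. */
    public class RebalanceSplitSketch {
        /** Returns one partition set per thread, filled round-robin. */
        static List<Set<Integer>> split(Iterable<Integer> partitions, int threadCnt) {
            List<Set<Integer>> parts = new ArrayList<>(threadCnt);

            for (int i = 0; i < threadCnt; i++)
                parts.add(new HashSet<Integer>());

            int cnt = 0;

            // Same idea as sParts.get(cnt % threadCnt).add(it.next()) in the patch.
            for (Integer p : partitions)
                parts.get(cnt++ % threadCnt).add(p);

            return parts;
        }

        public static void main(String[] args) {
            // 8 partitions spread over 3 hypothetical rebalance threads.
            List<Integer> partitions = new ArrayList<>();

            for (int p = 0; p < 8; p++)
                partitions.add(p);

            System.out.println(split(partitions, 3)); // e.g. [[0, 3, 6], [1, 4, 7], [2, 5]]
        }
    }

Each resulting set would then be wrapped into its own GridDhtPartitionDemandMessage and sent on a per-index ordered topic, so the supplier can stream all assigned partitions for that thread without a demand/ack round trip per partition.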