Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 c92cd899a -> 50e188df2


ignite-1093 Non-stop rebalancing


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4776feca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4776feca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4776feca

Branch: refs/heads/ignite-1093
Commit: 4776fecaf059d11d4a4f3ff57634ebad9e41f451
Parents: c92cd89
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Fri Aug 7 18:45:07 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Fri Aug 7 18:45:07 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java   | 176 +++++++++----------
 .../preloader/GridDhtPartitionSupplyPool.java   |  80 ++++++---
 .../GridCacheMassiveRebalancingSelfTest.java    |  10 +-
 3 files changed, 144 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
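
This change reworks rebalancing messaging from one ordered topic per partition to one ordered topic per rebalance thread index: the demander stripes its assigned partitions across getRebalanceThreadPoolSize() indexes and sends one initial demand message per index, while the supplier keeps a per-topic supply context so it can resume streaming batches instead of being re-demanded partition by partition. As a rough usage sketch (standard public API; only the setters also visible in the test diff below are taken from this change, the rest is illustrative), a cache exercising the multi-threaded path could be configured like this:

    import org.apache.ignite.cache.CacheMode;
    import org.apache.ignite.cache.CacheRebalanceMode;
    import org.apache.ignite.configuration.CacheConfiguration;

    CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>("cache");

    cacheCfg.setCacheMode(CacheMode.PARTITIONED);
    cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
    cacheCfg.setRebalanceThreadPoolSize(4); // one demand/supply topic pair per thread index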


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 0e0bc01..11645e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -225,6 +225,14 @@ public class GridDhtPartitionDemandPool {
     }
 
     /**
+     * @param idx
+     * @return topic
+     */
+    static Object topic(int idx, int cacheId, UUID nodeId) {
+        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+    }
+
+    /**
      *
      */
     private void leaveBusy() {
@@ -537,39 +545,58 @@ public class GridDhtPartitionDemandPool {
             if (isCancelled() || topologyChanged())
                 return missed;
 
-            for (int p : d.partitions()) {
-                cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                        handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
-                            exchFut, missed, d);
+            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+
+            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+
+            int cnt = 0;
+
+            while (cnt < threadCnt) {
+                sParts.add(new HashSet<Integer>());
+
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
+                        handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+                            exchFut, missed, d, remaining);
                     }
                 });
+
+                cnt++;
             }
 
-            try {
-                Iterator<Integer> it = remaining.keySet().iterator();
+            Iterator<Integer> it = d.partitions().iterator();
 
-                final int maxC = cctx.config().getRebalanceThreadPoolSize();
+            cnt = 0;
 
-                int sent = 0;
+            while (it.hasNext()) {
+                sParts.get(cnt % threadCnt).add(it.next());
 
-                while (sent < maxC && it.hasNext()) {
-                    int p = it.next();
+                cnt++;
+            }
 
-                    boolean res = remaining.replace(p, false, true);
+            try {
+                cnt = 0;
 
-                    assert res;
+                while (cnt < threadCnt) {
 
                     // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                    initD.topic(topic(p, topVer.topologyVersion()));
+                    initD.topic(topic(cnt, cctx.cacheId(), node.id()));
 
-                    // Send initial demand message.
-                    cctx.io().sendOrderedMessage(node,
-                        GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    try {
+                        if (logg && cctx.name().equals("cache"))
+                            System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
 
-                    sent++;
+                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send partition demand message to local node", e);
+                    }
+
+                    cnt++;
                 }
 
                 do {
@@ -580,41 +607,41 @@ public class GridDhtPartitionDemandPool {
                 return missed;
             }
             finally {
-                for (int p : d.partitions())
-                    cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
+                cnt = 0;
+
+                while (cnt < threadCnt) {
+                    cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId(), node.id()));
+
+                    cnt++;
+                }
             }
         }
 
-        /**
-         * @param p
-         * @param topVer
-         * @return topic
-         */
-        private Object topic(int p, long topVer) {
-            return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
-        }
+        boolean logg = false;
 
         /**
          * @param s Supply message.
          * @param node Node.
          * @param topVer Topology version.
          * @param top Topology.
-         * @param remaining Remaining.
          * @param exchFut Exchange future.
          * @param missed Missed.
          * @param d initial DemandMessage.
          */
         private void handleSupplyMessage(
+            int idx,
             SupplyMessage s,
             ClusterNode node,
             AffinityTopologyVersion topVer,
             GridDhtPartitionTopology top,
-            ConcurrentHashMap8<Integer, Boolean> remaining,
             GridDhtPartitionsExchangeFuture exchFut,
             Set<Integer> missed,
-            GridDhtPartitionDemandMessage d) {
+            GridDhtPartitionDemandMessage d,
+            ConcurrentHashMap8 remaining) {
+
+            if (logg && cctx.name().equals("cache"))
+                System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
 
-            //Todo: check it still actual and remove
             // Check that message was received from expected node.
             if (!s.senderId().equals(node.id())) {
                 U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -639,10 +666,8 @@ public class GridDhtPartitionDemandPool {
                 return;
             }
 
-            assert supply.infos().entrySet().size() == 1;//Todo: remove after supply message refactoring
-
             // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {//todo:only one partition (supply refactoring)
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
 
                 if (cctx.affinity().localNode(p, topVer)) {
@@ -685,12 +710,17 @@ public class GridDhtPartitionDemandPool {
                                 }
                             }
 
-                            boolean last = supply.last().contains(p);//Todo: refactor as boolean "last"
+                            boolean last = supply.last().contains(p);
 
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
-                                top.own(part);
+                                top.own(part);//todo: close future?
+
+//                                if (logg && cctx.name().equals("cache"))
+//                                    System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+
+                                remaining.remove(p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: 
" + part);
@@ -698,29 +728,6 @@ public class GridDhtPartitionDemandPool {
                                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
                                     preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
                                         exchFut.discoveryEvent());
-
-                                remaining.remove(p);
-
-                                demandNextPartition(node, remaining, d, topVer);
-                            }
-                            else {
-                                try {
-                                    // Create copy.
-                                    GridDhtPartitionDemandMessage nextD =
-                                        new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
-
-                                    nextD.topic(topic(p, topVer.topologyVersion()));
-
-                                    // Send demand message.
-                                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
-                                        nextD, cctx.ioPolicy(), d.timeout());
-                                   }
-                                catch (IgniteCheckedException ex) {
-                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
-
-                                    cancel();
-                                }
                             }
                         }
                         finally {
@@ -743,48 +750,35 @@ public class GridDhtPartitionDemandPool {
                 }
             }
 
-            for (Integer miss : s.supply().missed()) // Todo: miss as param, not collection
+            for (Integer miss : s.supply().missed())
                 remaining.remove(miss);
 
             // Only request partitions based on latest topology version.
             for (Integer miss : s.supply().missed())
                 if (cctx.affinity().localNode(miss, topVer))
                     missed.add(miss);
-        }
 
-        /**
-         * @param node Node.
-         * @param remaining Remaining.
-         * @param d initial DemandMessage.
-         * @param topVer Topology version.
-         */
-        private void demandNextPartition(
-            final ClusterNode node,
-            final ConcurrentHashMap8<Integer, Boolean> remaining,
-            final GridDhtPartitionDemandMessage d,
-            final AffinityTopologyVersion topVer
-        ) {
-            try {
-                for (Integer p : remaining.keySet()) {
-                    if (remaining.replace(p, false, true)) {
-                        // Create copy.
-                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+            if (!remaining.isEmpty()) {
+                try {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                        nextD.topic(topic(p, topVer.topologyVersion()));
+                    nextD.topic(topic(idx, cctx.cacheId(), node.id()));
 
-                        // Send demand message.
-                        cctx.io().sendOrderedMessage(node, 
GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
-                            nextD, cctx.ioPolicy(), d.timeout());
+                    // Send demand message.
+                    cctx.io().sendOrderedMessage(node, 
GridDhtPartitionSupplyPool.topic(idx, cctx.cacheId()),
+                        nextD, cctx.ioPolicy(), d.timeout());
 
-                        break;
-                    }
+                    if (logg && cctx.name().equals("cache"))
+                        System.out.println("D " + idx + " ack  " + cctx.localNode().id());
                 }
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                    "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
 
-                cancel();
+                    cancel();
+                }
             }
         }
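
A compact way to see the demander-side change above: instead of one ordered topic per partition, partitions are dealt round-robin into getRebalanceThreadPoolSize() buckets and each bucket is demanded on its own per-index topic. Below is a standalone sketch of that striping (class and method names are illustrative, not part of the commit):

    import java.util.*;

    final class PartitionStriping {
        /** Deals partitions round-robin into one set per rebalance thread. */
        static List<Set<Integer>> stripe(Collection<Integer> partitions, int threadCnt) {
            List<Set<Integer>> striped = new ArrayList<>(threadCnt);

            for (int i = 0; i < threadCnt; i++)
                striped.add(new HashSet<Integer>());

            int cnt = 0;

            for (Integer p : partitions)
                striped.get(cnt++ % threadCnt).add(p);

            return striped;
        }
    }

Each resulting set then travels in a single initial GridDhtPartitionDemandMessage on topic(idx, cacheId, nodeId), so a supplier thread can keep streaming batches for its whole bucket instead of being re-demanded partition by partition.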
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index f10837a..c1c9941 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -76,25 +76,32 @@ class GridDhtPartitionSupplyPool {
 
         top = cctx.dht().topology();
 
+        int cnt = 0;
+
         if (!cctx.kernalContext().clientNode()) {
-            for (int p = 0; p <= cctx.affinity().partitions(); p++)
-                cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+            while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
                     @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        processMessage(m, id);
+                        processMessage(m, id, idx);
                     }
                 });
+
+                cnt++;
+            }
         }
 
         depEnabled = cctx.gridDeploy().enabled();
     }
 
     /**
-     * @param p Partition.
+     * @param idx Index.
      * @param id Node id.
      * @return topic
      */
-    static Object topic(int p, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", id, p);
+    static Object topic(int idx, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", idx, id);
     }
 
     /**
@@ -119,44 +126,65 @@ class GridDhtPartitionSupplyPool {
         this.preloadPred = preloadPred;
     }
 
+    boolean logg = false;
+
     /**
      * @param d Demand message.
      * @param id Node uuid.
      */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
         assert d != null;
         assert id != null;
 
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
         GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
             d.updateSequence(), cctx.cacheId());
 
         long preloadThrottle = cctx.config().getRebalanceThrottle();
 
-        long maxBatchesCnt = 3;//Todo: param
-
         ClusterNode node = cctx.discovery().node(id);
 
-        boolean ack = false;
-
         T2<UUID, Object> scId = new T2<>(id, d.topic());
 
         try {
             SupplyContext sctx = scMap.remove(scId);
 
-            if (doneMap.get(scId) != null)//Todo: refactor
+            if (!d.partitions().isEmpty()) {//Only first request contains partitions.
+                doneMap.remove(scId);
+            }
+
+            if (doneMap.get(scId) != null) {
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
                 return;
+            }
 
             long bCnt = 0;
 
             int phase = 0;
 
-            if (sctx != null)
+            boolean newReq = true;
+
+            long maxBatchesCnt = 3;//Todo: param
+
+            if (sctx != null) {
                 phase = sctx.phase;
 
+                maxBatchesCnt = 1;
+            }
+
             Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
 
-            while (sctx != null || partIt.hasNext()) {
-                int part = sctx != null ? sctx.part : partIt.next();
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
 
                 GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 
@@ -206,8 +234,6 @@ class GridDhtPartitionSupplyPool {
                             }
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
                                 if (!reply(node, d, s))
                                     return;
 
@@ -223,6 +249,9 @@ class GridDhtPartitionSupplyPool {
                                     return;
                                 }
                                 else {
+                                    if (logg && cctx.name().equals("cache"))
+                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
                                     s = new 
GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                         cctx.cacheId());
                                 }
@@ -275,8 +304,6 @@ class GridDhtPartitionSupplyPool {
                                     }
 
                                     if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                        ack = true;
-
                                         if (!reply(node, d, s))
                                             return;
 
@@ -382,8 +409,6 @@ class GridDhtPartitionSupplyPool {
                             }
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
                                 if (!reply(node, d, s))
                                     return;
 
@@ -415,11 +440,12 @@ class GridDhtPartitionSupplyPool {
                     // Mark as last supply message.
                     s.last(part);
 
-                    if (ack) {
-                        s.markAck();
+//                    if (logg && cctx.name().equals("cache"))
+//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
 
-                        break; // Partition for loop.
-                    }
+                    phase = 0;
+
+                    sctx = null;
                 }
                 finally {
                     loc.release();
@@ -442,13 +468,15 @@ class GridDhtPartitionSupplyPool {
 
     /**
      * @param n Node.
-     * @param d Demand message.
      * @param s Supply message.
      * @return {@code True} if message was sent, {@code false} if recipient left grid.
      * @throws IgniteCheckedException If failed.
      */
     private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
         throws IgniteCheckedException {
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S sent "+ cctx.localNode().id());
+
         try {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
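
Taken together with the demander changes, every rebalance thread index now has a fixed pair of ordered topics: the supplier listens on SupplyPool.topic(idx, cacheId) and replies on the demand topic carried in the incoming demand message. A self-contained sketch of that pairing is below; plain strings stand in for the GridTopic-based topics built by the two static topic() helpers, and all names here are illustrative:

    import java.util.UUID;

    public class TopicPairingSketch {
        public static void main(String[] args) {
            int rebalanceThreadPoolSize = 4;          // CacheConfiguration.getRebalanceThreadPoolSize()
            int cacheId = 42;                         // stand-in for cctx.cacheId()
            UUID supplierNodeId = UUID.randomUUID();  // stand-in for node.id()

            for (int idx = 0; idx < rebalanceThreadPoolSize; idx++) {
                // Demander listens for supply messages here (DemandPool.topic(idx, cacheId, nodeId)).
                String demandTopic = "DemandPool-" + supplierNodeId + '-' + cacheId + '-' + idx;

                // Demander addresses demand messages here (SupplyPool.topic(idx, cacheId)).
                String supplyTopic = "SupplyPool-" + idx + '-' + cacheId;

                System.out.println(demandTopic + " <-> " + supplyTopic);
            }
        }
    }

Because ordering is per topic, batches for the partitions assigned to one thread index stay ordered while different indexes proceed independently, which is what allows the per-partition ack round-trips removed in this commit to go away.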

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 11ea8f6..4992d19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_024_000;
+    private static int TEST_SIZE = 10_024_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
@@ -58,7 +58,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        cacheCfg.setRebalanceBatchSize(100 * 1024);
+        //cacheCfg.setRebalanceBatchSize(1024);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheCfg.setRebalanceThreadPoolSize(4);
         //cacheCfg.setRebalanceTimeout(1000000);
@@ -107,7 +107,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         long start = System.currentTimeMillis();
 
-        //startGrid(1);
+        startGrid(1);
 
         startGrid(2);
 
@@ -115,9 +115,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         stopGrid(0);
 
-        //Thread.sleep(20000);
+        Thread.sleep(20000);
 
-        //stopGrid(1);
+        stopGrid(1);
 
         checkData(grid(2));
 
