ignite-1093

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db72f531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db72f531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db72f531

Branch: refs/heads/ignite-1093
Commit: db72f531342231cf45f49e395056c12cce3a79e5
Parents: d0b7d9f
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Tue Aug 11 18:58:05 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Tue Aug 11 18:58:05 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 135 +------------------
 .../dht/preloader/GridDhtPreloader.java         | 123 ++++++++++++++++-
 2 files changed, 122 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e177dae..fca9f53 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -65,9 +65,6 @@ public class GridDhtPartitionDemander {
     @GridToStringInclude
     private volatile SyncFuture syncFut;
 
-    /** Demand lock. */
-    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
     /** Last timeout object. */
     private AtomicReference<GridTimeoutObject> lastTimeoutObj = new 
AtomicReference<>();
 
@@ -227,44 +224,10 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Picked owners.
-     */
-    private Collection<ClusterNode> pickedOwners(int p, 
AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-        int affCnt = affNodes.size();
-
-        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
-        int rmtCnt = rmts.size();
-
-        if (rmtCnt <= affCnt)
-            return rmts;
-
-        List<ClusterNode> sorted = new ArrayList<>(rmts);
-
-        // Sort in descending order, so nodes with higher order will be first.
-        Collections.sort(sorted, CU.nodeComparator(false));
-
-        // Pick newest nodes.
-        return sorted.subList(0, affCnt);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Nodes owning this partition.
-     */
-    private Collection<ClusterNode> remoteOwners(int p, 
AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), 
F.remoteNodes(cctx.nodeId()));
-    }
-
-    /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
+
     void addAssignments(final GridDhtPreloaderAssignments assigns, boolean 
force) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
@@ -408,20 +371,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
-
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
-
-    /**
      * @param idx Index.
      * @param id Node id.
      * @param supply Supply.
@@ -666,88 +615,6 @@ public class GridDhtPartitionDemander {
     void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
         lastExchangeFut = lastFut;
     }
-
-    /**
-     * @param exchFut Exchange future.
-     * @return Assignments of partitions to nodes.
-     */
-    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture 
exchFut) {
-        // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
-
-        if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchFut, 
top.topologyVersion());
-
-        int partCnt = cctx.affinity().partitions();
-
-        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", topVer=" + top.topologyVersion() + ']';
-
-        GridDhtPreloaderAssignments assigns = new 
GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
-
-        for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping assignments creation, exchange worker 
has pending assignments: " +
-                        exchFut.exchangeId());
-
-                break;
-            }
-
-            // If partition belongs to local node.
-            if (cctx.affinity().localNode(p, topVer)) {
-                GridDhtLocalPartition part = top.localPartition(p, topVer, 
true);
-
-                assert part != null;
-                assert part.id() == p;
-
-                if (part.state() != MOVING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition assignment (state is not 
MOVING): " + part);
-
-                    continue; // For.
-                }
-
-                Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
-                if (picked.isEmpty()) {
-                    top.own(part);
-
-                    if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, 
discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Owning partition as there are no other 
owners: " + part);
-                }
-                else {
-                    ClusterNode n = F.first(picked);
-
-                    GridDhtPartitionDemandMessage msg = assigns.get(n);
-
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
-                    }
-
-                    msg.addPartition(p);
-                }
-            }
-        }
-
-        return assigns;
-    }
-
 /**
  *
  */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8a097ed..d994a19 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
 
 /**
@@ -76,6 +77,9 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     private ConcurrentMap<AffinityTopologyVersion, 
GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** Demand lock. */
+    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -250,7 +254,115 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demander.assign(exchFut);
+        // No assignments for disabled preloader.
+        GridDhtPartitionTopology top = cctx.dht().topology();
+
+        if (!cctx.rebalanceEnabled())
+            return new GridDhtPreloaderAssignments(exchFut, 
top.topologyVersion());
+
+        int partCnt = cctx.affinity().partitions();
+
+        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+            
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
+
+        GridDhtPreloaderAssignments assigns = new 
GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+        for (int p = 0; p < partCnt; p++) {
+            if (cctx.shared().exchange().hasPendingExchange()) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping assignments creation, exchange worker 
has pending assignments: " +
+                        exchFut.exchangeId());
+
+                break;
+            }
+
+            // If partition belongs to local node.
+            if (cctx.affinity().localNode(p, topVer)) {
+                GridDhtLocalPartition part = top.localPartition(p, topVer, 
true);
+
+                assert part != null;
+                assert part.id() == p;
+
+                if (part.state() != MOVING) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition assignment (state is not 
MOVING): " + part);
+
+                    continue; // For.
+                }
+
+                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                if (picked.isEmpty()) {
+                    top.own(part);
+
+                    if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        cctx.events().addPreloadEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, 
discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Owning partition as there are no other 
owners: " + part);
+                }
+                else {
+                    ClusterNode n = F.first(picked);
+
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            cctx.cacheId()));
+                    }
+
+                    msg.addPartition(p);
+                }
+            }
+        }
+
+        return assigns;
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Picked owners.
+     */
+    private Collection<ClusterNode> pickedOwners(int p, 
AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+        int affCnt = affNodes.size();
+
+        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+        int rmtCnt = rmts.size();
+
+        if (rmtCnt <= affCnt)
+            return rmts;
+
+        List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+        // Sort in descending order, so nodes with higher order will be first.
+        Collections.sort(sorted, CU.nodeComparator(false));
+
+        // Pick newest nodes.
+        return sorted.subList(0, affCnt);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private Collection<ClusterNode> remoteOwners(int p, 
AffinityTopologyVersion topVer) {
+        return F.view(cctx.dht().topology().owners(p, topVer), 
F.remoteNodes(cctx.nodeId()));
     }
 
     /** {@inheritDoc} */
@@ -531,7 +643,14 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demander.unwindUndeploys();
+        demandLock.writeLock().lock();
+
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
     }
 
     /**

Reply via email to