ignite-1093

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d0b7d9fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d0b7d9fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d0b7d9fc

Branch: refs/heads/ignite-1093
Commit: d0b7d9fca8713aefeb6e6477679efb9d7a8db9e0
Parents: 76ba5d9
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Tue Aug 11 18:09:00 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Tue Aug 11 18:09:00 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |   2 +-
 .../cache/GridCachePreloaderAdapter.java        |   2 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 941 +++++++------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  21 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 5 files changed, 328 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b8bb08e..1e915eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -91,7 +91,7 @@ public interface GridCachePreloader {
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException;
 
     /**
      * @param p Preload predicate.

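The interface method above now declares IgniteCheckedException, so call sites outside this patch must handle or propagate it. A minimal sketch of such a call site, assuming a preloader() accessor on the cache context and Ignite's U.error() logging helper (illustrative only, not code from this commit):

    // Hypothetical caller of the updated API.
    try {
        cctx.preloader().addAssignments(assignments, forcePreload);
    }
    catch (IgniteCheckedException e) {
        // Rebalancing could not be scheduled for this exchange; surface the error.
        U.error(log, "Failed to add rebalance assignments: " + assignments, e);
    }
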
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 0adf510..68deb2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -142,7 +142,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fdd101e..e177dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,30 +27,25 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
-import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 
 /**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
+ * Thread pool for requesting partitions from other nodes and populating local cache.
  */
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemander {
@@ -63,35 +58,25 @@ public class GridDhtPartitionDemander {
     /** */
     private final ReadWriteLock busyLock;
 
-    /** */
-    @GridToStringInclude
-    private final Collection<DemandWorker> dmdWorkers;
-
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
     /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
     @GridToStringInclude
-    private SyncFuture syncFut;
-
-    /** Preload timeout. */
-    private final AtomicLong timeout;
-
-    /** Allows demand threads to synchronize their step. */
-    private CyclicBarrier barrier;
+    private volatile SyncFuture syncFut;
 
     /** Demand lock. */
     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
 
-    /** */
-    private int poolSize;
-
     /** Last timeout object. */
     private AtomicReference<GridTimeoutObject> lastTimeoutObj = new 
AtomicReference<>();
 
     /** Last exchange future. */
     private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
 
+    /** Assignments. */
+    private volatile GridDhtPreloaderAssignments assigns;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
@@ -107,53 +92,47 @@ public class GridDhtPartitionDemander {
 
         boolean enabled = cctx.rebalanceEnabled() && 
!cctx.kernalContext().clientNode();
 
-        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
         if (enabled) {
-            barrier = new CyclicBarrier(poolSize);
 
-            dmdWorkers = new ArrayList<>(poolSize);
+            for (int cnt = 0; cnt < 
cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
 
-            for (int i = 0; i < poolSize; i++)
-                dmdWorkers.add(new DemandWorker(i));
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new 
CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(final UUID id, final 
GridDhtPartitionSupplyMessage m) {
+                        enterBusy();
 
-            syncFut = new SyncFuture(dmdWorkers);
+                        try {
+                            handleSupplyMessage(idx, id, m);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
         }
-        else {
-            dmdWorkers = Collections.emptyList();
 
-            syncFut = new SyncFuture(dmdWorkers);
+        syncFut = new SyncFuture();
 
+        if (!enabled)
             // Calling onDone() immediately since preloading is disabled.
             syncFut.onDone();
-        }
-
-        timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
     }
 
     /**
      *
      */
     void start() {
-        if (poolSize > 0) {
-            for (DemandWorker w : dmdWorkers)
-                new IgniteThread(cctx.gridName(), "preloader-demand-worker", 
w).start();
-        }
     }
 
     /**
      *
      */
     void stop() {
-        U.cancel(dmdWorkers);
-
-        if (log.isDebugEnabled())
-            log.debug("Before joining on demand workers: " + dmdWorkers);
-
-        U.join(dmdWorkers, log);
-
-        if (log.isDebugEnabled())
-            log.debug("After joining on demand workers: " + dmdWorkers);
+        if (cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < 
cctx.config().getRebalanceThreadPoolSize(); cnt++)
+                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+        }
 
         lastExchangeFut = null;
 
@@ -177,13 +156,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return poolSize;
-    }
-
-    /**
      * Force preload.
      */
     void forcePreload() {
@@ -225,23 +197,22 @@ public class GridDhtPartitionDemander {
      * @param idx
      * @return topic
      */
-    static Object topic(int idx, int cacheId, UUID nodeId) {
-        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: 
remove nodeId
+    static Object topic(int idx, int cacheId) {
+        return TOPIC_CACHE.topic("Demander", cacheId, idx);
     }
 
     /**
-     *
+     * @return {@code True} if topology changed.
      */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
+    private boolean topologyChanged(AffinityTopologyVersion topVer) {
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
     }
 
     /**
-     * @param type Type.
-     * @param discoEvt Discovery event.
+     *
      */
-    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-        preloadEvent(-1, type, discoEvt);
+    private void leaveBusy() {
+        busyLock.readLock().unlock();
     }
 
     /**
@@ -256,28 +227,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker 
w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to 
hang.  This check
-        // will always make sure that interrupted flag gets reset before going 
into wait conditions.
-        // The true fix should actually make sure that interrupted flag does 
not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, 
this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * @param p Partition.
      * @param topVer Topology version.
      * @return Picked owners.
@@ -316,7 +265,7 @@ public class GridDhtPartitionDemander {
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
-    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -325,341 +274,194 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            synchronized (dmdWorkers) {
-                for (DemandWorker w : dmdWorkers)
-                    w.addAssignments(assigns);
-            }
-        }
-        else if (delay > 0) {
-            assert !force;
+            AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
 
-            GridTimeoutObject obj = lastTimeoutObj.get();
+            if (this.assigns != null) {
+                syncFut.get();
 
-            if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
-
-            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-            assert exchFut != null : "Delaying rebalance process without 
topology event.";
-
-            obj = new GridTimeoutObjectAdapter(delay) {
-                @Override public void onTimeout() {
-                    exchFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            
cctx.shared().exchange().forcePreloadExchange(exchFut);
-                        }
-                    });
-                }
-            };
-
-            lastTimeoutObj.set(obj);
+                syncFut = new SyncFuture();
+            }
 
-            cctx.time().addTimeoutObject(obj);
-        }
-    }
+            if (assigns.isEmpty() || topologyChanged(topVer)) {
+                syncFut.onDone();
 
-    /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
+                return;
+            }
 
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
+            this.assigns = assigns;
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionDemander.class, this);
-    }
+            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : 
assigns.entrySet()) {
+                GridDhtPartitionDemandMessage d = e.getValue();
 
-    /**
-     *
-     */
-    private class DemandWorker extends GridWorker {
-        /** Worker ID. */
-        private int id;
+                d.timeout(cctx.config().getRebalanceTimeout());
+                d.workerId(0);//old api support.
 
-        /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ 
= new LinkedBlockingDeque<>();
+                ClusterNode node = e.getKey();
 
-        /** Hide worker logger and use cache logger instead. */
-        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+                GridConcurrentHashSet<Integer> remainings = new 
GridConcurrentHashSet<>();
 
-        /**
-         * @param id Worker ID.
-         */
-        private DemandWorker(int id) {
-            super(cctx.gridName(), "preloader-demand-worker", 
GridDhtPartitionDemander.this.log);
+                remainings.addAll(d.partitions());
 
-            assert id >= 0;
+                syncFut.append(node.id(), remainings);
 
-            this.id = id;
-        }
+                int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
 
-        /**
-         * @param assigns Assignments.
-         */
-        void addAssignments(GridDhtPreloaderAssignments assigns) {
-            assert assigns != null;
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
-            assignQ.offer(assigns);
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
 
-            if (log.isDebugEnabled())
-                log.debug("Added assignments to worker: " + this);
-        }
+                Iterator<Integer> it = d.partitions().iterator();
 
-        /**
-         * @return {@code True} if topology changed.
-         */
-        private boolean topologyChanged() {
-            return !assignQ.isEmpty() || 
cctx.shared().exchange().topologyChanged();
-        }
+                int cnt = 0;
 
-        /**
-         * @param pick Node picked for preloading.
-         * @param p Partition.
-         * @param entry Preloaded entry.
-         * @param topVer Topology version.
-         * @return {@code False} if partition has become invalid during 
preloading.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private boolean preloadEntry(
-            ClusterNode pick,
-            int p,
-            GridCacheEntryInfo entry,
-            AffinityTopologyVersion topVer
-        ) throws IgniteCheckedException {
-            try {
-                GridCacheEntryEx cached = null;
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
 
-                try {
-                    cached = cctx.dht().entryEx(entry.key());
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
 
-                    if (log.isDebugEnabled())
-                        log.debug("Rebalancing key [key=" + entry.key() + ", 
part=" + p + ", node=" + pick.id() + ']');
+                    if (!sParts.get(cnt).isEmpty()) {
 
-                    if (cctx.dht().isIgfsDataCache() &&
-                        cctx.dht().igfsDataSpaceUsed() > 
cctx.dht().igfsDataSpaceMax()) {
-                        LT.error(log, null, "Failed to rebalance IGFS data 
cache (IGFS space size exceeded maximum " +
-                            "value, will ignore rebalance entries): " + 
name());
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new 
GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                        if (cached.markObsoleteIfEmpty(null))
-                            
cached.context().cache().removeIfObsolete(cached.key());
+                        initD.topic(topic(cnt, cctx.cacheId()));
 
-                        return true;
-                    }
-
-                    if (preloadPred == null || preloadPred.apply(entry)) {
-                        if (cached.initialValue(
-                            entry.value(),
-                            entry.version(),
-                            entry.ttl(),
-                            entry.expireTime(),
-                            true,
-                            topVer,
-                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
-                        )) {
-                            cctx.evicts().touch(cached, topVer); // Start 
tracking.
-
-                            if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && 
!cached.isInternal())
-                                cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                                    false, null, null, null);
+                        try {
+                            cctx.io().sendOrderedMessage(node, 
GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), 
d.timeout());
+                        }
+                        catch (IgniteCheckedException ex) {
+                            U.error(log, "Failed to send partition demand 
message to local node", ex);
                         }
-                        else if (log.isDebugEnabled())
-                            log.debug("Rebalancing entry is already in cache 
(will ignore) [key=" + cached.key() +
-                                ", part=" + p + ']');
                     }
-                    else if (log.isDebugEnabled())
-                        log.debug("Rebalance predicate evaluated to false for 
entry (will ignore): " + entry);
                 }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Entry has been concurrently removed while 
rebalancing (will ignore) [key=" +
-                            cached.key() + ", part=" + p + ']');
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Partition became invalid during rebalancing 
(will ignore): " + p);
-
-                    return false;
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteCheckedException("Failed to cache rebalanced 
entry (will stop rebalancing) [local=" +
-                    cctx.nodeId() + ", node=" + pick.id() + ", key=" + 
entry.key() + ", part=" + p + ']', e);
-            }
-
-            return true;
-        }
-
-        /**
-         * @param node Node to demand from.
-         * @param topVer Topology version.
-         * @param d Demand message.
-         * @param exchFut Exchange future.
-         * @return Missed partitions.
-         * @throws InterruptedException If interrupted.
-         * @throws ClusterTopologyCheckedException If node left.
-         * @throws IgniteCheckedException If failed to send message.
-         */
-        private Set<Integer> demandFromNode(
-            final ClusterNode node,
-            final AffinityTopologyVersion topVer,
-            final GridDhtPartitionDemandMessage d,
-            final GridDhtPartitionsExchangeFuture exchFut
-        ) throws InterruptedException, IgniteCheckedException {
-            final GridDhtPartitionTopology top = cctx.dht().topology();
 
-            long timeout = GridDhtPartitionDemander.this.timeout.get();
+                if (log.isInfoEnabled() && !d.partitions().isEmpty()) {
+                    LinkedList<Integer> s = new LinkedList<>(d.partitions());
 
-            d.timeout(timeout);
-            d.workerId(id);
+                    Collections.sort(s);
 
-            final Set<Integer> missed = new HashSet<>();
+                    StringBuilder sb = new StringBuilder();
 
-            final ConcurrentHashMap8<Integer, Boolean> remaining = new 
ConcurrentHashMap8<>();
+                    int start = -1;
 
-            for (int p : d.partitions())
-                remaining.put(p, false);
+                    int prev = -1;
 
-            if (isCancelled() || topologyChanged())
-                return missed;
+                    Iterator<Integer> sit = s.iterator();
 
-            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo 
= getRebalanceThreadPoolSize / assigns.count
+                    while (sit.hasNext()) {
+                        int p = sit.next();
+                        if (start == -1) {
+                            start = p;
+                            prev = p;
+                        }
 
-            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+                        if (prev < p - 1) {
+                            sb.append(start);
 
-            int cnt = 0;
+                            if (start != prev)
+                                sb.append("-").append(prev);
 
-            while (cnt < threadCnt) {
-                sParts.add(new HashSet<Integer>());
+                            sb.append(", ");
 
-                final int idx = cnt;
+                            start = p;
+                        }
 
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), 
node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(UUID id, 
GridDhtPartitionSupplyMessage m) {
-                        enterBusy();
+                        if (!sit.hasNext()) {
+                            sb.append(start);
 
-                        try {
-                            handleSupplyMessage(idx, new SupplyMessage(id, m), 
node, topVer, top,
-                                exchFut, missed, d, remaining);
-                        }finally{
-                            leaveBusy();
+                            if (start != p)
+                                sb.append("-").append(p);
                         }
+
+                        prev = p;
                     }
-                });
 
-                cnt++;
+                    log.info("Requested rebalancing [from node=" + node.id() + 
", partitions=" + s.size() + " (" + sb.toString() + ")]");
+                }
             }
+        }
+        else if (delay > 0) {
+            GridTimeoutObject obj = lastTimeoutObj.get();
 
-            Iterator<Integer> it = d.partitions().iterator();
-
-            cnt = 0;
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
 
-            while (it.hasNext()) {
-                sParts.get(cnt % threadCnt).add(it.next());
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
-                cnt++;
-            }
+            assert exchFut != null : "Delaying rebalance process without 
topology event.";
 
-            try {
-                cnt = 0;
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            
cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
 
-                while (cnt < threadCnt) {
+            lastTimeoutObj.set(obj);
 
-                    // Create copy.
-                    GridDhtPartitionDemandMessage initD = new 
GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+            cctx.time().addTimeoutObject(obj);
+        }
+    }
 
-                    initD.topic(topic(cnt, cctx.cacheId(),node.id()));
+    /**
+     *
+     */
+    void unwindUndeploys() {
+        demandLock.writeLock().lock();
 
-                    try {
-                        if (logg && cctx.name().equals("cache"))
-                        System.out.println("D "+cnt + " initial Demand "+" 
"+cctx.localNode().id());
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
+    }
 
-                        cctx.io().sendOrderedMessage(node, 
GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), 
d.timeout());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to send partition demand message 
to local node", e);
-                    }
+    /**
+     * @param idx Index.
+     * @param id Node id.
+     * @param supply Supply.
+     */
+    private void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessage supply) {
+        ClusterNode node = cctx.node(id);
 
-                    cnt++;
-                }
+        assert node != null;
 
-                do {
-                    U.sleep(1000);//Todo: improve
-                }
-                while (!isCancelled() && !topologyChanged() && 
!remaining.isEmpty());
+        GridDhtPartitionDemandMessage d = assigns.get(node);
 
-                return missed;
-            }
-            finally {
-                cnt = 0;
+        AffinityTopologyVersion topVer = d.topologyVersion();
 
-                while (cnt < threadCnt) {
-                    cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), 
node.id()));
+        if (topologyChanged(topVer)) {
+            syncFut.cancel(id);
 
-                    cnt++;
-                }
-            }
+            return;
         }
 
-        boolean logg = false;
-
-        /**
-         * @param s Supply message.
-         * @param node Node.
-         * @param topVer Topology version.
-         * @param top Topology.
-         * @param exchFut Exchange future.
-         * @param missed Missed.
-         * @param d initial DemandMessage.
-         */
-        private void handleSupplyMessage(
-            int idx,
-            SupplyMessage s,
-            ClusterNode node,
-            AffinityTopologyVersion topVer,
-            GridDhtPartitionTopology top,
-            GridDhtPartitionsExchangeFuture exchFut,
-            Set<Integer> missed,
-            GridDhtPartitionDemandMessage d,
-            ConcurrentHashMap8 remaining) {
-
-            if (logg && cctx.name().equals("cache"))
-            System.out.println("D "+idx + " handled supply message "+ 
cctx.localNode().id());
-
-            // Check that message was received from expected node.
-            if (!s.senderId().equals(node.id())) {
-                U.warn(log, "Received supply message from unexpected node 
[expectedId=" + node.id() +
-                    ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
 
-                return;
-            }
+        // Check whether there were class loading errors on unmarshal
+        if (supply.classError() != null) {
+            if (log.isDebugEnabled())
+                log.debug("Class got undeployed during preloading: " + 
supply.classError());
 
-            if (topologyChanged())
-                return;
+            syncFut.cancel(id);
 
-            if (log.isDebugEnabled())
-                log.debug("Received supply message: " + s);
+            return;
+        }
 
-            GridDhtPartitionSupplyMessage supply = s.supply();
+        final GridDhtPartitionTopology top = cctx.dht().topology();
 
-            // Check whether there were class loading errors on unmarshal
-            if (supply.classError() != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Class got undeployed during preloading: " + 
supply.classError());
+        GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture();
 
-                return;
-            }
+        try {
 
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
@@ -689,19 +491,12 @@ public class GridDhtPartitionDemander {
 
                                     continue;
                                 }
-                                try {
-                                    if (!preloadEntry(node, p, entry, topVer)) 
{
-                                        if (log.isDebugEnabled())
-                                            log.debug("Got entries for invalid 
partition during " +
-                                                "preloading (will skip) [p=" + 
p + ", entry=" + entry + ']');
-
-                                        break;
-                                    }
-                                }
-                                catch (IgniteCheckedException ex) {
-                                    cancel();
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid 
partition during " +
+                                            "preloading (will skip) [p=" + p + 
", entry=" + entry + ']');
 
-                                    return;
+                                    break;
                                 }
                             }
 
@@ -710,12 +505,9 @@ public class GridDhtPartitionDemander {
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
-                                top.own(part);//todo: close future?
-
-//                                if (logg && cctx.name().equals("cache"))
-//                                    System.out.println("D "+idx + " last "+ 
p +" "+ cctx.localNode().id());
+                                top.own(part);
 
-                                remaining.remove(p);
+                                syncFut.onPartitionDone(id, p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: 
" + part);
@@ -731,218 +523,139 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        remaining.remove(p);
+                        syncFut.onPartitionDone(id, p);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state 
is not MOVING): " + part);
                     }
                 }
                 else {
-                    remaining.remove(p);
+                    syncFut.onPartitionDone(id, p);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not 
belong on current node): " + p);
                 }
             }
 
-            for (Integer miss : s.supply().missed())
-                remaining.remove(miss);
-
             // Only request partitions based on latest topology version.
-            for (Integer miss : s.supply().missed())
+            for (Integer miss : supply.missed())
                 if (cctx.affinity().localNode(miss, topVer))
-                    missed.add(miss);
+                    syncFut.onMissedPartition(id, miss);
 
-            if (!remaining.isEmpty()) {
-                try {
-                    // Create copy.
-                    GridDhtPartitionDemandMessage nextD =
-                        new GridDhtPartitionDemandMessage(d, 
Collections.<Integer>emptySet());
+            for (Integer miss : supply.missed())
+                syncFut.onPartitionDone(id, miss);
 
-                    nextD.topic(topic(idx, cctx.cacheId(), node.id()));
+            if (!syncFut.isDone()) {
 
-                    // Send demand message.
-                    cctx.io().sendOrderedMessage(node, 
GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
-                        nextD, cctx.ioPolicy(), d.timeout());
+                // Create copy.
+                GridDhtPartitionDemandMessage nextD =
+                    new GridDhtPartitionDemandMessage(d, 
Collections.<Integer>emptySet());
 
-                    if (logg && cctx.name().equals("cache"))
-                        System.out.println("D " + idx + " ack  " + 
cctx.localNode().id());
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(log, "Failed to receive partitions from node 
(rebalancing will not " +
-                        "fully finish) [node=" + node.id() + ", msg=" + d + 
']', ex);
+                nextD.topic(topic(idx, cctx.cacheId()));
 
-                    cancel();
-                }
+                // Send demand message.
+                cctx.io().sendOrderedMessage(node, 
GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+                    nextD, cctx.ioPolicy(), d.timeout());
             }
         }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during rebalancing (will retry) [node=" + 
node.id() +
+                    ", msg=" + e.getMessage() + ']');
+            syncFut.cancel(id);
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(log, "Failed to receive partitions from node (rebalancing 
will not " +
+                "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
 
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            try {
-                int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-                if (!CU.isMarshallerCache(cctx.name())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for marshaller cache preload 
[cacheName=" + cctx.name() + ']');
+            syncFut.cancel(id);
+        }
+    }
 
-                    try {
-                        
cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for marshaller cache 
preload future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ']');
+    /**
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
 
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered preload future should never 
fail: " + e.getMessage(), e);
-                    }
-                }
+            try {
+                cached = cctx.dht().entryEx(entry.key());
 
-                if (rebalanceOrder > 0) {
-                    IgniteInternalFuture<?> fut = 
cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", 
part=" + p + ", node=" + pick.id() + ']');
 
-                    try {
-                        if (fut != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Waiting for dependant caches 
rebalance [cacheName=" + cctx.name() +
-                                    ", rebalanceOrder=" + rebalanceOrder + 
']');
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > 
cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache 
(IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
 
-                            fut.get();
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for ordered rebalance 
future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ", 
rebalanceOrder=" + rebalanceOrder + ']');
+                    if (cached.markObsoleteIfEmpty(null))
+                        
cached.context().cache().removeIfObsolete(cached.key());
 
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered rebalance future should never 
fail: " + e.getMessage(), e);
-                    }
+                    return true;
                 }
 
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                boolean stopEvtFired = false;
-
-                while (!isCancelled()) {
-                    try {
-                        barrier.await();
-
-                        if (id == 0 && exchFut != null && !exchFut.dummy() &&
-                            
cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
-                            if (!cctx.isReplicated() || !stopEvtFired) {
-                                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, 
exchFut.discoveryEvent());
-
-                                stopEvtFired = true;
-                            }
-                        }
-                    }
-                    catch (BrokenBarrierException ignore) {
-                        throw new InterruptedException("Demand worker 
stopped.");
-                    }
-
-                    // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments assigns = null;
-
-                    while (assigns == null)
-                        assigns = poll(assignQ, 
cctx.gridConfig().getNetworkTimeout(), this);
-
-                    demandLock.readLock().lock();
-
-                    try {
-                        exchFut = assigns.exchangeFuture();
-
-                        // Assignments are empty if preloading is disabled.
-                        if (assigns.isEmpty())
-                            continue;
-
-                        boolean resync = false;
-
-                        // While.
-                        // =====
-                        while (!isCancelled() && !topologyChanged() && 
!resync) {
-                            Collection<Integer> missed = new HashSet<>();
-
-                            // For.
-                            // ===
-                            for (ClusterNode node : assigns.keySet()) {
-                                if (topologyChanged() || isCancelled())
-                                    break; // For.
-
-                                GridDhtPartitionDemandMessage d = 
assigns.remove(node);
-
-                                // If another thread is already processing 
this message,
-                                // move to the next node.
-                                if (d == null)
-                                    continue; // For.
-
-                                try {
-                                    Set<Integer> set = demandFromNode(node, 
assigns.topologyVersion(), d, exchFut);
-
-                                    if (!set.isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Missed partitions from 
node [nodeId=" + node.id() + ", missed=" +
-                                                set + ']');
-
-                                        missed.addAll(set);
-                                    }
-                                }
-                                catch (IgniteInterruptedCheckedException e) {
-                                    throw e;
-                                }
-                                catch (ClusterTopologyCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Node left during 
rebalancing (will retry) [node=" + node.id() +
-                                            ", msg=" + e.getMessage() + ']');
-
-                                    resync = true;
-
-                                    break; // For.
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to receive partitions 
from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + 
", msg=" + d + ']', e);
-                                }
-                            }
-
-                            // Processed missed entries.
-                            if (!missed.isEmpty()) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reassigning partitions that 
were missed: " + missed);
-
-                                assert exchFut.exchangeId() != null;
-
-                                
cctx.shared().exchange().forceDummyExchange(true, exchFut);
-                            }
-                            else
-                                break; // While.
-                        }
-                    }
-                    finally {
-                        demandLock.readLock().unlock();
-
-                        syncFut.onWorkerDone(this);
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && 
!cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
                     }
-
-                    cctx.shared().exchange().scheduleResendPartitions();
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will 
ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
                 }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for 
entry (will ignore): " + entry);
             }
-            finally {
-                // Safety.
-                syncFut.onWorkerDone(this);
+            catch (GridCacheEntryRemovedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while 
rebalancing (will ignore) [key=" +
+                        cached.key() + ", part=" + p + ']');
             }
-        }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing 
(will ignore): " + p);
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, 
"super", super.toString());
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw e;
         }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry 
(will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() 
+ ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
     }
 
     /**
@@ -1035,88 +748,80 @@ public class GridDhtPartitionDemander {
         return assigns;
     }
 
-    /**
-     *
-     */
-    private class SyncFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
+/**
+ *
+ */
+private class SyncFuture extends GridFutureAdapter<Object> {
+    /** */
+    private static final long serialVersionUID = 1L;
 
-        /** Remaining workers. */
-        private Collection<DemandWorker> remaining;
+    private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new 
ConcurrentHashMap8<>();
 
-        /**
-         * @param workers List of workers.
-         */
-        private SyncFuture(Collection<DemandWorker> workers) {
-            assert workers.size() == poolSize();
+    private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new 
ConcurrentHashMap8<>();
 
-            remaining = Collections.synchronizedList(new 
LinkedList<>(workers));
-        }
+    public void append(UUID nodeId, Collection<Integer> parts) {
+        remaining.put(nodeId, parts);
 
-        /**
-         * @param w Worker who iterated through all partitions.
-         */
-        void onWorkerDone(DemandWorker w) {
-            if (isDone())
-                return;
+        missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+    }
 
-            if (remaining.remove(w))
-                if (log.isDebugEnabled())
-                    log.debug("Completed full partition iteration for worker 
[worker=" + w + ']');
+    void cancel(UUID nodeId) {
+        if (isDone())
+            return;
 
-            if (remaining.isEmpty()) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed sync future.");
+        remaining.remove(nodeId);
 
-                onDone();
-            }
-        }
+        checkIsDone();
     }
 
-    /**
-     * Supply message wrapper.
-     */
-    private static class SupplyMessage {
-        /** Sender ID. */
-        private UUID sndId;
-
-        /** Supply message. */
-        private GridDhtPartitionSupplyMessage supply;
-
-        /**
-         * Dummy constructor.
-         */
-        private SupplyMessage() {
-            // No-op.
-        }
+    void onMissedPartition(UUID nodeId, int p) {
+        if (missed.get(nodeId) == null)
+            missed.put(nodeId, new GridConcurrentHashSet<Integer>());
 
-        /**
-         * @param sndId Sender ID.
-         * @param supply Supply message.
-         */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
-            this.sndId = sndId;
-            this.supply = supply;
-        }
+        missed.get(nodeId).add(p);
+   }
 
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return sndId;
-        }
+    void onPartitionDone(UUID nodeId, int p) {
+        if (isDone())
+            return;
+
+        Collection<Integer> parts = remaining.get(nodeId);
+
+        parts.remove(p);
 
-        /**
-         * @return Message.
-         */
-        GridDhtPartitionSupplyMessage supply() {
-            return supply;
+        if (parts.isEmpty()) {
+            remaining.remove(nodeId);
+
+            if (log.isDebugEnabled())
+                log.debug("Completed full partition iteration for node 
[nodeId=" + nodeId + ']');
         }
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SupplyMessage.class, this);
+        checkIsDone();
+    }
+
+    private void checkIsDone() {
+        if (remaining.isEmpty()) {
+            if (log.isDebugEnabled())
+                log.debug("Completed sync future.");
+
+            Collection<Integer> m = new HashSet<>();
+
+            for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                if (e.getValue() != null && !e.getValue().isEmpty())
+                    m.addAll(e.getValue());
+            }
+
+            if (!m.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Reassigning partitions that were missed: " + m);
+
+                cctx.shared().exchange().forceDummyExchange(true, 
assigns.exchangeFuture());
+            }
+
+            missed.clear();
+
+            onDone();
         }
     }
 }
+}

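The rewritten demander above splits each node's demanded partitions round-robin across getRebalanceThreadPoolSize() stripes (sParts.get(cnt++ % lsnrCnt)) and sends one demand message per non-empty stripe to the matching supplier topic. A self-contained sketch of that striping step, using plain Java collections in place of the Ignite message plumbing (all names below are illustrative, not from the commit):

    import java.util.*;

    public class StripeSketch {
        /** Round-robin split of demanded partitions across N rebalance stripes. */
        static List<Set<Integer>> stripe(Collection<Integer> parts, int stripes) {
            List<Set<Integer>> res = new ArrayList<>(stripes);

            for (int i = 0; i < stripes; i++)
                res.add(new HashSet<Integer>());

            int cnt = 0;

            for (Integer p : parts)
                res.get(cnt++ % stripes).add(p); // Same distribution as sParts in the patch.

            return res;
        }

        public static void main(String[] args) {
            // Partitions 0..9 over 4 stripes: {0,4,8}, {1,5,9}, {2,6}, {3,7} (set order may vary).
            System.out.println(stripe(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), 4));
        }
    }
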
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 920d10d..b948fbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -108,7 +108,7 @@ class GridDhtPartitionSupplier {
      * @return topic
      */
     static Object topic(int idx, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", idx, id);
+        return TOPIC_CACHE.topic("Supplier", idx, id);
     }
 
     /**
@@ -138,8 +138,6 @@ class GridDhtPartitionSupplier {
         this.preloadPred = preloadPred;
     }
 
-    boolean logg = false;
-
     /**
      * @return {@code true} if entered to busy state.
      */
@@ -172,9 +170,6 @@ class GridDhtPartitionSupplier {
         if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
             return;
 
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S " + idx + " process message " + 
cctx.localNode().id());
-
         GridDhtPartitionSupplyMessage s = new 
GridDhtPartitionSupplyMessage(d.workerId(),
             d.updateSequence(), cctx.cacheId());
 
@@ -191,12 +186,8 @@ class GridDhtPartitionSupplier {
                 doneMap.remove(scId);
             }
 
-            if (doneMap.get(scId) != null) {
-                if (logg && cctx.name().equals("cache"))
-                    System.out.println("S " + idx + " exit " + 
cctx.localNode().id());
-
+            if (doneMap.get(scId) != null)
                 return;
-            }
 
             long bCnt = 0;
 
@@ -282,9 +273,6 @@ class GridDhtPartitionSupplier {
                                     return;
                                 }
                                 else {
-                                    if (logg && cctx.name().equals("cache"))
-                                        System.out.println("S " + idx + " 
renew " + part + " " + cctx.localNode().id());
-
                                     s = new 
GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                         cctx.cacheId());
                                 }
@@ -473,9 +461,6 @@ class GridDhtPartitionSupplier {
                     // Mark as last supply message.
                     s.last(part);
 
-//                    if (logg && cctx.name().equals("cache"))
-//                        System.out.println("S " + idx + " last " + part + " 
" + cctx.localNode().id());
-
                     phase = 0;
 
                     sctx = null;
@@ -508,8 +493,6 @@ class GridDhtPartitionSupplier {
      */
     private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
         throws IgniteCheckedException {
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S sent "+ cctx.localNode().id());
 
         try {
             if (log.isDebugEnabled())

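Both sides now derive a per-stripe ordered topic from the same index: the demander registers ordered handlers on its own topic(idx, cacheId) and sends demand messages to GridDhtPartitionSupplier.topic(idx, cacheId), so supply messages for stripe idx are handled in order, independently of other stripes. A toy model of that per-stripe ordering, using one single-threaded executor per stripe in place of Ignite's ordered messaging (purely illustrative):

    import java.util.*;
    import java.util.concurrent.*;

    public class OrderedTopicSketch {
        public static void main(String[] args) throws Exception {
            int stripes = 2;

            // One single-threaded executor per stripe: messages on the same stripe
            // are processed in order, while different stripes run in parallel.
            List<ExecutorService> topics = new ArrayList<>();

            for (int i = 0; i < stripes; i++)
                topics.add(Executors.newSingleThreadExecutor());

            for (int msg = 0; msg < 6; msg++) {
                final int m = msg;
                final int idx = msg % stripes; // Route to its stripe, like topic(idx, cacheId).

                topics.get(idx).submit(new Runnable() {
                    @Override public void run() {
                        System.out.println("stripe " + idx + " handled supply message " + m);
                    }
                });
            }

            for (ExecutorService e : topics) {
                e.shutdown();
                e.awaitTermination(1, TimeUnit.MINUTES);
            }
        }
    }
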
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a22f281..8a097ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -254,7 +254,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         demander.addAssignments(assignments, forcePreload);
     }
 
