http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 0000000,253ac18..e7067f2
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@@ -1,0 -1,361 +1,348 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.managers.communication;
+ 
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ 
+ /**
+  * Wrapper for all grid messages.
+  * <p>
+  * Carries a nested {@link GridTcpCommunicationMessageAdapter} together with the policy,
+  * topic (as object, bytes and ordinal), ordered flag, timeout and skip-on-timeout flag.
+  * {@link #writeTo(ByteBuffer)} and {@link #readFrom(ByteBuffer)} transfer the fields one
+  * at a time and advance {@code commState.idx} after each of them, so that a later call
+  * continues with the field at which an earlier call returned {@code false}.
+  */
+ public class GridIoMessage extends GridTcpCommunicationMessageAdapter {
+     /** Serial version UID. */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Policy. */
+     private GridIoPolicy plc;
+ 
+     /**
+      * Message topic. The topic object itself is not written by {@link #writeTo(ByteBuffer)};
+      * only {@link #topicBytes} and {@link #topicOrd} are.
+      */
+     @GridToStringInclude
+     @GridDirectTransient
+     private Object topic;
+ 
+     /** Topic bytes. */
+     private byte[] topicBytes;
+ 
+     /** Topic ordinal, {@code -1} until set through the constructor or read from a buffer. */
+     private int topicOrd = -1;
+ 
 -    /** Message order. */
 -    private long msgId = -1;
++    /** Message ordered flag. */
++    private boolean ordered;
+ 
+     /** Message timeout. */
+     private long timeout;
+ 
+     /** Whether message can be skipped on timeout. */
+     private boolean skipOnTimeout;
+ 
+     /** Message. */
+     private GridTcpCommunicationMessageAdapter msg;
+ 
+     /**
+      * No-op constructor to support {@link Externalizable} interface.
+      * This constructor is not meant to be used for other purposes.
+      * Within this class it is also used by {@link #clone()} to create the copy.
+      */
+     public GridIoMessage() {
+         // No-op.
+     }
+ 
+     /**
+      * Creates a fully initialized message. Asserts that policy, topic and message are not
+      * {@code null} and that the topic ordinal does not exceed {@link Byte#MAX_VALUE}.
+      *
+      * @param plc Policy.
+      * @param topic Communication topic.
+      * @param topicOrd Topic ordinal value.
+      * @param msg Message.
 -     * @param msgId Message ID.
++     * @param ordered Message ordered flag.
+      * @param timeout Timeout.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      */
 -    public GridIoMessage(GridIoPolicy plc, Object topic, int topicOrd, 
GridTcpCommunicationMessageAdapter msg,
 -        long msgId, long timeout, boolean skipOnTimeout) {
++    public GridIoMessage(
++        GridIoPolicy plc,
++        Object topic,
++        int topicOrd,
++        GridTcpCommunicationMessageAdapter msg,
++        boolean ordered,
++        long timeout,
++        boolean skipOnTimeout
++    ) {
+         assert plc != null;
+         assert topic != null;
+         assert topicOrd <= Byte.MAX_VALUE;
+         assert msg != null;
+ 
+         this.plc = plc;
+         this.msg = msg;
+         this.topic = topic;
+         this.topicOrd = topicOrd;
 -        this.msgId = msgId;
++        this.ordered = ordered;
+         this.timeout = timeout;
+         this.skipOnTimeout = skipOnTimeout;
+     }
+ 
+     /**
+      * @return Policy.
+      */
+     GridIoPolicy policy() {
+         return plc;
+     }
+ 
+     /**
+      * @return Topic.
+      */
+     Object topic() {
+         return topic;
+     }
+ 
+     /**
+      * @param topic Topic.
+      */
+     void topic(Object topic) {
+         this.topic = topic;
+     }
+ 
+     /**
+      * @return Topic bytes.
+      */
+     byte[] topicBytes() {
+         return topicBytes;
+     }
+ 
+     /**
+      * @param topicBytes Topic bytes.
+      */
+     void topicBytes(byte[] topicBytes) {
+         this.topicBytes = topicBytes;
+     }
+ 
+     /**
+      * @return Topic ordinal.
+      */
+     int topicOrdinal() {
+         return topicOrd;
+     }
+ 
+     /**
+      * @return Message (the nested {@link GridTcpCommunicationMessageAdapter}, exposed as {@link Object}).
+      */
+     public Object message() {
+         return msg;
+     }
+ 
+     /**
 -     * @return Message ID.
 -     */
 -    long messageId() {
 -        return msgId;
 -    }
 -
 -    /**
+      * @return Message timeout.
+      */
+     public long timeout() {
+         return timeout;
+     }
+ 
+     /**
+      * @return Whether message can be skipped on timeout.
+      */
+     public boolean skipOnTimeout() {
+         return skipOnTimeout;
+     }
+ 
+     /**
+      * @return {@code True} if message is ordered, {@code false} otherwise.
+      */
+     boolean isOrdered() {
 -        return msgId > 0;
++        return ordered;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Not supported: the resulting code always throws {@link AssertionError}.
+      */
+     @Override public boolean equals(Object obj) {
 -        if (obj == this)
 -            return true;
 -
 -        if (!(obj instanceof GridIoMessage))
 -            return false;
 -
 -        GridIoMessage other = (GridIoMessage)obj;
 -
 -        return topic.equals(other.topic) && msgId == other.msgId;
++        throw new AssertionError();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Not supported: the resulting code always throws {@link AssertionError}.
+      */
+     @Override public int hashCode() {
 -        int res = topic.hashCode();
 -
 -        res = 31 * res + (int)(msgId ^ (msgId >>> 32));
 -        res = 31 * res + topic.hashCode();
 -
 -        return res;
++        throw new AssertionError();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Creates a new instance through the no-arg constructor and fills it via
+      * {@link #clone0(GridTcpCommunicationMessageAdapter)}.
+      */
+     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+     @Override public GridTcpCommunicationMessageAdapter clone() {
+         GridIoMessage _clone = new GridIoMessage();
+ 
+         clone0(_clone);
+ 
+         return _clone;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Copies every field by reference or value, except the nested message, which is
+      * cloned when it is not {@code null}.
+      */
+     @SuppressWarnings("RedundantCast")
+     @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+         GridIoMessage _clone = (GridIoMessage)_msg;
+ 
+         _clone.plc = plc;
+         _clone.topic = topic;
+         _clone.topicBytes = topicBytes;
+         _clone.topicOrd = topicOrd;
 -        _clone.msgId = msgId;
++        _clone.ordered = ordered;
+         _clone.timeout = timeout;
+         _clone.skipOnTimeout = skipOnTimeout;
+         _clone.msg = msg != null ? 
(GridTcpCommunicationMessageAdapter)msg.clone() : null;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Writes the direct type byte once, then the fields in this order: nested message,
+      * ordered flag, policy, skip-on-timeout flag, timeout, topic bytes, topic ordinal.
+      * The switch cases fall through on purpose; {@code false} is returned as soon as
+      * one of the {@code commState.put*} calls returns {@code false}.
+      */
+     @SuppressWarnings("all")
+     @Override public boolean writeTo(ByteBuffer buf) {
+         commState.setBuffer(buf);
+ 
+         if (!commState.typeWritten) {
+             if (!commState.putByte(directType()))
+                 return false;
+ 
+             commState.typeWritten = true;
+         }
+ 
+         switch (commState.idx) {
+             case 0:
+                 if (!commState.putMessage(msg))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 1:
 -                if (!commState.putLong(msgId))
++                if (!commState.putBoolean(ordered))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 2:
+                 if (!commState.putEnum(plc))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 3:
+                 if (!commState.putBoolean(skipOnTimeout))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 4:
+                 if (!commState.putLong(timeout))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 5:
+                 if (!commState.putByteArray(topicBytes))
+                     return false;
+ 
+                 commState.idx++;
+ 
+             case 6:
+                 if (!commState.putInt(topicOrd))
+                     return false;
+ 
+                 commState.idx++;
+ 
+         }
+ 
+         return true;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Reads the fields in the same order in which {@link #writeTo(ByteBuffer)} writes them.
+      * The switch cases fall through on purpose; {@code false} is returned when the buffer
+      * does not hold enough bytes for the next fixed-size field, or when the nested message
+      * or the topic byte array is reported as not read.
+      */
+     @SuppressWarnings("all")
+     @Override public boolean readFrom(ByteBuffer buf) {
+         commState.setBuffer(buf);
+ 
+         switch (commState.idx) {
+             case 0:
+                 Object msg0 = commState.getMessage();
+ 
+                 if (msg0 == MSG_NOT_READ)
+                     return false;
+ 
+                 msg = (GridTcpCommunicationMessageAdapter)msg0;
+ 
+                 commState.idx++;
+ 
+             case 1:
 -                if (buf.remaining() < 8)
++                if (buf.remaining() < 1)
+                     return false;
+ 
 -                msgId = commState.getLong();
++                ordered = commState.getBoolean();
+ 
+                 commState.idx++;
+ 
+             case 2:
+                 if (buf.remaining() < 1)
+                     return false;
+ 
+                 byte plc0 = commState.getByte();
+ 
+                 plc = GridIoPolicy.fromOrdinal(plc0);
+ 
+                 commState.idx++;
+ 
+             case 3:
+                 if (buf.remaining() < 1)
+                     return false;
+ 
+                 skipOnTimeout = commState.getBoolean();
+ 
+                 commState.idx++;
+ 
+             case 4:
+                 if (buf.remaining() < 8)
+                     return false;
+ 
+                 timeout = commState.getLong();
+ 
+                 commState.idx++;
+ 
+             case 5:
+                 byte[] topicBytes0 = commState.getByteArray();
+ 
+                 if (topicBytes0 == BYTE_ARR_NOT_READ)
+                     return false;
+ 
+                 topicBytes = topicBytes0;
+ 
+                 commState.idx++;
+ 
+             case 6:
+                 if (buf.remaining() < 4)
+                     return false;
+ 
+                 topicOrd = commState.getInt();
+ 
+                 commState.idx++;
+ 
+         }
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public byte directType() {
+         return 8;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridIoMessage.class, this);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0000000,b720eb1..837b162
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@@ -1,0 -1,865 +1,847 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ 
+ /**
+  * Cache communication manager.
+  */
+ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V> {
+     /** Message ID generator; {@code onSend} draws from it for messages whose ID is still negative. */
+     private static final AtomicLong idGen = new AtomicLong();
+ 
+     /** Delay in milliseconds between retries; read from the grid configuration in {@code start0()}. */
+     private long retryDelay;
+ 
+     /** Number of attempts used to send a message; read from the grid configuration in {@code start0()}. */
+     private int retryCnt;
+ 
+     /** Indexed class handlers: cache ID mapped to an array of handlers indexed by message lookup index. */
+     private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new 
HashMap<>();
+ 
+     /** Handler registry, consulted by the listener when no indexed handler is found. */
+     private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, 
GridCacheMessage<K, V>>>
+         clsHandlers = new ConcurrentHashMap8<>();
+ 
+     /** Ordered handler registry, keyed by topic. */
+     private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends 
GridCacheMessage<K, V>>> orderedHandlers =
+         new ConcurrentHashMap8<>();
+ 
+     /** Stopping flag; set under the write lock of {@code rw} and checked under its read lock. */
+     private boolean stopping;
+ 
+     /** Error flag; makes sure a failed start future is logged only once. */
+     private final AtomicBoolean startErr = new AtomicBoolean();
+ 
+     /** Mutex guarding {@code stopping}. */
+     private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+ 
+     /** Deployment enabled; cached in {@code start0()}. */
+     private boolean depEnabled;
+ 
+     /**
+      * Message listener registered for {@code TOPIC_CACHE} in {@code start0()}.
+      * <p>
+      * Looks up a handler for the received {@link GridCacheMessage}: first in {@code idxClsHandlers}
+      * (by cache ID and the message's lookup index, when that index is not negative), then in
+      * {@code clsHandlers} (by cache ID and message class). A message without a handler is ignored.
+      * When the message's topology version is higher than the local one and the matching topology
+      * future is not done yet, handling is deferred until that future completes; in every other
+      * case the message is handed to {@code onMessage0} right away.
+      */
+     private GridMessageListener lsnr = new GridMessageListener() {
+         @SuppressWarnings("unchecked")
+         @Override public void onMessage(final UUID nodeId, Object msg) {
+             if (log.isDebugEnabled())
+                 log.debug("Received unordered cache communication message 
[nodeId=" + nodeId +
+                     ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
+ 
+             final GridCacheMessage<K, V> cacheMsg = (GridCacheMessage<K, 
V>)msg;
+ 
+             int msgIdx = cacheMsg.lookupIndex();
+ 
+             IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c = null;
+ 
+             if (msgIdx >= 0) {
+                 IgniteBiInClosure[] cacheClsHandlers = 
idxClsHandlers.get(cacheMsg.cacheId());
+ 
+                 if (cacheClsHandlers != null)
+                     c = cacheClsHandlers[msgIdx];
+             }
+ 
+             if (c == null)
+                 c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), 
cacheMsg.getClass()));
+ 
+             if (c == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Received message without registered handler 
(will ignore) [msg=" + msg +
+                         ", nodeId=" + nodeId + ']');
+ 
+                 return;
+             }
+ 
+             long locTopVer = cctx.discovery().topologyVersion();
+             long rmtTopVer = cacheMsg.topologyVersion();
+ 
+             if (locTopVer < rmtTopVer) {
+                 if (log.isDebugEnabled())
+                     log.debug("Received message has higher topology version 
[msg=" + msg +
+                         ", locTopVer=" + locTopVer + ", rmtTopVer=" + 
rmtTopVer + ']');
+ 
+                 IgniteFuture<Long> topFut = 
cctx.discovery().topologyFuture(rmtTopVer);
+ 
+                 if (!topFut.isDone()) {
+                     final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c0 
= c;
+ 
+                     topFut.listenAsync(new CI1<IgniteFuture<Long>>() {
+                         @Override public void apply(IgniteFuture<Long> t) {
+                             onMessage0(nodeId, cacheMsg, c0);
+                         }
+                     });
+ 
+                     return;
+                 }
+             }
+ 
+             onMessage0(nodeId, cacheMsg, c);
+         }
+     };
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Reads the send retry delay and retry count from the grid configuration, caches whether
+      * deployment is enabled, and registers the message listener for {@code TOPIC_CACHE}.
+      */
+     @Override public void start0() throws IgniteCheckedException {
+         retryDelay = cctx.gridConfig().getNetworkSendRetryDelay();
+         retryCnt = cctx.gridConfig().getNetworkSendRetryCount();
+ 
+         depEnabled = cctx.gridDeploy().enabled();
+ 
+         cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Removes the {@code TOPIC_CACHE} listener and the listeners of every topic found in
+      * {@code orderedHandlers}, then keeps trying to take the write lock and, once it is held,
+      * sets {@code stopping} so that {@code onMessage0} ignores messages from then on.
+      * An interrupt received while waiting does not abort the wait; the interrupt status is
+      * restored after the lock has been acquired.
+      */
+     @SuppressWarnings("BusyWait")
+     @Override protected void onKernalStop0(boolean cancel) {
+         cctx.gridIO().removeMessageListener(TOPIC_CACHE);
+ 
+         for (Object ordTopic : orderedHandlers.keySet())
+             cctx.gridIO().removeMessageListener(ordTopic);
+ 
+         boolean interrupted = false;
+ 
+         // Busy wait is intentional.
+         while (true) {
+             try {
+                 if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+                     break;
+                 else
+                     Thread.sleep(200);
+             }
+             catch (InterruptedException ignore) {
+                 // Preserve interrupt status & ignore.
+                 // Note that interrupted flag is cleared.
+                 interrupted = true;
+             }
+         }
+ 
+         if (interrupted)
+             Thread.currentThread().interrupt();
+ 
+         try {
+             stopping = true;
+         }
+         finally {
+             rw.writeUnlock();
+         }
+     }
+ 
+     /**
+      * Handles a received cache message while holding the read lock.
+      * <p>
+      * The message is ignored when the manager is stopping. Otherwise it is unmarshalled and then
+      * processed immediately if it is allowed for startup or if the start future obtained from
+      * {@code startFuture(cacheMsg)} is already done. When that future is not done, processing is
+      * deferred to a listener on the future; the listener takes the read lock again and re-checks
+      * {@code stopping} before processing. A failure of the start future is logged once (guarded
+      * by {@code startErr}) and the message is dropped. Any {@link Throwable} raised in this
+      * method is logged and not rethrown.
+      *
+      * @param nodeId Node ID.
+      * @param cacheMsg Cache message.
+      * @param c Handler closure.
+      */
+     private void onMessage0(final UUID nodeId, final GridCacheMessage<K, V> 
cacheMsg,
+         final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) {
+         rw.readLock();
+ 
+         try {
+             if (stopping) {
+                 if (log.isDebugEnabled())
+                     log.debug("Received cache communication message while 
stopping (will ignore) [nodeId=" +
+                         nodeId + ", msg=" + cacheMsg + ']');
+ 
+                 return;
+             }
+ 
+             if (depEnabled)
+                 cctx.deploy().ignoreOwnership(true);
+ 
+             unmarshall(nodeId, cacheMsg);
+ 
+             if (cacheMsg.allowForStartup())
+                 processMessage(nodeId, cacheMsg, c);
+             else {
+                 IgniteFuture<?> startFut = startFuture(cacheMsg);
+ 
+                 if (startFut.isDone())
+                     processMessage(nodeId, cacheMsg, c);
+                 else {
+                     if (log.isDebugEnabled())
+                         log.debug("Waiting for start future to complete for 
message [nodeId=" + nodeId +
+                             ", locId=" + cctx.localNodeId() + ", msg=" + 
cacheMsg + ']');
+ 
+                     // Don't hold this thread waiting for preloading to 
complete.
+                     startFut.listenAsync(new CI1<IgniteFuture<?>>() {
+                         @Override public void apply(IgniteFuture<?> f) {
+                             rw.readLock();
+ 
+                             try {
+                                 if (stopping) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Received cache 
communication message while stopping " +
+                                             "(will ignore) [nodeId=" + nodeId 
+ ", msg=" + cacheMsg + ']');
+ 
+                                     return;
+                                 }
+ 
+                                 f.get();
+ 
+                                 if (log.isDebugEnabled())
+                                     log.debug("Start future completed for 
message [nodeId=" + nodeId +
+                                         ", locId=" + cctx.localNodeId() + ", 
msg=" + cacheMsg + ']');
+ 
+                                 processMessage(nodeId, cacheMsg, c);
+                             }
+                             catch (IgniteCheckedException e) {
+                                 // Log once.
+                                 if (startErr.compareAndSet(false, true))
+                                     U.error(log, "Failed to complete preload 
start future (will ignore message) " +
+                                         "[fut=" + f + ", nodeId=" + nodeId + 
", msg=" + cacheMsg + ']', e);
+                             }
+                             finally {
+                                 rw.readUnlock();
+                             }
+                         }
+                     });
+                 }
+             }
+         }
+         catch (Throwable e) {
+             if (X.hasCause(e, ClassNotFoundException.class))
+                 U.error(log, "Failed to process message (note that 
distributed services " +
+                     "do not support peer class loading, if you deploy 
distributed service " +
+                     "you should have all required classes in CLASSPATH on all 
nodes in topology) " +
+                     "[senderId=" + nodeId + ", err=" + X.cause(e, 
ClassNotFoundException.class).getMessage() + ']');
+             else
+                 U.error(log, "Failed to process message [senderId=" + nodeId 
+ ']', e);
+         }
+         finally {
+             if (depEnabled)
+                 cctx.deploy().ignoreOwnership(false);
+ 
+             rw.readUnlock();
+         }
+     }
+ 
+     /**
+      * Picks the preloader start future that applies to the given message.
+      *
+      * @param cacheMsg Cache message to get start future.
+      * @return Start future of the preloader of the message's cache, or the shared preloaders
+      *      start future when the message's cache ID is {@code 0}.
+      */
+     private IgniteFuture<Object> startFuture(GridCacheMessage<K, V> cacheMsg) {
+         int id = cacheMsg.cacheId();
+ 
+         // Cache ID 0 is routed to the shared future.
+         if (id == 0)
+             return cctx.preloadersStartFuture();
+ 
+         return cctx.cacheContext(id).preloader().startFuture();
+     }
+ 
+     /**
+      * Applies the handler closure to the message.
+      * <p>
+      * The transaction context is reset up front when the message is transactional. Any
+      * {@link Throwable} thrown while processing is logged and not rethrown. In all cases the
+      * transaction context is reset and eviction notifications are unwound afterwards.
+      *
+      * @param nodeId Node ID.
+      * @param msg Message.
+      * @param c Closure.
+      */
+     private void processMessage(UUID nodeId, GridCacheMessage<K, V> msg,
+         IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) {
+         try {
+             // Start clean.
+             if (msg.transactional())
+                 CU.resetTxContext(cctx);
+ 
+             // We will not end up with storing a bunch of new UUIDs
+             // in each cache entry, since node ID is stored in NIO session
+             // on handshake.
+             c.apply(nodeId, msg);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Finished processing cache communication message 
[nodeId=" + nodeId + ", msg=" + msg + ']');
+         }
+         catch (Throwable e) {
+             U.error(log, "Failed processing message [senderId=" + nodeId + 
']', e);
+         }
+         finally {
+             // Clear thread-local tx contexts.
+             CU.resetTxContext(cctx);
+ 
+             // Unwind eviction notifications.
+             CU.unwindEvicts(cctx);
+         }
+     }
+ 
+     /**
+      * Prepares a message for sending: assigns a message ID when the message has none yet and,
+      * unless the destination is the local node, prepares the message for marshalling and
+      * (with deployment enabled) for deployment.
+      *
+      * @param msg Message to send.
+      * @param destNodeId Destination node ID, {@code null} is treated as a remote destination.
+      * @throws IgniteCheckedException If failed.
+      */
+     private void onSend(GridCacheMessage<K, V> msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+         // A negative ID means that no ID has been generated for this message so far.
+         if (msg.messageId() < 0)
+             msg.messageId(idGen.incrementAndGet());
+ 
+         boolean loc = destNodeId != null && cctx.localNodeId().equals(destNodeId);
+ 
+         // Nothing else to prepare for a message that stays on the local node.
+         if (loc)
+             return;
+ 
+         msg.prepareMarshal(cctx);
+ 
+         if (depEnabled && msg instanceof GridCacheDeployable)
+             cctx.deploy().prepare((GridCacheDeployable)msg);
+     }
+ 
+     /**
+      * Sends communication message using the {@code SYSTEM_POOL} policy.
+      * Delegates to the overload of this method that takes a policy.
+      *
+      * @param node Node to send the message to.
+      * @param msg Message to send.
+      * @throws IgniteCheckedException If sending failed.
+      * @throws ClusterTopologyException If receiver left.
+      */
+     public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws 
IgniteCheckedException {
+         send(node, msg, SYSTEM_POOL);
+     }
+ 
+     /**
+      * Sends communication message with the given policy.
+      * <p>
+      * At least one send attempt is always made. A failed attempt is repeated after
+      * {@code retryDelay} milliseconds for as long as the receiver is still alive and the
+      * number of attempts made is below {@code retryCnt}; once that limit is reached the
+      * last failure is rethrown. The first attempt sends {@code msg} itself, every retry
+      * sends a clone of it.
+      *
+      * @param node Node to send the message to, must not be the local node.
+      * @param msg Message to send.
+      * @param plc Policy to send the message with.
+      * @throws IgniteCheckedException If sending failed.
+      * @throws ClusterTopologyException If receiver left.
+      */
+     public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
+         assert !node.isLocal();
+ 
+         onSend(msg, node.id());
+ 
+         if (log.isDebugEnabled())
+             log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
+ 
+         int cnt = 0;
+ 
+         while (true) {
+             try {
+                 cnt++;
+ 
+                 // The original instance goes out on the first attempt, a clone on every retry.
+                 GridCacheMessage<K, V> msg0 = cnt == 1 ? msg : (GridCacheMessage<K, V>)msg.clone();
+ 
+                 cctx.gridIO().send(node, TOPIC_CACHE, msg0, plc);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
+ 
+                 return;
+             }
+             catch (IgniteCheckedException e) {
+                 if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id()))
+                     throw new ClusterTopologyException("Node left grid while sending message to: " + node.id(), e);
+ 
+                 // '>=' rather than '==': with a retry count below 1 the only attempt made
+                 // must still report its failure instead of falling out of the loop silently.
+                 if (cnt >= retryCnt)
+                     throw e;
+                 else if (log.isDebugEnabled())
+                     log.debug("Failed to send message to node (will retry): " + node.id());
+             }
+ 
+             U.sleep(retryDelay);
+         }
+     }
+ 
+     /**
+      * Sends message and automatically accounts for left nodes.
+      * <p>
+      * At least one send attempt is always made. Nodes found to have left are collected and
+      * excluded from later attempts; {@code fallback}, when given, is applied to each of them
+      * and stops the whole operation by returning {@code false}. A failed attempt during which
+      * no node left counts towards {@code retryCnt}; when that limit is reached the failure
+      * is rethrown, otherwise the send is retried after {@code retryDelay} milliseconds.
+      * The first attempt sends {@code msg} itself, every retry sends a clone of it.
+      *
+      * @param nodes Nodes to send to.
+      * @param msg Message to send.
+      * @param fallback Callback for failed nodes.
+      * @return {@code True} if nodes are empty or message was sent, {@code false} if
+      *      all nodes have left topology while sending this message or if the fallback
+      *      signalled to stop.
+      * @throws IgniteCheckedException If send failed.
+      */
+     @SuppressWarnings( {"BusyWait"})
+     public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage<K, V> msg,
+         @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
+         assert nodes != null;
+         assert msg != null;
+ 
+         if (nodes.isEmpty()) {
+             if (log.isDebugEnabled())
+                 log.debug("Message will not be sent as collection of nodes is empty: " + msg);
+ 
+             return true;
+         }
+ 
+         onSend(msg, null);
+ 
+         if (log.isDebugEnabled())
+             log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
+ 
+         final Collection<UUID> leftIds = new GridLeanSet<>();
+ 
+         int cnt = 0;
+         boolean first = true;
+ 
+         // Loop until the message is sent, all nodes left, the fallback stops us or the retry limit
+         // is hit. An unconditional loop guarantees one attempt even for a retry count below 1.
+         while (true) {
+             try {
+                 Collection<? extends ClusterNode> nodesView = F.view(nodes, new P1<ClusterNode>() {
+                     @Override public boolean apply(ClusterNode e) {
+                         return !leftIds.contains(e.id());
+                     }
+                 });
+ 
+                 GridCacheMessage<K, V> msg0;
+ 
+                 if (first) {
+                     msg0 = msg;
+ 
+                     first = false;
+                 }
+                 else
+                     msg0 = (GridCacheMessage<K, V>)msg.clone();
+ 
+                 cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL);
+ 
+                 boolean added = false;
+ 
+                 // Even if there is no exception, we still check here, as node could have
+                 // ignored the message during stopping.
+                 for (ClusterNode n : nodes) {
+                     if (!leftIds.contains(n.id()) && !cctx.discovery().alive(n.id())) {
+                         leftIds.add(n.id());
+ 
+                         if (fallback != null && !fallback.apply(n))
+                             // If fallback signalled to stop.
+                             return false;
+ 
+                         added = true;
+                     }
+                 }
+ 
+                 if (added) {
+                     if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) {
+                         if (log.isDebugEnabled())
+                             log.debug("Message will not be sent because all nodes left topology [msg=" + msg +
+                                 ", nodes=" + U.toShortString(nodes) + ']');
+ 
+                         return false;
+                     }
+                 }
+ 
+                 break;
+             }
+             catch (IgniteCheckedException e) {
+                 boolean added = false;
+ 
+                 for (ClusterNode n : nodes) {
+                     if (!leftIds.contains(n.id()) &&
+                         (!cctx.discovery().alive(n.id()) || !cctx.discovery().pingNode(n.id()))) {
+                         leftIds.add(n.id());
+ 
+                         if (fallback != null && !fallback.apply(n))
+                             // If fallback signalled to stop.
+                             return false;
+ 
+                         added = true;
+                     }
+                 }
+ 
+                 if (!added) {
+                     cnt++;
+ 
+                     // '>=' rather than '==' so that a retry count below 1 still surfaces the failure.
+                     if (cnt >= retryCnt)
+                         throw e;
+ 
+                     U.sleep(retryDelay);
+                 }
+ 
+                 if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) {
+                     if (log.isDebugEnabled())
+                         log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" +
+                             U.toShortString(nodes) + ']');
+ 
+                     return false;
+                 }
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Message send will be retried [msg=" + msg + ", nodes=" + U.toShortString(nodes) +
+                         ", leftIds=" + leftIds + ']');
+             }
+         }
+ 
+         if (log.isDebugEnabled())
+             log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
+ 
+         return true;
+     }
+ 
+     /**
+      * Sends communication message to the node with the given ID.
+      * Resolves the node through discovery and delegates to the overload taking a node.
+      *
+      * @param nodeId ID of node to send the message to.
+      * @param msg Message to send.
+      * @throws IgniteCheckedException If sending failed.
+      * @throws ClusterTopologyException If discovery returns no node for the given ID.
+      */
+     public void send(UUID nodeId, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
+         ClusterNode n = cctx.discovery().node(nodeId);
+ 
+         // Report the requested ID: 'n' is always null on this path.
+         if (n == null)
+             throw new ClusterTopologyException("Failed to send message because node left grid [node=" + nodeId +
+                 ", msg=" + msg + ']');
+ 
+         send(n, msg);
+     }
+ 
+     /**
+      * Sends communication message with the given policy to the node with the given ID.
+      * Resolves the node through discovery and delegates to the overload taking a node and a policy.
+      *
+      * @param nodeId ID of node to send the message to.
+      * @param msg Message to send.
+      * @param plc Policy to send the message with.
+      * @throws IgniteCheckedException If sending failed.
+      * @throws ClusterTopologyException If discovery returns no node for the given ID.
+      */
+     public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
+         ClusterNode n = cctx.discovery().node(nodeId);
+ 
+         // Report the requested ID: 'n' is always null on this path.
+         if (n == null)
+             throw new ClusterTopologyException("Failed to send message because node left grid [node=" + nodeId +
+                 ", msg=" + msg + ']');
+ 
+         send(n, msg, plc);
+     }
+ 
+     /**
+      * Sends an ordered message to the given node, retrying failed attempts.
+      * <p>
+      * After a failed attempt the method sleeps for {@code retryDelay} before trying again. It gives up
+      * as soon as the destination node is no longer known to discovery, or once the number of failed
+      * attempts reaches {@code retryCnt}.
+      *
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
 -     * @param msgId Ordered message ID.
+      * @param msg Message to send.
+      * @param timeout Timeout to keep a message on receiving queue.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
 -    public void sendOrderedMessage(ClusterNode node, Object topic, long msgId, GridCacheMessage<K, V> msg,
++    public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg,
+         long timeout) throws IgniteCheckedException {
+         onSend(msg, node.id());
+ 
+         int cnt = 0;
+ 
+         while (cnt <= retryCnt) {
+             try {
+                 cnt++;
+ 
 -                cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, SYSTEM_POOL, timeout, false);
++                cctx.gridIO().sendOrderedMessage(node, topic, msg, SYSTEM_POOL, timeout, false);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg +
+                         ", nodeId=" + node.id() + ']');
+ 
+                 return;
+             }
+             catch (IgniteCheckedException e) {
+                 // A node that has left will never accept the message, so retrying is pointless.
+                 if (cctx.discovery().node(node.id()) == null)
+                     throw new ClusterTopologyException("Node left grid while sending ordered message to: " + node.id(), e);
+ 
+                 // '>=' rather than '==': with a retry count of 0 the counter is already past the limit after
+                 // the first attempt, and an equality check would let the failure go unreported.
+                 if (cnt >= retryCnt)
+                     throw e;
+                 else if (log.isDebugEnabled())
+                     log.debug("Failed to send message to node (will retry): " + node.id());
+             }
+ 
+             U.sleep(retryDelay);
+         }
+     }
+ 
+     /**
 -     * @param topic Message topic.
 -     * @param nodeId Node ID.
 -     * @return Next ordered message ID.
 -     */
 -    public long messageId(Object topic, UUID nodeId) {
 -        return cctx.gridIO().nextMessageId(topic, nodeId);
 -    }
 -
 -    /**
+      * @return ID that auto-grows based on local counter and counters received
+      *      from other nodes.
+      */
+     public long nextIoId() {
+         // Advances idGen and hands out the incremented value.
+         return idGen.incrementAndGet();
+     }
+ 
+     /**
+      * Adds message handler.
+      * <p>
+      * Message classes for which {@code messageIndex(type)} yields an index are stored in a per-cache
+      * array slot; all other classes are stored in the class-keyed handler map.
+      *
+      * @param cacheId Cache ID the handler is registered for.
+      * @param type Type of message.
+      * @param c Handler.
+      * @throws IgniteException If the indexed slot for this cache and message type is already occupied.
+      */
+     @SuppressWarnings({"unchecked"})
+     public void addHandler(
+         int cacheId,
+         Class<? extends GridCacheMessage> type,
+         IgniteBiInClosure<UUID, ? extends GridCacheMessage<K, V>> c) {
+         int msgIdx = messageIndex(type);
+ 
+         if (msgIdx != -1) {
+             IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheId);
+ 
+             // First indexed handler for this cache: allocate its lookup array.
+             if (cacheClsHandlers == null) {
+                 cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+ 
+                 idxClsHandlers.put(cacheId, cacheClsHandlers);
+             }
+ 
+             if (cacheClsHandlers[msgIdx] != null)
+                 throw new IgniteException("Duplicate cache message ID found [cacheId=" + cacheId +
+                     ", type=" + type + ']');
+ 
+             cacheClsHandlers[msgIdx] = c;
+         }
+         else {
+             ListenerKey key = new ListenerKey(cacheId, type);
+ 
+             if (clsHandlers.putIfAbsent(key,
+                 (IgniteBiInClosure<UUID, GridCacheMessage<K, V>>)c) != null)
+                 assert false : "Handler for class already registered [cacheId=" + cacheId + ", cls=" + type +
+                     ", old=" + clsHandlers.get(key) + ", new=" + c + ']';
+         }
+ 
+         // Reached for both registration paths, so that the logged msgIdx is meaningful for indexed handlers too.
+         if (log != null && log.isDebugEnabled())
+             log.debug("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type +
+                 ", msgIdx=" + msgIdx + ", handler=" + c + ']');
+     }
+ 
+     /**
+      * Registers a disconnect listener with the component returned by {@code cctx.kernalContext().io()}.
+      *
+      * @param lsnr Listener to add.
+      */
+     public void addDisconnectListener(GridDisconnectListener lsnr) {
+         cctx.kernalContext().io().addDisconnectListener(lsnr);
+     }
+ 
+     /**
+      * Resolves the lookup index declared by a message class.
+      *
+      * @param msgCls Message class to check.
+      * @return Value of the class's index field, or {@code -1} if the value is {@code null}, negative,
+      *      or reading the field failed with {@link IgniteCheckedException}.
+      */
+     private int messageIndex(Class<?> msgCls) {
+         Integer idx;
+ 
+         try {
+             idx = U.field(msgCls, GridCacheMessage.CACHE_MSG_INDEX_FIELD_NAME);
+         }
+         catch (IgniteCheckedException ignored) {
+             // The index field could not be read: treat the class as non-indexed.
+             return -1;
+         }
+ 
+         return idx != null && idx >= 0 ? idx : -1;
+     }
+ 
+     /**
+      * Removes message handler.
+      * <p>
+      * The handler is removed from the class-keyed handler map for every cache ID under which it was
+      * registered with the given message type.
+      * <p>
+      * NOTE(review): handlers stored in the indexed per-cache arrays are not touched here — confirm whether
+      * they need to be removable as well.
+      *
+      * @param type Type of message.
+      * @param c Handler.
+      */
+     public void removeHandler(Class<?> type, IgniteBiInClosure<UUID, ?> c) {
+         assert type != null;
+         assert c != null;
+ 
+         boolean res = false;
+ 
+         // Entries are keyed by ListenerKey(cacheId, type); a lookup by the bare class can never match,
+         // so find the keys that carry this message type and remove the handler under each of them.
+         for (ListenerKey key : clsHandlers.keySet()) {
+             if (key.msgCls.equals(type) && clsHandlers.remove(key, c))
+                 res = true;
+         }
+ 
+         if (log != null && log.isDebugEnabled()) {
+             if (res) {
+                 log.debug("Removed cache communication handler " +
+                     "[type=" + type + ", handler=" + c + ']');
+             }
+             else {
+                 log.debug("Cache communication handler is not registered " +
+                     "[type=" + type + ", handler=" + c + ']');
+             }
+         }
+     }
+ 
+     /**
+      * Adds ordered message handler.
+      *
+      * @param topic Topic.
+      * @param c Handler.
+      */
+     @SuppressWarnings({"unchecked"})
+     public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? 
extends GridCacheMessage<K, V>> c) {
+         if (orderedHandlers.putIfAbsent(topic, c) == null) {
+             cctx.gridIO().addMessageListener(topic, new 
OrderedMessageListener(
+                 (IgniteBiInClosure<UUID, GridCacheMessage<K, V>>)c));
+ 
+             if (log != null && log.isDebugEnabled())
+                 log.debug("Registered ordered cache communication handler 
[topic=" + topic + ", handler=" + c + ']');
+         }
+         else if (log != null)
+             U.warn(log, "Failed to register ordered cache communication 
handler because it is already " +
+                 "registered for this topic [topic=" + topic + ", handler=" + 
c + ']');
+     }
+ 
+     /**
+      * Removes ordered message handler.
+      * <p>
+      * If a handler was registered for the topic, the matching message listener is removed as well;
+      * otherwise a warning is logged.
+      *
+      * @param topic Topic.
+      */
+     public void removeOrderedHandler(Object topic) {
+         if (orderedHandlers.remove(topic) != null) {
 -            cctx.gridIO().removeMessageId(topic);
+             cctx.gridIO().removeMessageListener(topic);
+ 
+             if (log != null && log.isDebugEnabled())
+                 log.debug("Unregistered ordered cache communication handler 
for topic:" + topic);
+         }
+         else if (log != null)
+             U.warn(log, "Failed to unregister ordered cache communication 
handler because it was not found " +
+                 "for topic: " + topic);
+     }
+ 
+     /**
 -     * @param topic Message topic.
 -     */
 -    public void removeMessageId(Object topic) {
 -        cctx.gridIO().removeMessageId(topic);
 -    }
 -
 -    /**
+      * @param nodeId Sender node ID.
+      * @param cacheMsg Message.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("ErrorNotRethrown")
+     private void unmarshall(UUID nodeId, GridCacheMessage<K, V> cacheMsg) 
throws IgniteCheckedException {
+         // Messages whose sender is the local node are left untouched.
+         if (cctx.localNodeId().equals(nodeId))
+             return;
+ 
+         GridDeploymentInfo bean = cacheMsg.deployInfo();
+ 
+         // Deployment info is present: hand the sender's P2P context to the deployment manager
+         // before the message finishes unmarshalling.
+         if (bean != null) {
+             assert depEnabled : "Received deployment info while peer class 
loading is disabled [nodeId=" + nodeId +
+                 ", msg=" + cacheMsg + ']';
+ 
+             cctx.deploy().p2pContext(nodeId, bean.classLoaderId(), 
bean.userVersion(),
+                 bean.deployMode(), bean.participants(), 
bean.localDeploymentOwner());
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + 
cacheMsg + ']');
+         }
+ 
+         try {
+             cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
+         }
+         catch (IgniteCheckedException e) {
+             // Class-related failures are passed to the message itself when it opts in through
+             // ignoreClassErrors(); every other failure propagates to the caller.
+             if (cacheMsg.ignoreClassErrors() && X.hasCause(e, 
InvalidClassException.class,
+                     ClassNotFoundException.class, NoClassDefFoundError.class, 
UnsupportedClassVersionError.class))
+                 cacheMsg.onClassError(e);
+             else
+                 throw e;
+         }
+         catch (Error e) {
+             // Same opt-in for class-loading errors raised directly as Error; the error is wrapped
+             // into a checked exception before being passed to the message.
+             if (cacheMsg.ignoreClassErrors() && X.hasCause(e, 
NoClassDefFoundError.class,
+                 UnsupportedClassVersionError.class))
+                     cacheMsg.onClassError(new IgniteCheckedException("Failed 
to load class during unmarshalling: " + e, e));
+             else
+                 throw e;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void printMemoryStats() {
+         X.println(">>> ");
+         X.println(">>> Cache IO manager memory stats [grid=" + 
cctx.gridName() + ']');
+         X.println(">>>   clsHandlersSize: " + clsHandlers.size());
+         X.println(">>>   orderedHandlersSize: " + orderedHandlers.size());
+     }
+ 
+     /**
+      * Listener that forwards messages received on an ordered topic to a cache message handler.
+      */
+     private class OrderedMessageListener implements GridMessageListener {
+         /** Handler closure the received messages are passed to. */
+         private final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> hnd;
+ 
+         /**
+          * @param hnd Handler closure.
+          */
+         OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage<K, V>> hnd) {
+             this.hnd = hnd;
+         }
+ 
+         /** {@inheritDoc} */
+         @SuppressWarnings( {"CatchGenericClass", "unchecked"})
+         @Override public void onMessage(final UUID nodeId, Object msg) {
+             if (log.isDebugEnabled())
+                 log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
+ 
+             // The listener receives an untyped message; cast it and pass it on together with the handler.
+             onMessage0(nodeId, (GridCacheMessage<K, V>)msg, hnd);
+         }
+     }
+ 
+     /**
+      * Key of the class-based handler map: a cache ID paired with a message class.
+      * <p>
+      * Both components are immutable so that the hash code of a key stored in a map cannot change.
+      */
+     private static class ListenerKey {
+         /** Cache ID. */
+         private final int cacheId;
+ 
+         /** Message class. */
+         private final Class<? extends GridCacheMessage> msgCls;
+ 
+         /**
+          * @param cacheId Cache ID.
+          * @param msgCls Message class.
+          */
+         private ListenerKey(int cacheId, Class<? extends GridCacheMessage> msgCls) {
+             this.cacheId = cacheId;
+             this.msgCls = msgCls;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean equals(Object o) {
+             if (this == o)
+                 return true;
+ 
+             if (!(o instanceof ListenerKey))
+                 return false;
+ 
+             ListenerKey that = (ListenerKey)o;
+ 
+             return cacheId == that.cacheId && msgCls.equals(that.msgCls);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int hashCode() {
+             int result = cacheId;
+ 
+             result = 31 * result + msgCls.hashCode();
+ 
+             return result;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 0000000,30b536c..5011e35
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@@ -1,0 -1,1139 +1,1129 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.thread.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+ 
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+ 
+ /**
+  * Thread pool for requesting partitions from other nodes
+  * and populating local cache.
+  */
+ @SuppressWarnings("NonConstantFieldWithUpperCaseName")
+ public class GridDhtPartitionDemandPool<K, V> {
+     /** Dummy message to wake up a blocking queue if a node leaves. */
+     private final SupplyMessage<K, V> DUMMY_TOP = new SupplyMessage<>();
+ 
+     /** Cache context. */
+     private final GridCacheContext<K, V> cctx;
+ 
+     /** Logger obtained from the cache context. */
+     private final IgniteLogger log;
+ 
+     /** Shutdown lock. */
+     private final ReadWriteLock busyLock;
+ 
+     /** Demand workers; an empty collection when the pool size is 0. */
 -    private GridDhtPartitionTopology<K, V> top;
 -
 -    /** */
+     @GridToStringInclude
+     private final Collection<DemandWorker> dmdWorkers;
+ 
+     /** Preload predicate. */
+     private IgnitePredicate<GridCacheEntryInfo<K, V>> preloadPred;
+ 
+     /** Future for preload mode {@link org.apache.ignite.cache.CachePreloadMode#SYNC}. */
+     @GridToStringInclude
+     private SyncFuture syncFut;
+ 
+     /** Preload timeout. */
+     private final AtomicLong timeout;
+ 
+     /** Allows demand threads to synchronize their step. */
+     private CyclicBarrier barrier;
+ 
+     /** Demand lock. */
+     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+ 
+     /** Number of demand workers; 0 when preloading is disabled. */
+     private int poolSize;
+ 
+     /** Last timeout object. */
+     private AtomicReference<GridTimeoutObject> lastTimeoutObj = new 
AtomicReference<>();
+ 
+     /** Last exchange future. */
+     private volatile GridDhtPartitionsExchangeFuture<K, V> lastExchangeFut;
+ 
+     /**
+      * Creates the pool and its demand workers; the workers are not started here.
+      *
+      * @param cctx Cache context.
+      * @param busyLock Shutdown lock.
+      */
+     public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, 
ReadWriteLock busyLock) {
+         assert cctx != null;
+         assert busyLock != null;
+ 
+         this.cctx = cctx;
+         this.busyLock = busyLock;
+ 
+         log = cctx.logger(getClass());
+ 
 -        top = cctx.dht().topology();
 -
+         // Pool size comes from configuration and is forced to 0 when preloading is disabled.
+         poolSize = cctx.preloadEnabled() ? 
cctx.config().getPreloadThreadPoolSize() : 0;
+ 
+         if (poolSize > 0) {
+             // One barrier party per demand worker.
+             barrier = new CyclicBarrier(poolSize);
+ 
+             dmdWorkers = new ArrayList<>(poolSize);
+ 
+             for (int i = 0; i < poolSize; i++)
+                 dmdWorkers.add(new DemandWorker(i));
+ 
+             syncFut = new SyncFuture(dmdWorkers);
+         }
+         else {
+             dmdWorkers = Collections.emptyList();
+ 
+             syncFut = new SyncFuture(dmdWorkers);
+ 
+             // Calling onDone() immediately since preloading is disabled.
+             syncFut.onDone();
+         }
+ 
+         timeout = new AtomicLong(cctx.config().getPreloadTimeout());
+     }
+ 
+     /**
+      * Starts one thread per demand worker; does nothing when the pool size is 0.
+      */
+     void start() {
+         if (poolSize <= 0)
+             return;
+ 
+         for (DemandWorker worker : dmdWorkers) {
+             IgniteThread thread = new IgniteThread(cctx.gridName(), "preloader-demand-worker", worker);
+ 
+             thread.start();
+         }
+     }
+ 
+     /**
+      * Cancels all demand workers, waits for them to finish and clears the last exchange future
+      * and the last timeout object.
+      */
+     void stop() {
+         U.cancel(dmdWorkers);
+ 
+         if (log.isDebugEnabled())
+             log.debug("Before joining on demand workers: " + dmdWorkers);
+ 
+         U.join(dmdWorkers, log);
+ 
+         if (log.isDebugEnabled())
+             log.debug("After joining on demand workers: " + dmdWorkers);
+ 
 -        top = null;
+         lastExchangeFut = null;
+ 
+         lastTimeoutObj.set(null);
+     }
+ 
+     /**
+      * @return Future for {@link org.apache.ignite.cache.CachePreloadMode#SYNC} mode.
+      */
+     IgniteFuture<?> syncFuture() {
+         return syncFut;
+     }
+ 
+     /**
+      * Sets preload predicate for demand pool.
+      * <p>
+      * When no predicate is set ({@code null}), {@code preloadEntry(...)} accepts every entry.
+      *
+      * @param preloadPred Preload predicate.
+      */
+     void preloadPredicate(IgnitePredicate<GridCacheEntryInfo<K, V>> 
preloadPred) {
+         this.preloadPred = preloadPred;
+     }
+ 
+     /**
+      * @return Size of this thread pool ({@code 0} when preloading is disabled).
+      */
+     int poolSize() {
+         return poolSize;
+     }
+ 
+     /**
+      * Wakes up demand workers when new exchange future was added.
+      */
+     void onExchangeFutureAdded() {
+         synchronized (dmdWorkers) {
+             // The dummy message carries no data; it is only put into each worker's message queue.
+             for (DemandWorker w : dmdWorkers)
+                 w.addMessage(DUMMY_TOP);
+         }
+     }
+ 
+     /**
+      * Forces preloading: drops the pending delayed-preload timeout object, if any, and requests a
+      * forced preload exchange for the last known exchange future once that future completes.
+      */
+     void forcePreload() {
+         GridTimeoutObject pending = lastTimeoutObj.getAndSet(null);
+ 
+         if (pending != null)
+             cctx.time().removeTimeoutObject(pending);
+ 
+         final GridDhtPartitionsExchangeFuture<K, V> exchFut = lastExchangeFut;
+ 
+         if (exchFut == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Ignoring force preload request (no topology event happened yet).");
+ 
+             return;
+         }
+ 
+         if (log.isDebugEnabled())
+             log.debug("Forcing preload event for future: " + exchFut);
+ 
+         exchFut.listenAsync(new CI1<IgniteFuture<Long>>() {
+             @Override public void apply(IgniteFuture<Long> fut) {
+                 cctx.shared().exchange().forcePreloadExchange(exchFut);
+             }
+         });
+     }
+ 
+     /**
+      * Tries to take the read side of the busy lock without blocking.
+      *
+      * @return {@code true} if entered to busy state.
+      */
+     private boolean enterBusy() {
+         boolean entered = busyLock.readLock().tryLock();
+ 
+         if (!entered && log.isDebugEnabled())
+             log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
+ 
+         return entered;
+     }
+ 
+     /**
+      * Releases the read side of the busy lock; the counterpart of {@code enterBusy()}.
+      */
+     private void leaveBusy() {
+         busyLock.readLock().unlock();
+     }
+ 
+     /**
+      * Adds a preload event without a specific partition ({@code -1} is passed as the partition).
+      *
+      * @param type Type.
+      * @param discoEvt Discovery event.
+      */
+     private void preloadEvent(int type, IgniteDiscoveryEvent discoEvt) {
+         preloadEvent(-1, type, discoEvt);
+     }
+ 
+     /**
+      * Adds a preload event, taking the event node, discovery event type and timestamp from the
+      * given discovery event.
+      *
+      * @param part Partition.
+      * @param type Type.
+      * @param discoEvt Discovery event.
+      */
+     private void preloadEvent(int part, int type, IgniteDiscoveryEvent 
discoEvt) {
+         assert discoEvt != null;
+ 
+         cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), 
discoEvt.type(), discoEvt.timestamp());
+     }
+ 
+     /**
 -     * @return Dummy node-left message.
 -     */
 -    private SupplyMessage<K, V> dummyTopology() {
 -        return DUMMY_TOP;
 -    }
 -
 -    /**
+      * @param msg Message to check.
+      * @return {@code True} if dummy message.
+      */
+     private boolean dummyTopology(SupplyMessage<K, V> msg) {
+         // Identity comparison against this pool's own dummy instance.
+         return msg == DUMMY_TOP;
+     }
+ 
+     /**
+      * Polls the queue with a timeout, re-asserting the thread's interrupt flag first when the
+      * worker has been cancelled.
+      *
+      * @param deque Deque to poll from.
+      * @param time Time to wait, in milliseconds.
+      * @param w Worker.
+      * @return Polled item.
+      * @throws InterruptedException If interrupted.
+      */
+     @Nullable private <T> T poll(BlockingQueue<T> deque, long time, 
GridWorker w) throws InterruptedException {
+         assert w != null;
+ 
+         // There is currently a case where {@code interrupted}
+         // flag on a thread gets flipped during stop which causes the pool to hang.  This check
+         // will always make sure that interrupted flag gets reset before going into wait conditions.
+         // The true fix should actually make sure that interrupted flag does not get reset or that
+         // interrupted exception gets propagated. Until we find a real fix, this method should
+         // always work to make sure that there is no hanging during stop.
+         if (w.isCancelled())
+             Thread.currentThread().interrupt();
+ 
+         return deque.poll(time, MILLISECONDS);
+     }
+ 
+     /**
+      * Picks the remote owners of a partition to preload from, capped at the number of affinity
+      * nodes of that partition.
+      *
+      * @param p Partition.
+      * @param topVer Topology version.
+      * @return Picked owners.
+      */
+     private Collection<ClusterNode> pickedOwners(int p, long topVer) {
+         int limit = cctx.affinity().nodes(p, topVer).size();
+ 
+         Collection<ClusterNode> owners = remoteOwners(p, topVer);
+ 
+         if (owners.size() > limit) {
+             List<ClusterNode> byOrder = new ArrayList<>(owners);
+ 
+             // Descending order, so nodes with higher order come first and the newest ones are kept.
+             Collections.sort(byOrder, CU.nodeComparator(false));
+ 
+             return byOrder.subList(0, limit);
+         }
+ 
+         return owners;
+     }
+ 
+     /**
+      * Returns a view of the partition owners filtered by {@code F.remoteNodes(cctx.nodeId())}.
+      *
+      * @param p Partition.
+      * @param topVer Topology version.
+      * @return Nodes owning this partition.
+      */
+     private Collection<ClusterNode> remoteOwners(int p, long topVer) {
 -        return F.view(top.owners(p, topVer), F.remoteNodes(cctx.nodeId()));
++        return F.view(cctx.dht().topology().owners(p, topVer), 
F.remoteNodes(cctx.nodeId()));
+     }
+ 
+     /**
+      * Hands new partition assignments to the demand workers, either immediately or after the
+      * configured preload delay.
+      * <p>
+      * NOTE(review): with a negative delay and {@code force == false} neither branch runs, so the
+      * assignments are not queued by this call — presumably preloading is then triggered manually
+      * through {@code forcePreload()}; confirm.
+      *
+      * @param assigns Assignments.
+      * @param force {@code True} if dummy reassign.
+      */
+     void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, 
boolean force) {
+         if (log.isDebugEnabled())
+             log.debug("Adding partition assignments: " + assigns);
+ 
+         long delay = cctx.config().getPreloadPartitionedDelay();
+ 
+         if (delay == 0 || force) {
+             assert assigns != null;
+ 
+             // Queue the assignments on every worker, then add the dummy message to its message queue.
+             synchronized (dmdWorkers) {
+                 for (DemandWorker w : dmdWorkers) {
+                     w.addAssignments(assigns);
+ 
+                     w.addMessage(DUMMY_TOP);
+                 }
+             }
+         }
+         else if (delay > 0) {
+             assert !force;
+ 
+             // Replace the previously scheduled timeout object, if any, with a fresh one.
+             GridTimeoutObject obj = lastTimeoutObj.get();
+ 
+             if (obj != null)
+                 cctx.time().removeTimeoutObject(obj);
+ 
+             final GridDhtPartitionsExchangeFuture<K, V> exchFut = 
lastExchangeFut;
+ 
+             assert exchFut != null : "Delaying preload process without 
topology event.";
+ 
+             // On timeout, request a forced preload exchange once the exchange future completes.
+             obj = new GridTimeoutObjectAdapter(delay) {
+                 @Override public void onTimeout() {
+                     exchFut.listenAsync(new CI1<IgniteFuture<Long>>() {
+                         @Override public void apply(IgniteFuture<Long> f) {
+                             
cctx.shared().exchange().forcePreloadExchange(exchFut);
+                         }
+                     });
+                 }
+             };
+ 
+             lastTimeoutObj.set(obj);
+ 
+             cctx.time().addTimeoutObject(obj);
+         }
+     }
+ 
+     /**
+      * Calls {@code cctx.deploy().unwind()} while holding the write side of the demand lock.
+      */
+     void unwindUndeploys() {
+         demandLock.writeLock().lock();
+ 
+         try {
+             cctx.deploy().unwind();
+         }
+         finally {
+             demandLock.writeLock().unlock();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         // String form is produced by S.toString(...) for this class.
+         return S.toString(GridDhtPartitionDemandPool.class, this);
+     }
+ 
+     /**
+      *
+      */
+     private class DemandWorker extends GridWorker {
+         /** Worker ID. */
+         private int id;
+ 
+         /** Partition-to-node assignments. */
+         private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> 
assignQ = new LinkedBlockingDeque<>();
+ 
+         /** Message queue. */
+         private final LinkedBlockingDeque<SupplyMessage<K, V>> msgQ =
+             new LinkedBlockingDeque<>();
+ 
+         /** Counter. */
+         private long cntr;
+ 
+         /** Hide worker logger and use cache logger instead. */
+         private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
+ 
+         /**
+          * @param id Worker ID.
+          */
+         private DemandWorker(int id) {
+             // All workers share the same worker name; the pool's logger is passed to the base class.
+             super(cctx.gridName(), "preloader-demand-worker", 
GridDhtPartitionDemandPool.this.log);
+ 
+             assert id >= 0;
+ 
+             this.id = id;
+         }
+ 
+         /**
+          * Appends assignments to this worker's assignment queue.
+          *
+          * @param assigns Assignments.
+          */
+         void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+             assert assigns != null;
+ 
+             assignQ.offer(assigns);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Added assignments to worker: " + this);
+         }
+ 
+         /**
+          * Checks whether new assignments are queued for this worker or the exchange manager
+          * reports a topology change.
+          *
+          * @return {@code True} if topology changed.
+          */
+         private boolean topologyChanged() {
+             return !assignQ.isEmpty() || 
cctx.shared().exchange().topologyChanged();
+         }
+ 
+         /**
+          * Appends a message to this worker's message queue.
+          * <p>
+          * The message is silently dropped when the busy lock cannot be entered.
+          *
+          * @param msg Message.
+          */
+         private void addMessage(SupplyMessage<K, V> msg) {
+             if (!enterBusy())
+                 return;
+ 
+             try {
+                 // Anything other than the dummy message must be addressed to this worker.
+                 assert dummyTopology(msg) || msg.supply().workerId() == id;
+ 
+                 msgQ.offer(msg);
+             }
+             finally {
+                 leaveBusy();
+             }
+         }
+ 
+         /**
+          * Grows the pool-wide preload timeout by 50%, unless it was already changed from the given value.
+          *
+          * @param timeout Timed out value.
+          */
+         private void growTimeout(long timeout) {
+             long grown = (long)(timeout * 1.5D);
+ 
+             // Account for overflow.
+             long newTimeout = grown < 0 ? Long.MAX_VALUE : grown;
+ 
+             // Grow by 50% only if another thread didn't do it already.
+             boolean updated = GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout);
+ 
+             if (!updated)
+                 return;
+ 
+             U.warn(log, "Increased preloading message timeout from " + timeout + "ms to " +
+                 newTimeout + "ms.");
+         }
+ 
+         /**
+          * Stores a single preloaded entry in the local cache.
+          *
+          * @param pick Node picked for preloading.
+          * @param p Partition.
+          * @param entry Preloaded entry.
+          * @param topVer Topology version.
+          * @return {@code False} if partition has become invalid during preloading.
+          * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
+          * @throws IgniteCheckedException If caching the entry failed for any other reason.
+          */
+         private boolean preloadEntry(ClusterNode pick, int p, 
GridCacheEntryInfo<K, V> entry, long topVer)
 -            throws IgniteCheckedException, IgniteInterruptedException {
++            throws IgniteCheckedException {
+             try {
+                 GridCacheEntryEx<K, V> cached = null;
+ 
+                 try {
+                     cached = cctx.dht().entryEx(entry.key());
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Preloading key [key=" + entry.key() + ", 
part=" + p + ", node=" + pick.id() + ']');
+ 
+                     // GGFS data cache over its space limit: skip the entry but keep preloading.
+                     if (cctx.dht().isGgfsDataCache() &&
+                         cctx.dht().ggfsDataSpaceUsed() > 
cctx.dht().ggfsDataSpaceMax()) {
+                         LT.error(log, null, "Failed to preload GGFS data 
cache (GGFS space size exceeded maximum " +
+                             "value, will ignore preload entries): " + name());
+ 
+                         if (cached.markObsoleteIfEmpty(null))
+                             
cached.context().cache().removeIfObsolete(cached.key());
+ 
+                         return true;
+                     }
+ 
+                     // A missing predicate accepts every entry.
+                     if (preloadPred == null || preloadPred.apply(entry)) {
+                         if (cached.initialValue(
+                             entry.value(),
+                             entry.valueBytes(),
+                             entry.version(),
+                             entry.ttl(),
+                             entry.expireTime(),
+                             true,
+                             topVer,
+                             cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                         )) {
+                             // Start tracking.
+                             cctx.evicts().touch(cached, topVer);
+ 
+                             if 
(cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED) && 
!cached.isInternal())
+                                 cctx.events().addEvent(cached.partition(), 
cached.key(), cctx.localNodeId(),
+                                     (IgniteUuid)null, null, 
EVT_CACHE_PRELOAD_OBJECT_LOADED, entry.value(), true, null,
+                                     false, null, null, null);
+                         }
+                         else if (log.isDebugEnabled())
+                             log.debug("Preloading entry is already in cache 
(will ignore) [key=" + cached.key() +
+                                 ", part=" + p + ']');
+                     }
+                     else if (log.isDebugEnabled())
+                         log.debug("Preload predicate evaluated to false for 
entry (will ignore): " + entry);
+                 }
+                 catch (GridCacheEntryRemovedException ignored) {
+                     // NOTE(review): 'cached' is dereferenced here; this assumes the exception cannot be
+                     // thrown by entryEx() before 'cached' is assigned — confirm.
+                     if (log.isDebugEnabled())
+                         log.debug("Entry has been concurrently removed while 
preloading (will ignore) [key=" +
+                             cached.key() + ", part=" + p + ']');
+                 }
+                 catch (GridDhtInvalidPartitionException ignored) {
+                     if (log.isDebugEnabled())
+                         log.debug("Partition became invalid during preloading 
(will ignore): " + p);
+ 
+                     return false;
+                 }
+             }
+             catch (IgniteInterruptedException e) {
+                 // Interruption is passed on unchanged rather than wrapped below.
+                 throw e;
+             }
+             catch (IgniteCheckedException e) {
+                 throw new IgniteCheckedException("Failed to cache preloaded 
entry (will stop preloading) [local=" +
+                     cctx.nodeId() + ", node=" + pick.id() + ", key=" + 
entry.key() + ", part=" + p + ']', e);
+             }
+ 
+             return true;
+         }
+ 
+         /**
+          * Builds a topic from {@code cctx.namexx()}, the local node ID, this worker's ID and the given index.
+          *
+          * @param idx Unique index for this topic.
+          * @return Topic for partition.
+          */
+         public Object topic(long idx) {
+             return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+         }
+ 
+         /**
+          * Demands partitions from a single node and processes the supply messages received in response.
+          * <p>
+          * The demand message is given a fresh topic and this worker's ID, the worker queue is drained,
+          * an ordered handler that enqueues incoming supply messages is registered on the topic and a copy
+          * of the demand (restricted to the remaining partitions) is sent to {@code node}. Supply messages
+          * are then polled from the queue. Entries of partitions that are local for {@code topVer} and in
+          * {@code MOVING} state are preloaded under the partition lock, and the partition is owned when the
+          * supply message lists it in {@code last()}. Partitions that are not local or not {@code MOVING}
+          * are just dropped from the remaining set.
+          * <p>
+          * The demand is re-sent when polling times out with an empty queue (after {@code growTimeout} and
+          * under a new topic with a new handler), when the supply message carries a class error, or when
+          * {@code supply.ack()} is set. Processing stops once no partitions remain, the worker is cancelled
+          * or the topology has changed. The ordered handler of the current topic is removed on exit.
+          *
+          * @param node Node to demand from.
+          * @param topVer Topology version.
+          * @param d Demand message.
+          * @param exchFut Exchange future.
+          * @return Missed partitions: those reported by the supplier in {@code missed()} that are local for
+          *      {@code topVer}. Empty if the worker is cancelled or topology changed before sending.
+          * @throws InterruptedException If interrupted.
+          * @throws ClusterTopologyException If node left.
+          * @throws IgniteCheckedException If failed to send message.
+          */
+         private Set<Integer> demandFromNode(ClusterNode node, final long 
topVer, GridDhtPartitionDemandMessage<K, V> d,
+             GridDhtPartitionsExchangeFuture<K, V> exchFut) throws 
InterruptedException, IgniteCheckedException {
+             cntr++;
+ 
+             d.topic(topic(cntr));
+             d.workerId(id);
+ 
+             Set<Integer> missed = new HashSet<>();
+ 
+             // Get the same collection that will be sent in the message.
+             Collection<Integer> remaining = d.partitions();
+ 
+             // Drain queue before processing a new node.
+             drainQueue();
+ 
+             if (isCancelled() || topologyChanged())
+                 return missed;
+ 
+             cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, 
GridDhtPartitionSupplyMessage<K, V>>() {
+                 @Override public void apply(UUID nodeId, 
GridDhtPartitionSupplyMessage<K, V> msg) {
+                     addMessage(new SupplyMessage<>(nodeId, msg));
+                 }
+             });
+ 
+             try {
+                 boolean retry;
+ 
+                 // DoWhile: (re)send the demand for the remaining partitions while a retry is requested.
+                 // =======
+                 do {
+                     retry = false;
+ 
+                     // Create copy.
+                     d = new GridDhtPartitionDemandMessage<>(d, remaining);
+ 
+                     long timeout = 
GridDhtPartitionDemandPool.this.timeout.get();
+ 
+                     d.timeout(timeout);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Sending demand message [node=" + node.id() 
+ ", demand=" + d + ']');
+ 
+                     // Send demand message.
+                     cctx.io().send(node, d);
+ 
+                     // While: consume supply messages for the demand sent above.
+                     // =====
+                     while (!isCancelled() && !topologyChanged()) {
+                         SupplyMessage<K, V> s = poll(msgQ, timeout, this);
+ 
+                         // If timed out.
+                         if (s == null) {
+                             if (msgQ.isEmpty()) { // Safety check.
+                                 U.warn(log, "Timed out waiting for partitions 
to load, will retry in " + timeout +
+                                     " ms (you may need to increase 
'networkTimeout' or 'preloadBatchSize'" +
+                                     " configuration properties).");
+ 
+                                 growTimeout(timeout);
+ 
+                                 // Ordered listener was removed if timeout 
expired.
+                                 cctx.io().removeOrderedHandler(d.topic());
+ 
+                                 // Must create copy to be able to work with 
IO manager thread local caches.
+                                 d = new GridDhtPartitionDemandMessage<>(d, 
remaining);
+ 
+                                 // Create new topic.
+                                 d.topic(topic(++cntr));
+ 
+                                 // Create new ordered listener.
+                                 cctx.io().addOrderedHandler(d.topic(),
+                                     new CI2<UUID, 
GridDhtPartitionSupplyMessage<K, V>>() {
+                                         @Override public void apply(UUID 
nodeId,
+                                             GridDhtPartitionSupplyMessage<K, 
V> msg) {
+                                             addMessage(new 
SupplyMessage<>(nodeId, msg));
+                                         }
+                                     });
+ 
+                                 // Resend message with larger timeout.
+                                 retry = true;
+ 
+                                 break; // While.
+                             }
+                             else
+                                 continue; // While.
+                         }
+ 
+                         // If topology changed.
+                         if (dummyTopology(s)) {
+                             if (topologyChanged())
+                                 break; // While.
+                             else
+                                 continue; // While.
+                         }
+ 
+                         // Check that message was received from expected node.
+                         if (!s.senderId().equals(node.id())) {
+                             U.warn(log, "Received supply message from 
unexpected node [expectedId=" + node.id() +
+                                 ", rcvdId=" + s.senderId() + ", msg=" + s + 
']');
+ 
+                             continue; // While.
+                         }
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Received supply message: " + s);
+ 
+                         GridDhtPartitionSupplyMessage<K, V> supply = 
s.supply();
+ 
+                         // Check whether there were class loading errors on 
unmarshal
+                         if (supply.classError() != null) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Class got undeployed during 
preloading: " + supply.classError());
+ 
+                             retry = true;
+ 
+                             // Leave the receive loop; retry is set, so the demand is re-sent.
+                             break;
+                         }
+ 
+                         // Preload.
+                         for (Map.Entry<Integer, 
Collection<GridCacheEntryInfo<K, V>>> e : supply.infos().entrySet()) {
+                             int p = e.getKey();
+ 
+                             if (cctx.affinity().localNode(p, topVer)) {
 -                                GridDhtLocalPartition<K, V> part = 
top.localPartition(p, topVer, true);
++                                GridDhtLocalPartition<K, V> part = 
cctx.dht().topology().localPartition(p, topVer, true);
+ 
+                                 assert part != null;
+ 
+                                 if (part.state() == MOVING) {
+                                     boolean reserved = part.reserve();
+ 
+                                     assert reserved : "Failed to reserve 
partition [gridName=" +
+                                         cctx.gridName() + ", cacheName=" + 
cctx.namex() + ", part=" + part + ']';
+ 
+                                     part.lock();
+ 
+                                     try {
+                                         Collection<Integer> invalidParts = 
new GridLeanSet<>();
+ 
+                                         // Loop through all received entries 
and try to preload them.
+                                         for (GridCacheEntryInfo<K, V> entry : 
e.getValue()) {
+                                             if (!invalidParts.contains(p)) {
+                                                 if 
(!part.preloadingPermitted(entry.key(), entry.version())) {
+                                                     if (log.isDebugEnabled())
+                                                         log.debug("Preloading 
is not permitted for entry due to " +
+                                                             "evictions [key=" 
+ entry.key() +
+                                                             ", ver=" + 
entry.version() + ']');
+ 
+                                                     continue;
+                                                 }
+ 
+                                                 if (!preloadEntry(node, p, 
entry, topVer)) {
+                                                     invalidParts.add(p);
+ 
+                                                     if (log.isDebugEnabled())
+                                                         log.debug("Got 
entries for invalid partition during " +
+                                                             "preloading (will 
skip) [p=" + p + ", entry=" + entry + ']');
+                                                 }
+                                             }
+                                         }
+ 
+                                         boolean last = 
supply.last().contains(p);
+ 
+                                         // If message was last for this 
partition,
+                                         // then we take ownership.
+                                         if (last) {
+                                             remaining.remove(p);
+ 
 -                                            top.own(part);
++                                            cctx.dht().topology().own(part);
+ 
+                                             if (log.isDebugEnabled())
+                                                 log.debug("Finished 
preloading partition: " + part);
+ 
+                                             if 
(cctx.events().isRecordable(EVT_CACHE_PRELOAD_PART_LOADED))
+                                                 preloadEvent(p, 
EVT_CACHE_PRELOAD_PART_LOADED,
+                                                     exchFut.discoveryEvent());
+                                         }
+                                     }
+                                     finally {
+                                         part.unlock();
+                                         part.release();
+                                     }
+                                 }
+                                 else {
+                                     remaining.remove(p);
+ 
+                                     if (log.isDebugEnabled())
+                                         log.debug("Skipping loading partition 
(state is not MOVING): " + part);
+                                 }
+                             }
+                             else {
+                                 remaining.remove(p);
+ 
+                                 if (log.isDebugEnabled())
+                                     log.debug("Skipping loading partition (it 
does not belong on current node): " + p);
+                             }
+                         }
+ 
+                         remaining.removeAll(s.supply().missed());
+ 
+                         // Only request partitions based on latest topology 
version.
+                         for (Integer miss : s.supply().missed())
+                             if (cctx.affinity().localNode(miss, topVer))
+                                 missed.add(miss);
+ 
+                         if (remaining.isEmpty())
+                             break; // While.
+ 
+                         if (s.supply().ack()) {
+                             retry = true;
+ 
+                             break;
+                         }
+                     }
+                 }
+                 while (retry && !isCancelled() && !topologyChanged());
+ 
+                 return missed;
+             }
+             finally {
+                 cctx.io().removeOrderedHandler(d.topic());
+             }
+         }
+ 
+         /**
+          * Removes every supply message currently sitting in the worker queue,
+          * logging each one at debug level.
+          *
+          * @throws InterruptedException If interrupted.
+          */
+         private void drainQueue() throws InterruptedException {
+             for (;;) {
+                 if (msgQ.isEmpty())
+                     return;
+ 
+                 SupplyMessage<K, V> drained = msgQ.take();
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Drained supply message: " + drained);
+             }
+         }
+ 
+         /**
+          * {@inheritDoc}
+          * <p>
+          * If the configured preload order is positive and an ordered preload future exists, the worker
+          * first waits for it. An interruption while waiting ends the worker quietly; any other failure of
+          * that future is rethrown as an {@link Error}.
+          * <p>
+          * The worker then loops until cancelled. Each iteration awaits {@code barrier}; the worker with
+          * {@code id == 0} records {@code EVT_CACHE_PRELOAD_STOPPED} for the previous non-dummy exchange
+          * future (only once for replicated caches). It then polls {@code assignQ} until assignments arrive
+          * and, under the read lock of {@code demandLock}, removes each node's demand message from the
+          * assignments and passes it to {@code demandFromNode}. A {@code null} demand message means another
+          * thread already took it. Missed partitions trigger a forced dummy exchange and another pass; a
+          * {@code ClusterTopologyException} ends the current pass. {@code syncFut.onWorkerDone(this)} is
+          * called after every iteration and once more when the method exits.
+          */
+         @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
+             try {
+                 int preloadOrder = cctx.config().getPreloadOrder();
+ 
+                 if (preloadOrder > 0) {
+                     IgniteFuture<?> fut = 
cctx.kernalContext().cache().orderedPreloadFuture(preloadOrder);
+ 
+                     try {
+                         if (fut != null) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Waiting for dependant caches 
preload [cacheName=" + cctx.name() +
+                                     ", preloadOrder=" + preloadOrder + ']');
+ 
+                             fut.get();
+                         }
+                     }
+                     catch (IgniteInterruptedException ignored) {
+                         if (log.isDebugEnabled())
+                             log.debug("Failed to wait for ordered preload 
future (grid is stopping): " +
+                                 "[cacheName=" + cctx.name() + ", 
preloadOrder=" + preloadOrder + ']');
+ 
+                         return;
+                     }
+                     catch (IgniteCheckedException e) {
+                         throw new Error("Ordered preload future should never 
fail: " + e.getMessage(), e);
+                     }
+                 }
+ 
+                 GridDhtPartitionsExchangeFuture<K, V> exchFut = null;
+ 
+                 boolean stopEvtFired = false;
+ 
+                 while (!isCancelled()) {
+                     try {
+                         barrier.await();
+ 
+                         if (id == 0 && exchFut != null && !exchFut.dummy() &&
+                             
cctx.events().isRecordable(EVT_CACHE_PRELOAD_STOPPED)) {
+ 
+                             if (!cctx.isReplicated() || !stopEvtFired) {
+                                 preloadEvent(EVT_CACHE_PRELOAD_STOPPED, 
exchFut.discoveryEvent());
+ 
+                                 stopEvtFired = true;
+                             }
+                         }
+                     }
+                     catch (BrokenBarrierException ignore) {
+                         throw new InterruptedException("Demand worker 
stopped.");
+                     }
+ 
+                     // Sync up all demand threads at this step.
+                     GridDhtPreloaderAssignments<K, V> assigns = null;
+ 
+                     while (assigns == null)
+                         assigns = poll(assignQ, 
cctx.gridConfig().getNetworkTimeout(), this);
+ 
+                     demandLock.readLock().lock();
+ 
+                     try {
+                         exchFut = assigns.exchangeFuture();
+ 
+                         // Assignments are empty if preloading is disabled.
+                         if (assigns.isEmpty())
+                             continue;
+ 
+                         boolean resync = false;
+ 
+                         // While.
+                         // =====
+                         while (!isCancelled() && !topologyChanged() && 
!resync) {
+                             Collection<Integer> missed = new HashSet<>();
+ 
+                             // For.
+                             // ===
+                             for (ClusterNode node : assigns.keySet()) {
+                                 if (topologyChanged() || isCancelled())
+                                     break; // For.
+ 
+                                 GridDhtPartitionDemandMessage<K, V> d = 
assigns.remove(node);
+ 
+                                 // If another thread is already processing 
this message,
+                                 // move to the next node.
+                                 if (d == null)
+                                     continue; // For.
+ 
+                                 try {
+                                     Set<Integer> set = demandFromNode(node, 
assigns.topologyVersion(), d, exchFut);
+ 
+                                     if (!set.isEmpty()) {
+                                         if (log.isDebugEnabled())
+                                             log.debug("Missed partitions from 
node [nodeId=" + node.id() + ", missed=" +
+                                                 set + ']');
+ 
+                                         missed.addAll(set);
+                                     }
+                                 }
+                                 catch (IgniteInterruptedException e) {
+                                     throw e;
+                                 }
+                                 catch (ClusterTopologyException e) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Node left during 
preloading (will retry) [node=" + node.id() +
+                                             ", msg=" + e.getMessage() + ']');
+ 
+                                     resync = true;
+ 
+                                     break; // For.
+                                 }
+                                 catch (IgniteCheckedException e) {
+                                     U.error(log, "Failed to receive 
partitions from node (preloading will not " +
+                                         "fully finish) [node=" + node.id() + 
", msg=" + d + ']', e);
+                                 }
+                             }
+ 
+                             // Process missed partitions by forcing a dummy exchange.
+                             if (!missed.isEmpty()) {
+                                 if (log.isDebugEnabled())
+                                     log.debug("Reassigning partitions that 
were missed: " + missed);
+ 
+                                 assert exchFut.exchangeId() != null;
+ 
+                                 
cctx.shared().exchange().forceDummyExchange(true, exchFut);
+                             }
+                             else
+                                 break; // While.
+                         }
+                     }
+                     finally {
+                         demandLock.readLock().unlock();
+ 
+                         syncFut.onWorkerDone(this);
+                     }
+ 
+                     cctx.shared().exchange().scheduleResendPartitions();
+                 }
+             }
+             finally {
+                 // Safety.
+                 syncFut.onWorkerDone(this);
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(DemandWorker.class, this, "assignQ", assignQ, 
"msgQ", msgQ, "super", super.toString());
+         }
+     }
+ 
+     /**
+      * Stores the given exchange future as the last one.
+      *
+      * @param lastFut Last future to set.
+      */
+     void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture<K, V> lastFut) {
+         this.lastExchangeFut = lastFut;
+     }
+ 
+     /**
+      * Builds demand assignments for the given exchange.
+      * <p>
+      * If preloading is disabled, assignments without any demand messages are returned. Otherwise every
+      * partition index from {@code 0} up to the affinity partition count is visited. Partitions that are
+      * not local for the assignments' topology version, or whose local partition is not in {@code MOVING}
+      * state, are skipped. A partition with no picked owners is owned right away; any other partition
+      * is added to the demand message of the first picked owner, which is created on first use. The
+      * loop stops early as soon as the exchange manager reports a pending exchange.
+      *
+      * @param exchFut Exchange future.
+      * @return Assignments of partitions to nodes.
+      */
+     GridDhtPreloaderAssignments<K, V> 
assign(GridDhtPartitionsExchangeFuture<K, V> exchFut) {
+         // Topology is read first; no assignments are created for a disabled preloader (see check below).
++        GridDhtPartitionTopology<K, V> top = cctx.dht().topology();
++
+         if (!cctx.preloadEnabled())
+             return new GridDhtPreloaderAssignments<>(exchFut, 
top.topologyVersion());
+ 
+         int partCnt = cctx.affinity().partitions();
+ 
+         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+             exchFut.exchangeId().topologyVersion() == top.topologyVersion() :
 -            "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", 
topVer=" + top.topologyVersion() + ']';
++            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
++                ", topVer=" + top.topologyVersion() + ']';
+ 
+         GridDhtPreloaderAssignments<K, V> assigns = new 
GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ 
+         long topVer = assigns.topologyVersion();
+ 
+         for (int p = 0; p < partCnt; p++) {
+             if (cctx.shared().exchange().hasPendingExchange()) {
+                 if (log.isDebugEnabled())
+                     log.debug("Skipping assignments creation, exchange worker 
has pending assignments: " +
+                         exchFut.exchangeId());
+ 
+                 break;
+             }
+ 
+             // If partition belongs to local node.
+             if (cctx.affinity().localNode(p, topVer)) {
+                 GridDhtLocalPartition<K, V> part = top.localPartition(p, 
topVer, true);
+ 
+                 assert part != null;
+                 assert part.id() == p;
+ 
+                 if (part.state() != MOVING) {
+                     if (log.isDebugEnabled())
+                         log.debug("Skipping partition assignment (state is 
not MOVING): " + part);
+ 
+                     continue; // For.
+                 }
+ 
+                 Collection<ClusterNode> picked = pickedOwners(p, topVer);
+ 
+                 if (picked.isEmpty()) {
+                     top.own(part);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Owning partition as there are no other 
owners: " + part);
+                 }
+                 else {
+                     ClusterNode n = F.first(picked);
+ 
+                     GridDhtPartitionDemandMessage<K, V> msg = assigns.get(n);
+ 
+                     if (msg == null) {
+                         assigns.put(n, msg = new 
GridDhtPartitionDemandMessage<>(
+                             top.updateSequence(),
+                             exchFut.exchangeId().topologyVersion(),
+                             cctx.cacheId()));
+                     }
+ 
+                     msg.addPartition(p);
+                 }
+             }
+         }
+ 
+         return assigns;
+     }
+ 
+     /**
+      *
+      */
+     private class SyncFuture extends GridFutureAdapter<Object> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Remaining workers. */
+         private Collection<DemandWorker> remaining;
+ 
+         /**
+          * @param workers List of workers.
+          */
+         private SyncFuture(Collection<DemandWorker> workers) {
+             super(cctx.kernalContext());
+ 
+             assert workers.size() == poolSize();
+ 
+             remaining = Collections.synchronizedList(new 
LinkedList<>(workers));
+         }
+ 
+         /**
+          * Empty constructor required for {@link Externalizable}.
+          */
+         public SyncFuture() {
+             assert false;
+         }
+ 
+         /**
+          * @param w Worker who iterated through all partitions.
+          */
+         void onWorkerDone(DemandWorker w) {
+             if (isDone())
+                 return;
+ 
+             if (remaining.remove(w))
+                 if (log.isDebugEnabled())
+                     log.debug("Completed full partition iteration for worker 
[worker=" + w + ']');
+ 
+             if (remaining.isEmpty()) {
+                 if (log.isDebugEnabled())
+                     log.debug("Completed sync future.");
+ 
+                 onDone();
+             }
+         }
+     }
+ 
+     /**
+      * Supply message wrapper.
+      */
+     private static class SupplyMessage<K, V> {
+         /** Sender ID. */
+         private UUID sndId;
+ 
+         /** Supply message. */
+         private GridDhtPartitionSupplyMessage<K, V> supply;
+ 
+         /**
+          * Dummy constructor.
+          */
+         private SupplyMessage() {
+             // No-op.
+         }
+ 
+         /**
+          * @param sndId Sender ID.
+          * @param supply Supply message.
+          */
+         SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage<K, V> supply) 
{
+             this.sndId = sndId;
+             this.supply = supply;
+         }
+ 
+         /**
+          * @return Sender ID.
+          */
+         UUID senderId() {
+             return sndId;
+         }
+ 
+         /**
+          * @return Message.
+          */
+         GridDhtPartitionSupplyMessage<K, V> supply() {
+             return supply;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(SupplyMessage.class, this);
+         }
+     }
+ }

Reply via email to