http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
new file mode 100644
index 0000000..6fb78b8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -0,0 +1,1846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static 
org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.*;
+
+/**
+ * Processor for continuous routines.
+ */
+public class GridContinuousProcessor extends GridProcessorAdapter {
+    /** Local infos. */
+    private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new 
ConcurrentHashMap8<>();
+
+    /** Remote infos. */
+    private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new 
ConcurrentHashMap8<>();
+
+    /** Start futures. */
+    private final ConcurrentMap<UUID, StartFuture> startFuts = new 
ConcurrentHashMap8<>();
+
+    /** Start ack wait lists. */
+    private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new 
ConcurrentHashMap8<>();
+
+    /** Stop futures. */
+    private final ConcurrentMap<UUID, StopFuture> stopFuts = new 
ConcurrentHashMap8<>();
+
+    /** Stop ack wait lists. */
+    private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new 
ConcurrentHashMap8<>();
+
+    /** Threads started by this processor. */
+    private final Collection<IgniteThread> threads = new 
GridConcurrentHashSet<>();
+
+    /** Pending start requests. */
+    private final Map<UUID, Collection<GridContinuousMessage>> pending = new 
HashMap<>();
+
+    /** */
+    private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts 
= new ConcurrentHashMap8<>();
+
+    /** Stopped IDs. */
+    private final Collection<UUID> stopped = new HashSet<>();
+
+    /** Lock for pending requests. */
+    private final Lock pendingLock = new ReentrantLock();
+
+    /** Lock for stop process. */
+    private final Lock stopLock = new ReentrantLock();
+
+    /** Delay in milliseconds between retries. */
+    private long retryDelay = 1000;
+
+    /** Number of retries using to send messages. */
+    private int retryCnt = 3;
+
+    /** Acknowledgement timeout. */
+    private long ackTimeout;
+
+    /** Marshaller. */
+    private IgniteMarshaller marsh;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public GridContinuousProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        retryDelay = ctx.config().getNetworkSendRetryDelay();
+        retryCnt = ctx.config().getNetworkSendRetryCount();
+        ackTimeout = ctx.config().getNetworkTimeout();
+
+        if (ackTimeout < retryDelay * retryCnt) {
+            U.warn(log, "Acknowledgement timeout for continuous operations is 
less than message send " +
+                "retry delay multiplied by retries count (will increase 
timeout value) [ackTimeout=" +
+                ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + 
retryCnt + ']');
+
+            ackTimeout = retryDelay * retryCnt;
+        }
+
+        marsh = ctx.config().getMarshaller();
+
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+            @SuppressWarnings({"fallthrough", "TooBroadScope"})
+            @Override public void onEvent(IgniteEvent evt) {
+                assert evt instanceof IgniteDiscoveryEvent;
+
+                UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
+
+                Collection<GridContinuousMessage> reqs;
+
+                pendingLock.lock();
+
+                try {
+                    // Remove pending requests to send to joined node
+                    // (if node is left or failed, they are dropped).
+                    reqs = pending.remove(nodeId);
+                }
+                finally {
+                    pendingLock.unlock();
+                }
+
+                switch (evt.type()) {
+                    case EVT_NODE_JOINED:
+                        if (reqs != null) {
+                            UUID routineId = null;
+
+                            // Send pending requests.
+                            try {
+                                for (GridContinuousMessage req : reqs) {
+                                    routineId = req.routineId();
+
+                                    sendWithRetries(nodeId, req, null);
+                                }
+                            }
+                            catch (ClusterTopologyException ignored) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send pending start 
request to node (is node alive?): " +
+                                        nodeId);
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Failed to send pending start 
request to node: " + nodeId, e);
+
+                                completeStartFuture(routineId);
+                            }
+                        }
+
+                        break;
+
+                    case EVT_NODE_LEFT:
+                    case EVT_NODE_FAILED:
+                        // Do not wait for start acknowledgements from left 
node.
+                        for (Map.Entry<UUID, Collection<UUID>> e : 
waitForStartAck.entrySet()) {
+                            Collection<UUID> nodeIds = e.getValue();
+
+                            for (Iterator<UUID> it = nodeIds.iterator(); 
it.hasNext();) {
+                                if (nodeId.equals(it.next())) {
+                                    it.remove();
+
+                                    break;
+                                }
+                            }
+
+                            if (nodeIds.isEmpty())
+                                completeStartFuture(e.getKey());
+                        }
+
+                        // Do not wait for stop acknowledgements from left 
node.
+                        for (Map.Entry<UUID, Collection<UUID>> e : 
waitForStopAck.entrySet()) {
+                            Collection<UUID> nodeIds = e.getValue();
+
+                            for (Iterator<UUID> it = nodeIds.iterator(); 
it.hasNext();) {
+                                if (nodeId.equals(it.next())) {
+                                    it.remove();
+
+                                    break;
+                                }
+                            }
+
+                            if (nodeIds.isEmpty())
+                                completeStopFuture(e.getKey());
+                        }
+
+                        // Unregister handlers created by left node.
+                        for (Map.Entry<UUID, RemoteRoutineInfo> e : 
rmtInfos.entrySet()) {
+                            UUID routineId = e.getKey();
+                            RemoteRoutineInfo info = e.getValue();
+
+                            if (info.autoUnsubscribe && 
nodeId.equals(info.nodeId))
+                                unregisterRemote(routineId);
+                        }
+
+                        for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : 
syncMsgFuts.entrySet()) {
+                            SyncMessageAckFuture fut = e.getValue();
+
+                            if (fut.nodeId().equals(nodeId)) {
+                                SyncMessageAckFuture fut0 = 
syncMsgFuts.remove(e.getKey());
+
+                                if (fut0 != null) {
+                                    ClusterTopologyException err = new 
ClusterTopologyException(
+                                        "Node left grid while sending message 
to: " + nodeId);
+
+                                    fut0.onDone(err);
+                                }
+                            }
+                        }
+
+                        break;
+
+                    default:
+                        assert false : "Unexpected event received: " + 
evt.shortDisplay();
+                }
+            }
+        }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        ctx.io().addMessageListener(TOPIC_CONTINUOUS, new 
GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object obj) {
+                GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+                if (msg.data() == null && msg.dataBytes() != null) {
+                    try {
+                        msg.data(marsh.unmarshal(msg.dataBytes(), null));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to process message (ignoring): " 
+ msg, e);
+
+                        return;
+                    }
+                }
+
+                switch (msg.type()) {
+                    case MSG_START_REQ:
+                        processStartRequest(nodeId, msg);
+
+                        break;
+
+                    case MSG_START_ACK:
+                        processStartAck(nodeId, msg);
+
+                        break;
+
+                    case MSG_STOP_REQ:
+                        processStopRequest(nodeId, msg);
+
+                        break;
+
+                    case MSG_STOP_ACK:
+                        processStopAck(nodeId, msg);
+
+                        break;
+
+                    case MSG_EVT_NOTIFICATION:
+                        processNotification(nodeId, msg);
+
+                        break;
+
+                    case MSG_EVT_ACK:
+                        processMessageAck(msg);
+
+                        break;
+
+                    default:
+                        assert false : "Unexpected message received: " + 
msg.type();
+                }
+            }
+        });
+
+        if (log.isDebugEnabled())
+            log.debug("Continuous processor started.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_CONTINUOUS);
+
+        U.interrupt(threads);
+        U.joinThreads(threads, log);
+
+        if (log.isDebugEnabled())
+            log.debug("Continuous processor stopped.");
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object collectDiscoveryData(UUID nodeId) {
+        if (!nodeId.equals(ctx.localNodeId())) {
+            pendingLock.lock();
+
+            try {
+                // Create empty pending set.
+                pending.put(nodeId, new HashSet<GridContinuousMessage>());
+
+                DiscoveryData data = new DiscoveryData(ctx.localNodeId());
+
+                // Collect listeners information (will be sent to
+                // joining node during discovery process).
+                for (Map.Entry<UUID, LocalRoutineInfo> e : 
locInfos.entrySet()) {
+                    UUID routineId = e.getKey();
+                    LocalRoutineInfo info = e.getValue();
+
+                    data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
+                        info.hnd, info.bufSize, info.interval));
+                }
+
+                return data;
+            }
+            finally {
+                pendingLock.unlock();
+            }
+        }
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDiscoveryDataReceived(Object obj) {
+        DiscoveryData data = (DiscoveryData)obj;
+
+        if (!ctx.isDaemon() && data != null) {
+            for (DiscoveryDataItem item : data.items) {
+                // Register handler only if local node passes projection 
predicate.
+                if (item.prjPred == null || 
item.prjPred.apply(ctx.discovery().localNode())) {
+                    try {
+                        if (ctx.config().isPeerClassLoadingEnabled())
+                            item.hnd.p2pUnmarshal(data.nodeId, ctx);
+
+                        if (registerHandler(data.nodeId, item.routineId, 
item.hnd, item.bufSize, item.interval,
+                            item.autoUnsubscribe, false))
+                            item.hnd.onListenerRegistered(item.routineId, ctx);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to register continuous handler.", 
e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param hnd Handler.
+     * @param bufSize Buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
+     * @param prjPred Projection predicate.
+     * @return Future.
+     */
+    @SuppressWarnings("TooBroadScope")
+    public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe,
+        @Nullable IgnitePredicate<ClusterNode> prjPred) {
+        assert hnd != null;
+        assert bufSize > 0;
+        assert interval >= 0;
+
+        // Whether local node is included in routine.
+        boolean locIncluded = prjPred == null || 
prjPred.apply(ctx.discovery().localNode());
+
+        // Generate ID.
+        final UUID routineId = UUID.randomUUID();
+
+        StartRequestData reqData = new StartRequestData(prjPred, hnd, bufSize, 
interval, autoUnsubscribe);
+
+        try {
+            if (ctx.config().isPeerClassLoadingEnabled()) {
+                // Handle peer deployment for projection predicate.
+                if (prjPred != null && !U.isGrid(prjPred.getClass())) {
+                    Class cls = U.detectClass(prjPred);
+
+                    String clsName = cls.getName();
+
+                    GridDeployment dep = ctx.deploy().deploy(cls, 
U.detectClassLoader(cls));
+
+                    if (dep == null)
+                        throw new IgniteDeploymentException("Failed to deploy 
projection predicate: " + prjPred);
+
+                    reqData.clsName = clsName;
+                    reqData.depInfo = new GridDeploymentInfoBean(dep);
+
+                    reqData.p2pMarshal(marsh);
+                }
+
+                // Handle peer deployment for other handler-specific objects.
+                hnd.p2pMarshal(ctx);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+
+        // Register per-routine notifications listener if ordered messaging is 
used.
+        if (hnd.orderedTopic() != null) {
+            ctx.io().addMessageListener(hnd.orderedTopic(), new 
GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object obj) {
+                    GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+                    // Only notification can be ordered.
+                    assert msg.type() == MSG_EVT_NOTIFICATION;
+
+                    if (msg.data() == null && msg.dataBytes() != null) {
+                        try {
+                            msg.data(marsh.unmarshal(msg.dataBytes(), null));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process message 
(ignoring): " + msg, e);
+
+                            return;
+                        }
+                    }
+
+                    processNotification(nodeId, msg);
+                }
+            });
+        }
+
+        Collection<? extends ClusterNode> nodes;
+        Collection<UUID> nodeIds;
+
+        pendingLock.lock();
+
+        try {
+            // Nodes that participate in routine (request will be sent to 
these nodes directly).
+            nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, 
F.remoteNodes(ctx.localNodeId())));
+
+            // Stop with exception if projection is empty.
+            if (nodes.isEmpty() && !locIncluded) {
+                return new GridFinishedFuture<>(ctx,
+                    new ClusterTopologyException("Failed to register remote 
continuous listener (projection is empty)."));
+            }
+
+            // IDs of nodes where request will be sent.
+            nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, 
F.node2id()));
+
+            // If there are currently joining nodes, add request to their 
pending lists.
+            // Node IDs set is updated to make sure that we wait for 
acknowledgement from
+            // these nodes.
+            for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : 
pending.entrySet()) {
+                if (nodeIds.add(e.getKey()))
+                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, 
routineId, null, reqData));
+            }
+
+            // Register routine locally.
+            locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, 
bufSize, interval));
+        }
+        finally {
+            pendingLock.unlock();
+        }
+
+        StartFuture fut = new StartFuture(ctx, routineId);
+
+        if (!nodeIds.isEmpty()) {
+            // Wait for acknowledgements.
+            waitForStartAck.put(routineId, nodeIds);
+
+            startFuts.put(routineId, fut);
+
+            // Register acknowledge timeout (timeout object will be removed 
when
+            // future is completed).
+            fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) {
+                @Override public void onTimeout() {
+                    // Stop waiting for acknowledgements.
+                    Collection<UUID> ids = waitForStartAck.remove(routineId);
+
+                    if (ids != null) {
+                        StartFuture f = startFuts.remove(routineId);
+
+                        assert f != null;
+
+                        // If there are still nodes without acknowledgements,
+                        // Stop routine with exception. Continue and complete
+                        // future otherwise.
+                        if (!ids.isEmpty()) {
+                            f.onDone(new IgniteCheckedException("Failed to get 
start acknowledgement from nodes (timeout " +
+                                "expired): " + ids + ". Will unregister all 
continuous listeners."));
+
+                            stopRoutine(routineId);
+                        }
+                        else
+                            f.onRemoteRegistered();
+                    }
+                }
+            });
+        }
+
+        if (!nodes.isEmpty()) {
+            // Do not send projection predicate (nodes already filtered).
+            reqData.prjPred = null;
+
+            // Send start requests.
+            try {
+                GridContinuousMessage req = new 
GridContinuousMessage(MSG_START_REQ, routineId, null, reqData);
+
+                sendWithRetries(nodes, req, null);
+            }
+            catch (IgniteCheckedException e) {
+                startFuts.remove(routineId);
+                waitForStartAck.remove(routineId);
+
+                fut.onDone(e);
+
+                stopRoutine(routineId);
+
+                locIncluded = false;
+            }
+        }
+        else {
+            // There are no remote nodes, but we didn't throw topology 
exception.
+            assert locIncluded;
+
+            // Do not wait anything from remote nodes.
+            fut.onRemoteRegistered();
+        }
+
+        // Register local handler if needed.
+        if (locIncluded) {
+            try {
+                if (registerHandler(ctx.localNodeId(), routineId, hnd, 
bufSize, interval, autoUnsubscribe, true))
+                    hnd.onListenerRegistered(routineId, ctx);
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(ctx,
+                    new IgniteCheckedException("Failed to register handler 
locally: " + hnd, e));
+            }
+        }
+
+        // Handler is registered locally.
+        fut.onLocalRegistered();
+
+        return fut;
+    }
+
+    /**
+     * @param routineId Consume ID.
+     * @return Future.
+     */
+    public IgniteFuture<?> stopRoutine(UUID routineId) {
+        assert routineId != null;
+
+        boolean doStop = false;
+
+        StopFuture fut = stopFuts.get(routineId);
+
+        // Only one thread will stop routine with provided ID.
+        if (fut == null) {
+            StopFuture old = stopFuts.putIfAbsent(routineId, fut = new 
StopFuture(ctx));
+
+            if (old != null)
+                fut = old;
+            else
+                doStop = true;
+        }
+
+        if (doStop) {
+            // Unregister routine locally.
+            LocalRoutineInfo routine = locInfos.remove(routineId);
+
+            // Finish if routine is not found (wrong ID is provided).
+            if (routine == null) {
+                stopFuts.remove(routineId);
+
+                fut.onDone();
+
+                return fut;
+            }
+
+            // Unregister handler locally.
+            unregisterHandler(routineId, routine.hnd, true);
+
+            pendingLock.lock();
+
+            try {
+                // Remove pending requests for this routine.
+                for (Collection<GridContinuousMessage> msgs : 
pending.values()) {
+                    Iterator<GridContinuousMessage> it = msgs.iterator();
+
+                    while (it.hasNext()) {
+                        if (it.next().routineId().equals(routineId))
+                            it.remove();
+                    }
+                }
+            }
+            finally {
+                pendingLock.unlock();
+            }
+
+            // Nodes where to send stop requests.
+            Collection<? extends ClusterNode> nodes = 
F.view(ctx.discovery().allNodes(),
+                F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId())));
+
+            if (!nodes.isEmpty()) {
+                // Wait for acknowledgements.
+                waitForStopAck.put(routineId, new 
GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())));
+
+                // Register acknowledge timeout (timeout object will be 
removed when
+                // future is completed).
+                fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, 
routineId,
+                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null, 
null)));
+
+                // Send stop requests.
+                try {
+                    for (ClusterNode node : nodes) {
+                        try {
+                            sendWithRetries(node.id(),
+                                new GridContinuousMessage(MSG_STOP_REQ, 
routineId, null, null),
+                                null);
+                        }
+                        catch (ClusterTopologyException ignored) {
+                            U.warn(log, "Failed to send stop request (node 
left topology): " + node.id());
+                        }
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    stopFuts.remove(routineId);
+                    waitForStopAck.remove(routineId);
+
+                    fut.onDone(e);
+                }
+            }
+            else {
+                stopFuts.remove(routineId);
+
+                fut.onDone();
+            }
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param nodeId ID of the node that started routine.
+     * @param routineId Routine ID.
+     * @param obj Notification object.
+     * @param orderedTopic Topic for ordered notifications. If {@code null}, 
non-ordered message will be sent.
+     * @param sync If {@code true} then waits for event acknowledgment.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void addNotification(UUID nodeId,
+        UUID routineId,
+        @Nullable Object obj,
+        @Nullable Object orderedTopic,
+        boolean sync)
+        throws IgniteCheckedException {
+        assert nodeId != null;
+        assert routineId != null;
+
+        assert !nodeId.equals(ctx.localNodeId());
+
+        RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+        if (info != null) {
+            assert info.interval == 0 || !sync;
+
+            if (sync) {
+                SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, 
nodeId);
+
+                IgniteUuid futId = IgniteUuid.randomUuid();
+
+                syncMsgFuts.put(futId, fut);
+
+                try {
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), 
orderedTopic);
+                }
+                catch (IgniteCheckedException e) {
+                    syncMsgFuts.remove(futId);
+
+                    throw e;
+                }
+
+                fut.get();
+            }
+            else {
+                Collection<Object> toSnd = info.add(obj);
+
+                if (toSnd != null)
+                    sendNotification(nodeId, routineId, null, toSnd, 
orderedTopic);
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param routineId Routine ID.
+     * @param futId Future ID.
+     * @param toSnd Notification object to send.
+     * @param orderedTopic Topic for ordered notifications.
+     *      If {@code null}, non-ordered message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendNotification(UUID nodeId,
+        UUID routineId,
+        @Nullable IgniteUuid futId,
+        Collection<Object> toSnd,
+        @Nullable Object orderedTopic) throws IgniteCheckedException {
+        assert nodeId != null;
+        assert routineId != null;
+        assert toSnd != null;
+        assert !toSnd.isEmpty();
+
+        sendWithRetries(nodeId, new 
GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), 
orderedTopic);
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param req Start request.
+     */
+    private void processStartRequest(UUID nodeId, GridContinuousMessage req) {
+        assert nodeId != null;
+        assert req != null;
+
+        UUID routineId = req.routineId();
+        StartRequestData data = req.data();
+
+        GridContinuousHandler hnd = data.hnd;
+
+        IgniteCheckedException err = null;
+
+        try {
+            if (ctx.config().isPeerClassLoadingEnabled()) {
+                String clsName = data.clsName;
+
+                if (clsName != null) {
+                    GridDeploymentInfo depInfo = data.depInfo;
+
+                    GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+                        depInfo.userVersion(), nodeId, 
depInfo.classLoaderId(), depInfo.participants(), null);
+
+                    if (dep == null)
+                        throw new IgniteDeploymentException("Failed to obtain 
deployment for class: " + clsName);
+
+                    data.p2pUnmarshal(marsh, dep.classLoader());
+                }
+
+                hnd.p2pUnmarshal(nodeId, ctx);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            err = e;
+
+            U.error(log, "Failed to register handler [nodeId=" + nodeId + ", 
routineId=" + routineId + ']', e);
+        }
+
+        boolean registered = false;
+
+        if (err == null) {
+            try {
+                IgnitePredicate<ClusterNode> prjPred = data.prjPred;
+
+                if (prjPred == null || 
prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
+                    registered = registerHandler(nodeId, routineId, hnd, 
data.bufSize, data.interval,
+                        data.autoUnsubscribe, false);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                err = e;
+
+                U.error(log, "Failed to register handler [nodeId=" + nodeId + 
", routineId=" + routineId + ']', e);
+            }
+        }
+
+        try {
+            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, 
routineId, null, err), null);
+        }
+        catch (ClusterTopologyException ignored) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send start acknowledgement to node (is 
node alive?): " + nodeId);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send start acknowledgement to node: " + 
nodeId, e);
+        }
+
+        if (registered)
+            hnd.onListenerRegistered(routineId, ctx);
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param ack Start acknowledgement.
+     */
+    private void processStartAck(UUID nodeId, GridContinuousMessage ack) {
+        assert nodeId != null;
+        assert ack != null;
+
+        UUID routineId = ack.routineId();
+
+        final IgniteCheckedException err = ack.data();
+
+        if (err != null) {
+            if (waitForStartAck.remove(routineId) != null) {
+                final StartFuture fut = startFuts.remove(routineId);
+
+                if (fut != null) {
+                    fut.onDone(err);
+
+                    stopRoutine(routineId);
+                }
+            }
+        }
+
+        Collection<UUID> nodeIds = waitForStartAck.get(routineId);
+
+        if (nodeIds != null) {
+            nodeIds.remove(nodeId);
+
+            if (nodeIds.isEmpty())
+                completeStartFuture(routineId);
+        }
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param req Stop request.
+     */
+    private void processStopRequest(UUID nodeId, GridContinuousMessage req) {
+        assert nodeId != null;
+        assert req != null;
+
+        UUID routineId = req.routineId();
+
+        unregisterRemote(routineId);
+
+        try {
+            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, 
routineId, null, null), null);
+        }
+        catch (ClusterTopologyException ignored) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send stop acknowledgement to node (is 
node alive?): " + nodeId);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send stop acknowledgement to node: " + 
nodeId, e);
+        }
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param ack Stop acknowledgement.
+     */
+    private void processStopAck(UUID nodeId, GridContinuousMessage ack) {
+        assert nodeId != null;
+        assert ack != null;
+
+        UUID routineId = ack.routineId();
+
+        Collection<UUID> nodeIds = waitForStopAck.get(routineId);
+
+        if (nodeIds != null) {
+            nodeIds.remove(nodeId);
+
+            if (nodeIds.isEmpty())
+                completeStopFuture(routineId);
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void processMessageAck(GridContinuousMessage msg) {
+        assert msg.futureId() != null;
+
+        SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId());
+
+        if (fut != null)
+            fut.onDone();
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param msg Message.
+     */
+    private void processNotification(UUID nodeId, GridContinuousMessage msg) {
+        assert nodeId != null;
+        assert msg != null;
+
+        UUID routineId = msg.routineId();
+
+        try {
+            LocalRoutineInfo routine = locInfos.get(routineId);
+
+            if (routine != null)
+                routine.hnd.notifyCallback(nodeId, routineId, 
(Collection<?>)msg.data(), ctx);
+        }
+        finally {
+            if (msg.futureId() != null) {
+                try {
+                    sendWithRetries(nodeId,
+                        new GridContinuousMessage(MSG_EVT_ACK, null, 
msg.futureId(), null),
+                        null);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send event acknowledgment to node: " 
+ nodeId, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param routineId Consume ID.
+     */
+    private void completeStartFuture(UUID routineId) {
+        assert routineId != null;
+
+        if (waitForStartAck.remove(routineId) != null) {
+            StartFuture fut = startFuts.remove(routineId);
+
+            assert fut != null;
+
+            fut.onRemoteRegistered();
+        }
+    }
+
+    /**
+     * @param routineId Consume ID.
+     */
+    private void completeStopFuture(UUID routineId) {
+        assert routineId != null;
+
+        if (waitForStopAck.remove(routineId) != null) {
+            GridFutureAdapter <?> fut = stopFuts.remove(routineId);
+
+            assert fut != null;
+
+            fut.onDone();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param routineId Consume ID.
+     * @param hnd Handler.
+     * @param bufSize Buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
+     * @param loc Local registration flag.
+     * @return Whether listener was actually registered.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private boolean registerHandler(final UUID nodeId,
+        final UUID routineId,
+        final GridContinuousHandler hnd,
+        int bufSize,
+        final long interval,
+        boolean autoUnsubscribe,
+        boolean loc) throws IgniteCheckedException {
+        assert nodeId != null;
+        assert routineId != null;
+        assert hnd != null;
+        assert bufSize > 0;
+        assert interval >= 0;
+
+        final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd, 
bufSize, interval, autoUnsubscribe);
+
+        boolean doRegister = loc;
+
+        if (!doRegister) {
+            stopLock.lock();
+
+            try {
+                doRegister = !stopped.remove(routineId) && 
rmtInfos.putIfAbsent(routineId, info) == null;
+            }
+            finally {
+                stopLock.unlock();
+            }
+        }
+
+        if (doRegister) {
+            if (interval > 0) {
+                IgniteThread checker = new IgniteThread(new 
GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
+                    @SuppressWarnings("ConstantConditions")
+                    @Override protected void body() {
+                        long interval0 = interval;
+
+                        while (!isCancelled()) {
+                            try {
+                                U.sleep(interval0);
+                            }
+                            catch (IgniteInterruptedException ignored) {
+                                break;
+                            }
+
+                            IgniteBiTuple<Collection<Object>, Long> t = 
info.checkInterval();
+
+                            Collection<Object> toSnd = t.get1();
+
+                            if (toSnd != null) {
+                                try {
+                                    sendNotification(nodeId, routineId, null, 
toSnd, hnd.orderedTopic());
+                                }
+                                catch (ClusterTopologyException ignored) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send notification 
to node (is node alive?): " + nodeId);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to send notification 
to node: " + nodeId, e);
+                                }
+                            }
+
+                            interval0 = t.get2();
+                        }
+                    }
+                });
+
+                threads.add(checker);
+
+                checker.start();
+            }
+
+            return hnd.register(nodeId, routineId, ctx);
+        }
+
+        return false;
+    }
+
+    /**
+     * @param routineId Routine ID.
+     * @param hnd Handler
+     * @param loc If Handler unregistered on master node.
+     */
+    private void unregisterHandler(UUID routineId, GridContinuousHandler hnd, 
boolean loc) {
+        assert routineId != null;
+        assert hnd != null;
+
+        if (loc && hnd.orderedTopic() != null)
+            ctx.io().removeMessageListener(hnd.orderedTopic());
+
+        hnd.unregister(routineId, ctx);
+    }
+
+    /**
+     * @param routineId Routine ID.
+     */
+    @SuppressWarnings("TooBroadScope")
+    private void unregisterRemote(UUID routineId) {
+        RemoteRoutineInfo info;
+
+        stopLock.lock();
+
+        try {
+            info = rmtInfos.remove(routineId);
+
+            if (info == null)
+                stopped.add(routineId);
+        }
+        finally {
+            stopLock.unlock();
+        }
+
+        if (info != null)
+            unregisterHandler(routineId, info.hnd, false);
+    }
+
+    /**
+     * @param nodeId Destination node ID.
+     * @param msg Message.
+     * @param orderedTopic Topic for ordered notifications.
+     *      If {@code null}, non-ordered message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, 
@Nullable Object orderedTopic)
+        throws IgniteCheckedException {
+        assert nodeId != null;
+        assert msg != null;
+
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node != null)
+            sendWithRetries(node, msg, orderedTopic);
+        else
+            throw new ClusterTopologyException("Node for provided ID doesn't 
exist (did it leave the grid?): " + nodeId);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param msg Message.
+     * @param orderedTopic Topic for ordered notifications.
+     *      If {@code null}, non-ordered message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, 
@Nullable Object orderedTopic)
+        throws IgniteCheckedException {
+        assert node != null;
+        assert msg != null;
+
+        sendWithRetries(F.asList(node), msg, orderedTopic);
+    }
+
+    /**
+     * @param nodes Destination nodes.
+     * @param msg Message.
+     * @param orderedTopic Topic for ordered notifications.
+     *      If {@code null}, non-ordered message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendWithRetries(Collection<? extends ClusterNode> nodes, 
GridContinuousMessage msg,
+        @Nullable Object orderedTopic) throws IgniteCheckedException {
+        assert !F.isEmpty(nodes);
+        assert msg != null;
+
+        if (msg.data() != null && (nodes.size() > 1 || 
!ctx.localNodeId().equals(F.first(nodes).id())))
+            msg.dataBytes(marsh.marshal(msg.data()));
+
+        boolean first = true;
+
+        for (ClusterNode node : nodes) {
+            msg = first ? msg : (GridContinuousMessage)msg.clone();
+
+            first = false;
+
+            int cnt = 0;
+
+            while (cnt <= retryCnt) {
+                try {
+                    cnt++;
+
+                    if (orderedTopic != null) {
+                        ctx.io().sendOrderedMessage(
+                            node,
+                            orderedTopic,
+                            ctx.io().nextMessageId(orderedTopic, node.id()),
+                            msg,
+                            SYSTEM_POOL,
+                            0,
+                            true);
+                    }
+                    else
+                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, 
SYSTEM_POOL);
+
+                    break;
+                }
+                catch (IgniteInterruptedException e) {
+                    throw e;
+                }
+                catch (IgniteCheckedException e) {
+                    if (!ctx.discovery().alive(node.id()))
+                        throw new ClusterTopologyException("Node left grid 
while sending message to: " + node.id(), e);
+
+                    if (cnt == retryCnt)
+                        throw e;
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to send message to node (will 
retry): " + node.id());
+                }
+
+                U.sleep(retryDelay);
+            }
+        }
+    }
+
+    /**
+     * Local routine info.
+     */
+    @SuppressWarnings("PackageVisibleInnerClass")
+    static class LocalRoutineInfo {
+        /** Projection predicate. */
+        private final IgnitePredicate<ClusterNode> prjPred;
+
+        /** Continuous routine handler. */
+        private final GridContinuousHandler hnd;
+
+        /** Buffer size. */
+        private final int bufSize;
+
+        /** Time interval. */
+        private final long interval;
+
+        /**
+         * @param prjPred Projection predicate.
+         * @param hnd Continuous routine handler.
+         * @param bufSize Buffer size.
+         * @param interval Interval.
+         */
+        LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, 
GridContinuousHandler hnd, int bufSize,
+            long interval) {
+            assert hnd != null;
+            assert bufSize > 0;
+            assert interval >= 0;
+
+            this.prjPred = prjPred;
+            this.hnd = hnd;
+            this.bufSize = bufSize;
+            this.interval = interval;
+        }
+
+        /**
+         * @return Handler.
+         */
+        GridContinuousHandler handler() {
+            return hnd;
+        }
+    }
+
+    /**
+     * Remote routine info.
+     */
+    private static class RemoteRoutineInfo {
+        /** Master node ID. */
+        private final UUID nodeId;
+
+        /** Continuous routine handler. */
+        private final GridContinuousHandler hnd;
+
+        /** Buffer size. */
+        private final int bufSize;
+
+        /** Time interval. */
+        private final long interval;
+
+        /** Lock. */
+        private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+        /** Buffer. */
+        private ConcurrentLinkedDeque8<Object> buf;
+
+        /** Last send time. */
+        private long lastSndTime = U.currentTimeMillis();
+
+        /** Automatic unsubscribe flag. */
+        private boolean autoUnsubscribe;
+
+        /**
+         * @param nodeId Master node ID.
+         * @param hnd Continuous routine handler.
+         * @param bufSize Buffer size.
+         * @param interval Interval.
+         * @param autoUnsubscribe Automatic unsubscribe flag.
+         */
+        RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int bufSize, 
long interval,
+            boolean autoUnsubscribe) {
+            assert nodeId != null;
+            assert hnd != null;
+            assert bufSize > 0;
+            assert interval >= 0;
+
+            this.nodeId = nodeId;
+            this.hnd = hnd;
+            this.bufSize = bufSize;
+            this.interval = interval;
+            this.autoUnsubscribe = autoUnsubscribe;
+
+            buf = new ConcurrentLinkedDeque8<>();
+        }
+
+        /**
+         * @param obj Object to add.
+         * @return Object to send or {@code null} if there is nothing to send 
for now.
+         */
+        @Nullable Collection<Object> add(@Nullable Object obj) {
+            Collection<Object> toSnd = null;
+
+            if (buf.sizex() >= bufSize - 1) {
+                lock.writeLock().lock();
+
+                try {
+                    buf.add(obj);
+
+                    toSnd = buf;
+
+                    buf = new ConcurrentLinkedDeque8<>();
+
+                    if (interval > 0)
+                        lastSndTime = U.currentTimeMillis();
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+            }
+            else {
+                lock.readLock().lock();
+
+                try {
+                    buf.add(obj);
+                }
+                finally {
+                    lock.readLock().unlock();
+                }
+            }
+
+            return toSnd != null ? new ArrayList<>(toSnd) : null;
+        }
+
+        /**
+         * @return Tuple with objects to sleep (or {@code null} if there is 
nothing to
+         *      send for now) and time interval after next check is needed.
+         */
+        @SuppressWarnings("TooBroadScope")
+        IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+            assert interval > 0;
+
+            Collection<Object> toSnd = null;
+            long diff;
+
+            long now = U.currentTimeMillis();
+
+            lock.writeLock().lock();
+
+            try {
+                diff = now - lastSndTime;
+
+                if (diff >= interval && !buf.isEmpty()) {
+                    toSnd = buf;
+
+                    buf = new ConcurrentLinkedDeque8<>();
+
+                    lastSndTime = now;
+                }
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+
+            return F.t(toSnd, diff < interval ? interval - diff : interval);
+        }
+    }
+
+    /**
+     * Start request data.
+     */
+    private static class StartRequestData implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Projection predicate. */
+        private IgnitePredicate<ClusterNode> prjPred;
+
+        /** Serialized projection predicate. */
+        private byte[] prjPredBytes;
+
+        /** Deployment class name. */
+        private String clsName;
+
+        /** Deployment info. */
+        private GridDeploymentInfo depInfo;
+
+        /** Handler. */
+        private GridContinuousHandler hnd;
+
+        /** Buffer size. */
+        private int bufSize;
+
+        /** Time interval. */
+        private long interval;
+
+        /** Automatic unsubscribe flag. */
+        private boolean autoUnsubscribe;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public StartRequestData() {
+            // No-op.
+        }
+
+        /**
+         * @param prjPred Serialized projection predicate.
+         * @param hnd Handler.
+         * @param bufSize Buffer size.
+         * @param interval Time interval.
+         * @param autoUnsubscribe Automatic unsubscribe flag.
+         */
+        StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, 
GridContinuousHandler hnd,
+            int bufSize, long interval, boolean autoUnsubscribe) {
+            assert hnd != null;
+            assert bufSize > 0;
+            assert interval >= 0;
+
+            this.prjPred = prjPred;
+            this.hnd = hnd;
+            this.bufSize = bufSize;
+            this.interval = interval;
+            this.autoUnsubscribe = autoUnsubscribe;
+        }
+
+        /**
+         * @param marsh Marshaller.
+         * @throws IgniteCheckedException In case of error.
+         */
+        void p2pMarshal(IgniteMarshaller marsh) throws IgniteCheckedException {
+            assert marsh != null;
+
+            prjPredBytes = marsh.marshal(prjPred);
+        }
+
+        /**
+         * @param marsh Marshaller.
+         * @param ldr Class loader.
+         * @throws IgniteCheckedException In case of error.
+         */
+        void p2pUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) 
throws IgniteCheckedException {
+            assert marsh != null;
+
+            assert prjPred == null;
+            assert prjPredBytes != null;
+
+            prjPred = marsh.unmarshal(prjPredBytes, ldr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            boolean b = prjPredBytes != null;
+
+            out.writeBoolean(b);
+
+            if (b) {
+                U.writeByteArray(out, prjPredBytes);
+                U.writeString(out, clsName);
+                out.writeObject(depInfo);
+            }
+            else
+                out.writeObject(prjPred);
+
+            out.writeObject(hnd);
+            out.writeInt(bufSize);
+            out.writeLong(interval);
+            out.writeBoolean(autoUnsubscribe);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            boolean b = in.readBoolean();
+
+            if (b) {
+                prjPredBytes = U.readByteArray(in);
+                clsName = U.readString(in);
+                depInfo = (GridDeploymentInfo)in.readObject();
+            }
+            else
+                prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
+
+            hnd = (GridContinuousHandler)in.readObject();
+            bufSize = in.readInt();
+            interval = in.readLong();
+            autoUnsubscribe = in.readBoolean();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StartRequestData.class, this);
+        }
+    }
+
+    /**
+     * Discovery data.
+     */
+    private static class DiscoveryData implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /** Items. */
+        @GridToStringInclude
+        private Collection<DiscoveryDataItem> items;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public DiscoveryData() {
+            // No-op.
+        }
+
+        /**
+         * @param nodeId Node ID.
+         */
+        DiscoveryData(UUID nodeId) {
+            assert nodeId != null;
+
+            this.nodeId = nodeId;
+
+            items = new ArrayList<>();
+        }
+
+        /**
+         * @param item Item.
+         */
+        public void addItem(DiscoveryDataItem item) {
+            items.add(item);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            U.writeUuid(out, nodeId);
+            U.writeCollection(out, items);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            nodeId = U.readUuid(in);
+            items = U.readCollection(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DiscoveryData.class, this);
+        }
+    }
+
+    /**
+     * Discovery data item.
+     */
+    private static class DiscoveryDataItem implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Consume ID. */
+        private UUID routineId;
+
+        /** Projection predicate. */
+        private IgnitePredicate<ClusterNode> prjPred;
+
+        /** Handler. */
+        private GridContinuousHandler hnd;
+
+        /** Buffer size. */
+        private int bufSize;
+
+        /** Time interval. */
+        private long interval;
+
+        /** Automatic unsubscribe flag. */
+        private boolean autoUnsubscribe;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public DiscoveryDataItem() {
+            // No-op.
+        }
+
+        /**
+         * @param routineId Consume ID.
+         * @param prjPred Projection predicate.
+         * @param hnd Handler.
+         * @param bufSize Buffer size.
+         * @param interval Time interval.
+         */
+        DiscoveryDataItem(UUID routineId, @Nullable 
IgnitePredicate<ClusterNode> prjPred,
+            GridContinuousHandler hnd, int bufSize, long interval) {
+            assert routineId != null;
+            assert hnd != null;
+            assert bufSize > 0;
+            assert interval >= 0;
+
+            this.routineId = routineId;
+            this.prjPred = prjPred;
+            this.hnd = hnd;
+            this.bufSize = bufSize;
+            this.interval = interval;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            U.writeUuid(out, routineId);
+            out.writeObject(prjPred);
+            out.writeObject(hnd);
+            out.writeInt(bufSize);
+            out.writeLong(interval);
+            out.writeBoolean(autoUnsubscribe);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            routineId = U.readUuid(in);
+            prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
+            hnd = (GridContinuousHandler)in.readObject();
+            bufSize = in.readInt();
+            interval = in.readLong();
+            autoUnsubscribe = in.readBoolean();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DiscoveryDataItem.class, this);
+        }
+    }
+
+    /**
+     * Future for start routine.
+     */
+    private static class StartFuture extends GridFutureAdapter<UUID> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Consume ID. */
+        private UUID routineId;
+
+        /** Local listener is registered. */
+        private volatile boolean loc;
+
+        /** All remote listeners are registered. */
+        private volatile boolean rmt;
+
+        /** Timeout object. */
+        private volatile GridTimeoutObject timeoutObj;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public StartFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         * @param routineId Consume ID.
+         */
+        StartFuture(GridKernalContext ctx, UUID routineId) {
+            super(ctx);
+
+            this.routineId = routineId;
+        }
+
+        /**
+         * Called when local listener is registered.
+         */
+        public void onLocalRegistered() {
+            loc = true;
+
+            if (rmt && !isDone())
+                onDone(routineId);
+        }
+
+        /**
+         * Called when all remote listeners are registered.
+         */
+        public void onRemoteRegistered() {
+            rmt = true;
+
+            if (loc && !isDone())
+                onDone(routineId);
+        }
+
+        /**
+         * @param timeoutObj Timeout object.
+         */
+        public void addTimeoutObject(GridTimeoutObject timeoutObj) {
+            assert timeoutObj != null;
+
+            this.timeoutObj = timeoutObj;
+
+            ctx.timeout().addTimeoutObject(timeoutObj);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable UUID res, @Nullable 
Throwable err) {
+            if (timeoutObj != null)
+                ctx.timeout().removeTimeoutObject(timeoutObj);
+
+            return super.onDone(res, err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StartFuture.class, this);
+        }
+    }
+
+    /**
+     * Future for stop routine.
+     */
+    private static class StopFuture extends GridFutureAdapter<Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Timeout object. */
+        private volatile GridTimeoutObject timeoutObj;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public StopFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         */
+        StopFuture(GridKernalContext ctx) {
+            super(ctx);
+        }
+
+        /**
+         * @param timeoutObj Timeout object.
+         */
+        public void addTimeoutObject(GridTimeoutObject timeoutObj) {
+            assert timeoutObj != null;
+
+            this.timeoutObj = timeoutObj;
+
+            ctx.timeout().addTimeoutObject(timeoutObj);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Object res, @Nullable 
Throwable err) {
+            if (timeoutObj != null)
+                ctx.timeout().removeTimeoutObject(timeoutObj);
+
+            return super.onDone(res, err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StopFuture.class, this);
+        }
+    }
+
+    /**
+     * Synchronous message acknowledgement future.
+     */
+    private static class SyncMessageAckFuture extends 
GridFutureAdapter<Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private UUID nodeId;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public SyncMessageAckFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         * @param nodeId Master node ID.
+         */
+        SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
+            super(ctx);
+
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * @return Master node ID.
+         */
+        UUID nodeId() {
+            return nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SyncMessageAckFuture.class, this);
+        }
+    }
+
+    /**
+     * Timeout object for stop process.
+     */
+    private class StopTimeoutObject extends GridTimeoutObjectAdapter {
+        /** Timeout. */
+        private final long timeout;
+
+        /** Routine ID. */
+        private final UUID routineId;
+
+        /** Request. */
+        private final GridContinuousMessage req;
+
+        /**
+         * @param timeout Timeout.
+         * @param routineId Routine ID.
+         * @param req Request.
+         */
+        protected StopTimeoutObject(long timeout, UUID routineId, 
GridContinuousMessage req) {
+            super(timeout);
+
+            assert routineId != null;
+            assert req != null;
+
+            this.timeout = timeout;
+            this.routineId = routineId;
+            this.req = req;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            Collection<UUID> ids = waitForStopAck.remove(routineId);
+
+            if (ids != null) {
+                U.warn(log, "Failed to get stop acknowledgement from nodes 
(timeout expired): " + ids +
+                    ". Will retry.");
+
+                StopFuture f = stopFuts.get(routineId);
+
+                if (f != null) {
+                    if (!ids.isEmpty()) {
+                        waitForStopAck.put(routineId, ids);
+
+                        // Resend requests.
+                        for (UUID id : ids) {
+                            try {
+                                sendWithRetries(id, req, null);
+                            }
+                            catch (ClusterTopologyException ignored) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to resend stop request 
to node (is node alive?): " + id);
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Failed to resend stop request to 
node: " + id, e);
+
+                                ids.remove(id);
+
+                                if (ids.isEmpty())
+                                    f.onDone(e);
+                            }
+                        }
+
+                        // Reschedule timeout.
+                        ctx.timeout().addTimeoutObject(new 
StopTimeoutObject(timeout, routineId, req));
+                    }
+                    else if (stopFuts.remove(routineId) != null)
+                        f.onDone();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
new file mode 100644
index 0000000..2bda02f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.dataload.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class GridDataLoadCacheUpdaters {
+    /** */
+    private static final IgniteDataLoadCacheUpdater INDIVIDUAL = new 
Individual();
+
+    /** */
+    private static final IgniteDataLoadCacheUpdater BATCHED = new Batched();
+
+    /** */
+    private static final IgniteDataLoadCacheUpdater BATCHED_SORTED = new 
BatchedSorted();
+
+    /** */
+    private static final IgniteDataLoadCacheUpdater GROUP_LOCKED = new 
GroupLocked();
+
+    /**
+     * Updates cache using independent {@link 
org.apache.ignite.cache.GridCache#put(Object, Object, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#remove(Object, 
org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from 
deadlocks but performance
+     * is not the best.
+     *
+     * @return Single updater.
+     */
+    public static <K, V> IgniteDataLoadCacheUpdater<K, V> individual() {
+        return INDIVIDUAL;
+    }
+
+    /**
+     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same 
keys are getting
+     * updated concurrently. Performance is generally better than in {@link 
#individual()}.
+     *
+     * @return Batched updater.
+     */
+    public static <K, V> IgniteDataLoadCacheUpdater<K, V> batched() {
+        return BATCHED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order 
and if all updates
+     * use the same rule deadlock can not happen. Performance is generally 
better than in {@link #individual()}.
+     *
+     * @return Batched sorted updater.
+     */
+    public static <K extends Comparable<?>, V> IgniteDataLoadCacheUpdater<K, 
V> batchedSorted() {
+        return BATCHED_SORTED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link 
org.apache.ignite.cache.GridCache#putAll(Map, 
org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, 
org.apache.ignite.lang.IgnitePredicate[])} in group lock transaction. Requires 
that there are no
+     * concurrent updates other than in group lock.
+     *
+     * @return Updater with group lock.
+     */
+    public static <K, V> IgniteDataLoadCacheUpdater<K, V> groupLocked() {
+        return GROUP_LOCKED;
+    }
+
+    /**
+     * Updates cache.
+     *
+     * @param cache Cache.
+     * @param rmvCol Keys to remove.
+     * @param putMap Entries to put.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable 
Collection<K> rmvCol,
+        Map<K, V> putMap) throws IgniteCheckedException {
+        assert rmvCol != null || putMap != null;
+
+        // Here we assume that there are no key duplicates, so the following 
calls are valid.
+        if (rmvCol != null)
+            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+        if (putMap != null)
+            cache.putAll(putMap);
+    }
+
+    /**
+     * Simple cache updater implementation. Updates keys one by one thus is 
not dead lock prone.
+     */
+    private static class Individual<K, V> implements 
IgniteDataLoadCacheUpdater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries)
+            throws IgniteCheckedException {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null)
+                    cache.remove(key);
+                else
+                    cache.put(key, val);
+            }
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
+     */
+    private static class Batched<K, V> implements 
IgniteDataLoadCacheUpdater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries)
+            throws IgniteCheckedException {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Collection<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new ArrayList<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new HashMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock 
prone.
+     */
+    private static class BatchedSorted<K, V> implements 
IgniteDataLoadCacheUpdater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries)
+            throws IgniteCheckedException {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Collection<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key instanceof Comparable;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new TreeSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new TreeMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Cache updater which uses group lock.
+     */
+    private static class GroupLocked<K, V> implements 
IgniteDataLoadCacheUpdater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries)
+            throws IgniteCheckedException {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            assert 
cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC;
+
+            Map<Integer, Integer> partsCounts = new HashMap<>();
+
+            // Group by partition ID.
+            Map<Integer, Collection<K>> rmvPartMap = null;
+            Map<Integer, Map<K, V>> putPartMap = null;
+
+            Ignite ignite = cache.unwrap(Ignite.class);
+
+            GridCacheAffinity<K> aff = ignite.<K, 
V>cache(cache.getName()).affinity();
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                int part = aff.partition(key);
+
+                Integer cnt = partsCounts.get(part);
+
+                partsCounts.put(part, cnt == null ? 1 : cnt + 1);
+
+                if (val == null) {
+                    if (rmvPartMap == null)
+                        rmvPartMap = new HashMap<>();
+
+                    F.addIfAbsent(rmvPartMap, part, F.<K>newList()).add(key);
+                }
+                else {
+                    if (putPartMap == null)
+                        putPartMap = new HashMap<>();
+
+                    F.addIfAbsent(putPartMap, part, F.<K, V>newMap()).put(key, 
val);
+                }
+            }
+
+            IgniteTransactions txs = ignite.transactions();
+
+            for (Map.Entry<Integer, Integer> e : partsCounts.entrySet()) {
+                Integer part = e.getKey();
+                int cnt = e.getValue();
+
+                try (IgniteTx tx = txs.txStartPartition(cache.getName(), part, 
PESSIMISTIC, REPEATABLE_READ, 0, cnt)) {
+                    updateAll(cache, rmvPartMap == null ? null : 
rmvPartMap.get(part),
+                        putPartMap == null ? null : putPartMap.get(part));
+
+                    tx.commit();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
new file mode 100644
index 0000000..794a02b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class GridDataLoadRequest extends GridTcpCommunicationMessageAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long reqId;
+
+    /** */
+    private byte[] resTopicBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** */
+    private byte[] updaterBytes;
+
+    /** Entries to put. */
+    private byte[] colBytes;
+
+    /** {@code True} to ignore deployment ownership. */
+    private boolean ignoreDepOwnership;
+
+    /** */
+    private boolean skipStore;
+
+    /** */
+    private IgniteDeploymentMode depMode;
+
+    /** */
+    private String sampleClsName;
+
+    /** */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    private Map<UUID, IgniteUuid> ldrParticipants;
+
+    /** */
+    private IgniteUuid clsLdrId;
+
+    /** */
+    private boolean forceLocDep;
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public GridDataLoadRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param resTopicBytes Response topic.
+     * @param cacheName Cache name.
+     * @param updaterBytes Cache updater.
+     * @param colBytes Collection bytes.
+     * @param ignoreDepOwnership Ignore ownership.
+     * @param skipStore Skip store flag.
+     * @param depMode Deployment mode.
+     * @param sampleClsName Sample class name.
+     * @param userVer User version.
+     * @param ldrParticipants Loader participants.
+     * @param clsLdrId Class loader ID.
+     * @param forceLocDep Force local deployment.
+     */
+    public GridDataLoadRequest(long reqId,
+        byte[] resTopicBytes,
+        @Nullable String cacheName,
+        byte[] updaterBytes,
+        byte[] colBytes,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDeploymentMode depMode,
+        String sampleClsName,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParticipants,
+        IgniteUuid clsLdrId,
+        boolean forceLocDep) {
+        this.reqId = reqId;
+        this.resTopicBytes = resTopicBytes;
+        this.cacheName = cacheName;
+        this.updaterBytes = updaterBytes;
+        this.colBytes = colBytes;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.depMode = depMode;
+        this.sampleClsName = sampleClsName;
+        this.userVer = userVer;
+        this.ldrParticipants = ldrParticipants;
+        this.clsLdrId = clsLdrId;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Response topic.
+     */
+    public byte[] responseTopicBytes() {
+        return resTopicBytes;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Updater.
+     */
+    public byte[] updaterBytes() {
+        return updaterBytes;
+    }
+
+    /**
+     * @return Collection bytes.
+     */
+    public byte[] collectionBytes() {
+        return colBytes;
+    }
+
+    /**
+     * @return {@code True} to ignore ownership.
+     */
+    public boolean ignoreDeploymentOwnership() {
+        return ignoreDepOwnership;
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public IgniteDeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Sample class name.
+     */
+    public String sampleClassName() {
+        return sampleClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Participants.
+     */
+    public Map<UUID, IgniteUuid> participants() {
+        return ldrParticipants;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoadRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 0:
+                if (!commState.putString(cacheName))
+                    return false;
+
+                commState.idx++;
+
+            case 1:
+                if (!commState.putGridUuid(clsLdrId))
+                    return false;
+
+                commState.idx++;
+
+            case 2:
+                if (!commState.putByteArray(colBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 3:
+                if (!commState.putEnum(depMode))
+                    return false;
+
+                commState.idx++;
+
+            case 4:
+                if (!commState.putBoolean(forceLocDep))
+                    return false;
+
+                commState.idx++;
+
+            case 5:
+                if (!commState.putBoolean(ignoreDepOwnership))
+                    return false;
+
+                commState.idx++;
+
+            case 6:
+                if (ldrParticipants != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(ldrParticipants.size()))
+                            return false;
+
+                        commState.it = ldrParticipants.entrySet().iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, 
IgniteUuid>)commState.cur;
+
+                        if (!commState.keyDone) {
+                            if (!commState.putUuid(e.getKey()))
+                                return false;
+
+                            commState.keyDone = true;
+                        }
+
+                        if (!commState.putGridUuid(e.getValue()))
+                            return false;
+
+                        commState.keyDone = false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 7:
+                if (!commState.putLong(reqId))
+                    return false;
+
+                commState.idx++;
+
+            case 8:
+                if (!commState.putByteArray(resTopicBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 9:
+                if (!commState.putString(sampleClsName))
+                    return false;
+
+                commState.idx++;
+
+            case 10:
+                if (!commState.putBoolean(skipStore))
+                    return false;
+
+                commState.idx++;
+
+            case 11:
+                if (!commState.putByteArray(updaterBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 12:
+                if (!commState.putString(userVer))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        switch (commState.idx) {
+            case 0:
+                String cacheName0 = commState.getString();
+
+                if (cacheName0 == STR_NOT_READ)
+                    return false;
+
+                cacheName = cacheName0;
+
+                commState.idx++;
+
+            case 1:
+                IgniteUuid clsLdrId0 = commState.getGridUuid();
+
+                if (clsLdrId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                clsLdrId = clsLdrId0;
+
+                commState.idx++;
+
+            case 2:
+                byte[] colBytes0 = commState.getByteArray();
+
+                if (colBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                colBytes = colBytes0;
+
+                commState.idx++;
+
+            case 3:
+                if (buf.remaining() < 1)
+                    return false;
+
+                byte depMode0 = commState.getByte();
+
+                depMode = IgniteDeploymentMode.fromOrdinal(depMode0);
+
+                commState.idx++;
+
+            case 4:
+                if (buf.remaining() < 1)
+                    return false;
+
+                forceLocDep = commState.getBoolean();
+
+                commState.idx++;
+
+            case 5:
+                if (buf.remaining() < 1)
+                    return false;
+
+                ignoreDepOwnership = commState.getBoolean();
+
+                commState.idx++;
+
+            case 6:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (ldrParticipants == null)
+                        ldrParticipants = new HashMap<>(commState.readSize, 
1.0f);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        if (!commState.keyDone) {
+                            UUID _val = commState.getUuid();
+
+                            if (_val == UUID_NOT_READ)
+                                return false;
+
+                            commState.cur = _val;
+                            commState.keyDone = true;
+                        }
+
+                        IgniteUuid _val = commState.getGridUuid();
+
+                        if (_val == GRID_UUID_NOT_READ)
+                            return false;
+
+                        ldrParticipants.put((UUID)commState.cur, _val);
+
+                        commState.keyDone = false;
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+                commState.cur = null;
+
+                commState.idx++;
+
+            case 7:
+                if (buf.remaining() < 8)
+                    return false;
+
+                reqId = commState.getLong();
+
+                commState.idx++;
+
+            case 8:
+                byte[] resTopicBytes0 = commState.getByteArray();
+
+                if (resTopicBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                resTopicBytes = resTopicBytes0;
+
+                commState.idx++;
+
+            case 9:
+                String sampleClsName0 = commState.getString();
+
+                if (sampleClsName0 == STR_NOT_READ)
+                    return false;
+
+                sampleClsName = sampleClsName0;
+
+                commState.idx++;
+
+            case 10:
+                if (buf.remaining() < 1)
+                    return false;
+
+                skipStore = commState.getBoolean();
+
+                commState.idx++;
+
+            case 11:
+                byte[] updaterBytes0 = commState.getByteArray();
+
+                if (updaterBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                updaterBytes = updaterBytes0;
+
+                commState.idx++;
+
+            case 12:
+                String userVer0 = commState.getString();
+
+                if (userVer0 == STR_NOT_READ)
+                    return false;
+
+                userVer = userVer0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 61;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDataLoadRequest _clone = new GridDataLoadRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        GridDataLoadRequest _clone = (GridDataLoadRequest)_msg;
+
+        _clone.reqId = reqId;
+        _clone.resTopicBytes = resTopicBytes;
+        _clone.cacheName = cacheName;
+        _clone.updaterBytes = updaterBytes;
+        _clone.colBytes = colBytes;
+        _clone.ignoreDepOwnership = ignoreDepOwnership;
+        _clone.skipStore = skipStore;
+        _clone.depMode = depMode;
+        _clone.sampleClsName = sampleClsName;
+        _clone.userVer = userVer;
+        _clone.ldrParticipants = ldrParticipants;
+        _clone.clsLdrId = clsLdrId;
+        _clone.forceLocDep = forceLocDep;
+    }
+}

Reply via email to