http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0000000,6fb78b8..c349b55
mode 000000,100644..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@@ -1,0 -1,1846 +1,1845 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.continuous;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.thread.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.locks.*;
+
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static
org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.*;
+
+ /**
+ * Processor for continuous routines.
+ */
+ public class GridContinuousProcessor extends GridProcessorAdapter {
+ /** Local infos. */
+ private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new
ConcurrentHashMap8<>();
+
+ /** Remote infos. */
+ private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new
ConcurrentHashMap8<>();
+
+ /** Start futures. */
+ private final ConcurrentMap<UUID, StartFuture> startFuts = new
ConcurrentHashMap8<>();
+
+ /** Start ack wait lists. */
+ private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new
ConcurrentHashMap8<>();
+
+ /** Stop futures. */
+ private final ConcurrentMap<UUID, StopFuture> stopFuts = new
ConcurrentHashMap8<>();
+
+ /** Stop ack wait lists. */
+ private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new
ConcurrentHashMap8<>();
+
+ /** Threads started by this processor. */
+ private final Collection<IgniteThread> threads = new
GridConcurrentHashSet<>();
+
+ /** Pending start requests. */
+ private final Map<UUID, Collection<GridContinuousMessage>> pending = new
HashMap<>();
+
+ /** */
+ private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts
= new ConcurrentHashMap8<>();
+
+ /** Stopped IDs. */
+ private final Collection<UUID> stopped = new HashSet<>();
+
+ /** Lock for pending requests. */
+ private final Lock pendingLock = new ReentrantLock();
+
+ /** Lock for stop process. */
+ private final Lock stopLock = new ReentrantLock();
+
+ /** Delay in milliseconds between retries. */
+ private long retryDelay = 1000;
+
+ /** Number of retries using to send messages. */
+ private int retryCnt = 3;
+
+ /** Acknowledgement timeout. */
+ private long ackTimeout;
+
+ /** Marshaller. */
+ private IgniteMarshaller marsh;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public GridContinuousProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.config().isDaemon())
+ return;
+
+ retryDelay = ctx.config().getNetworkSendRetryDelay();
+ retryCnt = ctx.config().getNetworkSendRetryCount();
+ ackTimeout = ctx.config().getNetworkTimeout();
+
+ if (ackTimeout < retryDelay * retryCnt) {
+ U.warn(log, "Acknowledgement timeout for continuous operations is
less than message send " +
+ "retry delay multiplied by retries count (will increase
timeout value) [ackTimeout=" +
+ ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" +
retryCnt + ']');
+
+ ackTimeout = retryDelay * retryCnt;
+ }
+
+ marsh = ctx.config().getMarshaller();
+
+ ctx.event().addLocalEventListener(new GridLocalEventListener() {
+ @SuppressWarnings({"fallthrough", "TooBroadScope"})
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt instanceof IgniteDiscoveryEvent;
+
+ UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
+
+ Collection<GridContinuousMessage> reqs;
+
+ pendingLock.lock();
+
+ try {
+ // Remove pending requests to send to joined node
+ // (if node is left or failed, they are dropped).
+ reqs = pending.remove(nodeId);
+ }
+ finally {
+ pendingLock.unlock();
+ }
+
+ switch (evt.type()) {
+ case EVT_NODE_JOINED:
+ if (reqs != null) {
+ UUID routineId = null;
+
+ // Send pending requests.
+ try {
+ for (GridContinuousMessage req : reqs) {
+ routineId = req.routineId();
+
+ sendWithRetries(nodeId, req, null);
+ }
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send pending start
request to node (is node alive?): " +
+ nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send pending start
request to node: " + nodeId, e);
+
+ completeStartFuture(routineId);
+ }
+ }
+
+ break;
+
+ case EVT_NODE_LEFT:
+ case EVT_NODE_FAILED:
+ // Do not wait for start acknowledgements from left
node.
+ for (Map.Entry<UUID, Collection<UUID>> e :
waitForStartAck.entrySet()) {
+ Collection<UUID> nodeIds = e.getValue();
+
+ for (Iterator<UUID> it = nodeIds.iterator();
it.hasNext();) {
+ if (nodeId.equals(it.next())) {
+ it.remove();
+
+ break;
+ }
+ }
+
+ if (nodeIds.isEmpty())
+ completeStartFuture(e.getKey());
+ }
+
+ // Do not wait for stop acknowledgements from left
node.
+ for (Map.Entry<UUID, Collection<UUID>> e :
waitForStopAck.entrySet()) {
+ Collection<UUID> nodeIds = e.getValue();
+
+ for (Iterator<UUID> it = nodeIds.iterator();
it.hasNext();) {
+ if (nodeId.equals(it.next())) {
+ it.remove();
+
+ break;
+ }
+ }
+
+ if (nodeIds.isEmpty())
+ completeStopFuture(e.getKey());
+ }
+
+ // Unregister handlers created by left node.
+ for (Map.Entry<UUID, RemoteRoutineInfo> e :
rmtInfos.entrySet()) {
+ UUID routineId = e.getKey();
+ RemoteRoutineInfo info = e.getValue();
+
+ if (info.autoUnsubscribe &&
nodeId.equals(info.nodeId))
+ unregisterRemote(routineId);
+ }
+
+ for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e :
syncMsgFuts.entrySet()) {
+ SyncMessageAckFuture fut = e.getValue();
+
+ if (fut.nodeId().equals(nodeId)) {
+ SyncMessageAckFuture fut0 =
syncMsgFuts.remove(e.getKey());
+
+ if (fut0 != null) {
+ ClusterTopologyException err = new
ClusterTopologyException(
+ "Node left grid while sending message
to: " + nodeId);
+
+ fut0.onDone(err);
+ }
+ }
+ }
+
+ break;
+
+ default:
+ assert false : "Unexpected event received: " +
evt.shortDisplay();
+ }
+ }
+ }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ ctx.io().addMessageListener(TOPIC_CONTINUOUS, new
GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object obj) {
+ GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+ if (msg.data() == null && msg.dataBytes() != null) {
+ try {
+ msg.data(marsh.unmarshal(msg.dataBytes(), null));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process message (ignoring): "
+ msg, e);
+
+ return;
+ }
+ }
+
+ switch (msg.type()) {
+ case MSG_START_REQ:
+ processStartRequest(nodeId, msg);
+
+ break;
+
+ case MSG_START_ACK:
+ processStartAck(nodeId, msg);
+
+ break;
+
+ case MSG_STOP_REQ:
+ processStopRequest(nodeId, msg);
+
+ break;
+
+ case MSG_STOP_ACK:
+ processStopAck(nodeId, msg);
+
+ break;
+
+ case MSG_EVT_NOTIFICATION:
+ processNotification(nodeId, msg);
+
+ break;
+
+ case MSG_EVT_ACK:
+ processMessageAck(msg);
+
+ break;
+
+ default:
+ assert false : "Unexpected message received: " +
msg.type();
+ }
+ }
+ });
+
+ if (log.isDebugEnabled())
+ log.debug("Continuous processor started.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ if (ctx.config().isDaemon())
+ return;
+
+ ctx.io().removeMessageListener(TOPIC_CONTINUOUS);
+
+ U.interrupt(threads);
+ U.joinThreads(threads, log);
+
+ if (log.isDebugEnabled())
+ log.debug("Continuous processor stopped.");
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Object collectDiscoveryData(UUID nodeId) {
+ if (!nodeId.equals(ctx.localNodeId())) {
+ pendingLock.lock();
+
+ try {
+ // Create empty pending set.
+ pending.put(nodeId, new HashSet<GridContinuousMessage>());
+
+ DiscoveryData data = new DiscoveryData(ctx.localNodeId());
+
+ // Collect listeners information (will be sent to
+ // joining node during discovery process).
+ for (Map.Entry<UUID, LocalRoutineInfo> e :
locInfos.entrySet()) {
+ UUID routineId = e.getKey();
+ LocalRoutineInfo info = e.getValue();
+
+ data.addItem(new DiscoveryDataItem(routineId,
info.prjPred,
+ info.hnd, info.bufSize, info.interval));
+ }
+
+ return data;
+ }
+ finally {
+ pendingLock.unlock();
+ }
+ }
+ else
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDiscoveryDataReceived(Object obj) {
+ DiscoveryData data = (DiscoveryData)obj;
+
+ if (!ctx.isDaemon() && data != null) {
+ for (DiscoveryDataItem item : data.items) {
+ // Register handler only if local node passes projection
predicate.
+ if (item.prjPred == null ||
item.prjPred.apply(ctx.discovery().localNode())) {
+ try {
+ if (ctx.config().isPeerClassLoadingEnabled())
+ item.hnd.p2pUnmarshal(data.nodeId, ctx);
+
+ if (registerHandler(data.nodeId, item.routineId,
item.hnd, item.bufSize, item.interval,
+ item.autoUnsubscribe, false))
+ item.hnd.onListenerRegistered(item.routineId,
ctx);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to register continuous
handler.", e);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @param hnd Handler.
+ * @param bufSize Buffer size.
+ * @param interval Time interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
+ * @param prjPred Projection predicate.
+ * @return Future.
+ */
+ @SuppressWarnings("TooBroadScope")
+ public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd,
+ int bufSize,
+ long interval,
+ boolean autoUnsubscribe,
+ @Nullable IgnitePredicate<ClusterNode> prjPred) {
+ assert hnd != null;
+ assert bufSize > 0;
+ assert interval >= 0;
+
+ // Whether local node is included in routine.
+ boolean locIncluded = prjPred == null ||
prjPred.apply(ctx.discovery().localNode());
+
+ // Generate ID.
+ final UUID routineId = UUID.randomUUID();
+
+ StartRequestData reqData = new StartRequestData(prjPred, hnd,
bufSize, interval, autoUnsubscribe);
+
+ try {
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ // Handle peer deployment for projection predicate.
+ if (prjPred != null && !U.isGrid(prjPred.getClass())) {
+ Class cls = U.detectClass(prjPred);
+
+ String clsName = cls.getName();
+
+ GridDeployment dep = ctx.deploy().deploy(cls,
U.detectClassLoader(cls));
+
+ if (dep == null)
+ throw new IgniteDeploymentException("Failed to deploy
projection predicate: " + prjPred);
+
+ reqData.clsName = clsName;
+ reqData.depInfo = new GridDeploymentInfoBean(dep);
+
+ reqData.p2pMarshal(marsh);
+ }
+
+ // Handle peer deployment for other handler-specific objects.
+ hnd.p2pMarshal(ctx);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(ctx, e);
+ }
+
+ // Register per-routine notifications listener if ordered messaging
is used.
+ if (hnd.orderedTopic() != null) {
+ ctx.io().addMessageListener(hnd.orderedTopic(), new
GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object obj) {
+ GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+ // Only notification can be ordered.
+ assert msg.type() == MSG_EVT_NOTIFICATION;
+
+ if (msg.data() == null && msg.dataBytes() != null) {
+ try {
+ msg.data(marsh.unmarshal(msg.dataBytes(), null));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process message
(ignoring): " + msg, e);
+
+ return;
+ }
+ }
+
+ processNotification(nodeId, msg);
+ }
+ });
+ }
+
+ Collection<? extends ClusterNode> nodes;
+ Collection<UUID> nodeIds;
+
+ pendingLock.lock();
+
+ try {
+ // Nodes that participate in routine (request will be sent to
these nodes directly).
+ nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred,
F.remoteNodes(ctx.localNodeId())));
+
+ // Stop with exception if projection is empty.
+ if (nodes.isEmpty() && !locIncluded) {
+ return new GridFinishedFuture<>(ctx,
+ new ClusterTopologyException("Failed to register remote
continuous listener (projection is empty)."));
+ }
+
+ // IDs of nodes where request will be sent.
+ nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes,
F.node2id()));
+
+ // If there are currently joining nodes, add request to their
pending lists.
+ // Node IDs set is updated to make sure that we wait for
acknowledgement from
+ // these nodes.
+ for (Map.Entry<UUID, Collection<GridContinuousMessage>> e :
pending.entrySet()) {
+ if (nodeIds.add(e.getKey()))
+ e.getValue().add(new GridContinuousMessage(MSG_START_REQ,
routineId, null, reqData));
+ }
+
+ // Register routine locally.
+ locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd,
bufSize, interval));
+ }
+ finally {
+ pendingLock.unlock();
+ }
+
+ StartFuture fut = new StartFuture(ctx, routineId);
+
+ if (!nodeIds.isEmpty()) {
+ // Wait for acknowledgements.
+ waitForStartAck.put(routineId, nodeIds);
+
+ startFuts.put(routineId, fut);
+
+ // Register acknowledge timeout (timeout object will be removed
when
+ // future is completed).
+ fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) {
+ @Override public void onTimeout() {
+ // Stop waiting for acknowledgements.
+ Collection<UUID> ids = waitForStartAck.remove(routineId);
+
+ if (ids != null) {
+ StartFuture f = startFuts.remove(routineId);
+
+ assert f != null;
+
+ // If there are still nodes without acknowledgements,
+ // Stop routine with exception. Continue and complete
+ // future otherwise.
+ if (!ids.isEmpty()) {
+ f.onDone(new IgniteCheckedException("Failed to
get start acknowledgement from nodes (timeout " +
+ "expired): " + ids + ". Will unregister all
continuous listeners."));
+
+ stopRoutine(routineId);
+ }
+ else
+ f.onRemoteRegistered();
+ }
+ }
+ });
+ }
+
+ if (!nodes.isEmpty()) {
+ // Do not send projection predicate (nodes already filtered).
+ reqData.prjPred = null;
+
+ // Send start requests.
+ try {
+ GridContinuousMessage req = new
GridContinuousMessage(MSG_START_REQ, routineId, null, reqData);
+
+ sendWithRetries(nodes, req, null);
+ }
+ catch (IgniteCheckedException e) {
+ startFuts.remove(routineId);
+ waitForStartAck.remove(routineId);
+
+ fut.onDone(e);
+
+ stopRoutine(routineId);
+
+ locIncluded = false;
+ }
+ }
+ else {
+ // There are no remote nodes, but we didn't throw topology
exception.
+ assert locIncluded;
+
+ // Do not wait anything from remote nodes.
+ fut.onRemoteRegistered();
+ }
+
+ // Register local handler if needed.
+ if (locIncluded) {
+ try {
+ if (registerHandler(ctx.localNodeId(), routineId, hnd,
bufSize, interval, autoUnsubscribe, true))
+ hnd.onListenerRegistered(routineId, ctx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(ctx,
+ new IgniteCheckedException("Failed to register handler
locally: " + hnd, e));
+ }
+ }
+
+ // Handler is registered locally.
+ fut.onLocalRegistered();
+
+ return fut;
+ }
+
+ /**
+ * @param routineId Consume ID.
+ * @return Future.
+ */
+ public IgniteFuture<?> stopRoutine(UUID routineId) {
+ assert routineId != null;
+
+ boolean doStop = false;
+
+ StopFuture fut = stopFuts.get(routineId);
+
+ // Only one thread will stop routine with provided ID.
+ if (fut == null) {
+ StopFuture old = stopFuts.putIfAbsent(routineId, fut = new
StopFuture(ctx));
+
+ if (old != null)
+ fut = old;
+ else
+ doStop = true;
+ }
+
+ if (doStop) {
+ // Unregister routine locally.
+ LocalRoutineInfo routine = locInfos.remove(routineId);
+
+ // Finish if routine is not found (wrong ID is provided).
+ if (routine == null) {
+ stopFuts.remove(routineId);
+
+ fut.onDone();
+
+ return fut;
+ }
+
+ // Unregister handler locally.
+ unregisterHandler(routineId, routine.hnd, true);
+
+ pendingLock.lock();
+
+ try {
+ // Remove pending requests for this routine.
+ for (Collection<GridContinuousMessage> msgs :
pending.values()) {
+ Iterator<GridContinuousMessage> it = msgs.iterator();
+
+ while (it.hasNext()) {
+ if (it.next().routineId().equals(routineId))
+ it.remove();
+ }
+ }
+ }
+ finally {
+ pendingLock.unlock();
+ }
+
+ // Nodes where to send stop requests.
+ Collection<? extends ClusterNode> nodes =
F.view(ctx.discovery().allNodes(),
+ F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId())));
+
+ if (!nodes.isEmpty()) {
+ // Wait for acknowledgements.
+ waitForStopAck.put(routineId, new
GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())));
+
+ // Register acknowledge timeout (timeout object will be
removed when
+ // future is completed).
+ fut.addTimeoutObject(new StopTimeoutObject(ackTimeout,
routineId,
+ new GridContinuousMessage(MSG_STOP_REQ, routineId, null,
null)));
+
+ // Send stop requests.
+ try {
+ for (ClusterNode node : nodes) {
+ try {
+ sendWithRetries(node.id(),
+ new GridContinuousMessage(MSG_STOP_REQ,
routineId, null, null),
+ null);
+ }
+ catch (ClusterTopologyException ignored) {
+ U.warn(log, "Failed to send stop request (node
left topology): " + node.id());
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ stopFuts.remove(routineId);
+ waitForStopAck.remove(routineId);
+
+ fut.onDone(e);
+ }
+ }
+ else {
+ stopFuts.remove(routineId);
+
+ fut.onDone();
+ }
+ }
+
+ return fut;
+ }
+
+ /**
+ * @param nodeId ID of the node that started routine.
+ * @param routineId Routine ID.
+ * @param obj Notification object.
+ * @param orderedTopic Topic for ordered notifications. If {@code null},
non-ordered message will be sent.
+ * @param sync If {@code true} then waits for event acknowledgment.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void addNotification(UUID nodeId,
+ UUID routineId,
+ @Nullable Object obj,
+ @Nullable Object orderedTopic,
+ boolean sync)
+ throws IgniteCheckedException {
+ assert nodeId != null;
+ assert routineId != null;
+
+ assert !nodeId.equals(ctx.localNodeId());
+
+ RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+ if (info != null) {
+ assert info.interval == 0 || !sync;
+
+ if (sync) {
+ SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx,
nodeId);
+
+ IgniteUuid futId = IgniteUuid.randomUuid();
+
+ syncMsgFuts.put(futId, fut);
+
+ try {
+ sendNotification(nodeId, routineId, futId, F.asList(obj),
orderedTopic);
+ }
+ catch (IgniteCheckedException e) {
+ syncMsgFuts.remove(futId);
+
+ throw e;
+ }
+
+ fut.get();
+ }
+ else {
+ Collection<Object> toSnd = info.add(obj);
+
+ if (toSnd != null)
+ sendNotification(nodeId, routineId, null, toSnd,
orderedTopic);
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param routineId Routine ID.
+ * @param futId Future ID.
+ * @param toSnd Notification object to send.
+ * @param orderedTopic Topic for ordered notifications.
+ * If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendNotification(UUID nodeId,
+ UUID routineId,
+ @Nullable IgniteUuid futId,
+ Collection<Object> toSnd,
+ @Nullable Object orderedTopic) throws IgniteCheckedException {
+ assert nodeId != null;
+ assert routineId != null;
+ assert toSnd != null;
+ assert !toSnd.isEmpty();
+
+ sendWithRetries(nodeId, new
GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd),
orderedTopic);
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param req Start request.
+ */
+ private void processStartRequest(UUID nodeId, GridContinuousMessage req) {
+ assert nodeId != null;
+ assert req != null;
+
+ UUID routineId = req.routineId();
+ StartRequestData data = req.data();
+
+ GridContinuousHandler hnd = data.hnd;
+
+ IgniteCheckedException err = null;
+
+ try {
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ String clsName = data.clsName;
+
+ if (clsName != null) {
+ GridDeploymentInfo depInfo = data.depInfo;
+
+ GridDeployment dep =
ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+ depInfo.userVersion(), nodeId,
depInfo.classLoaderId(), depInfo.participants(), null);
+
+ if (dep == null)
+ throw new IgniteDeploymentException("Failed to obtain
deployment for class: " + clsName);
+
+ data.p2pUnmarshal(marsh, dep.classLoader());
+ }
+
+ hnd.p2pUnmarshal(nodeId, ctx);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+
+ U.error(log, "Failed to register handler [nodeId=" + nodeId + ",
routineId=" + routineId + ']', e);
+ }
+
+ boolean registered = false;
+
+ if (err == null) {
+ try {
+ IgnitePredicate<ClusterNode> prjPred = data.prjPred;
+
+ if (prjPred == null ||
prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
+ registered = registerHandler(nodeId, routineId, hnd,
data.bufSize, data.interval,
+ data.autoUnsubscribe, false);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+
+ U.error(log, "Failed to register handler [nodeId=" + nodeId +
", routineId=" + routineId + ']', e);
+ }
+ }
+
+ try {
+ sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK,
routineId, null, err), null);
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send start acknowledgement to node (is
node alive?): " + nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send start acknowledgement to node: " +
nodeId, e);
+ }
+
+ if (registered)
+ hnd.onListenerRegistered(routineId, ctx);
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param ack Start acknowledgement.
+ */
+ private void processStartAck(UUID nodeId, GridContinuousMessage ack) {
+ assert nodeId != null;
+ assert ack != null;
+
+ UUID routineId = ack.routineId();
+
+ final IgniteCheckedException err = ack.data();
+
+ if (err != null) {
+ if (waitForStartAck.remove(routineId) != null) {
+ final StartFuture fut = startFuts.remove(routineId);
+
+ if (fut != null) {
+ fut.onDone(err);
+
+ stopRoutine(routineId);
+ }
+ }
+ }
+
+ Collection<UUID> nodeIds = waitForStartAck.get(routineId);
+
+ if (nodeIds != null) {
+ nodeIds.remove(nodeId);
+
+ if (nodeIds.isEmpty())
+ completeStartFuture(routineId);
+ }
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param req Stop request.
+ */
+ private void processStopRequest(UUID nodeId, GridContinuousMessage req) {
+ assert nodeId != null;
+ assert req != null;
+
+ UUID routineId = req.routineId();
+
+ unregisterRemote(routineId);
+
+ try {
+ sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK,
routineId, null, null), null);
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send stop acknowledgement to node (is
node alive?): " + nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send stop acknowledgement to node: " +
nodeId, e);
+ }
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param ack Stop acknowledgement.
+ */
+ private void processStopAck(UUID nodeId, GridContinuousMessage ack) {
+ assert nodeId != null;
+ assert ack != null;
+
+ UUID routineId = ack.routineId();
+
+ Collection<UUID> nodeIds = waitForStopAck.get(routineId);
+
+ if (nodeIds != null) {
+ nodeIds.remove(nodeId);
+
+ if (nodeIds.isEmpty())
+ completeStopFuture(routineId);
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processMessageAck(GridContinuousMessage msg) {
+ assert msg.futureId() != null;
+
+ SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId());
+
+ if (fut != null)
+ fut.onDone();
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param msg Message.
+ */
+ private void processNotification(UUID nodeId, GridContinuousMessage msg) {
+ assert nodeId != null;
+ assert msg != null;
+
+ UUID routineId = msg.routineId();
+
+ try {
+ LocalRoutineInfo routine = locInfos.get(routineId);
+
+ if (routine != null)
+ routine.hnd.notifyCallback(nodeId, routineId,
(Collection<?>)msg.data(), ctx);
+ }
+ finally {
+ if (msg.futureId() != null) {
+ try {
+ sendWithRetries(nodeId,
+ new GridContinuousMessage(MSG_EVT_ACK, null,
msg.futureId(), null),
+ null);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send event acknowledgment to node: "
+ nodeId, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param routineId Consume ID.
+ */
+ private void completeStartFuture(UUID routineId) {
+ assert routineId != null;
+
+ if (waitForStartAck.remove(routineId) != null) {
+ StartFuture fut = startFuts.remove(routineId);
+
+ assert fut != null;
+
+ fut.onRemoteRegistered();
+ }
+ }
+
+ /**
+ * @param routineId Consume ID.
+ */
+ private void completeStopFuture(UUID routineId) {
+ assert routineId != null;
+
+ if (waitForStopAck.remove(routineId) != null) {
+ GridFutureAdapter <?> fut = stopFuts.remove(routineId);
+
+ assert fut != null;
+
+ fut.onDone();
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param routineId Consume ID.
+ * @param hnd Handler.
+ * @param bufSize Buffer size.
+ * @param interval Time interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
+ * @param loc Local registration flag.
+ * @return Whether listener was actually registered.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private boolean registerHandler(final UUID nodeId,
+ final UUID routineId,
+ final GridContinuousHandler hnd,
+ int bufSize,
+ final long interval,
+ boolean autoUnsubscribe,
+ boolean loc) throws IgniteCheckedException {
+ assert nodeId != null;
+ assert routineId != null;
+ assert hnd != null;
+ assert bufSize > 0;
+ assert interval >= 0;
+
+ final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd,
bufSize, interval, autoUnsubscribe);
+
+ boolean doRegister = loc;
+
+ if (!doRegister) {
+ stopLock.lock();
+
+ try {
+ doRegister = !stopped.remove(routineId) &&
rmtInfos.putIfAbsent(routineId, info) == null;
+ }
+ finally {
+ stopLock.unlock();
+ }
+ }
+
+ if (doRegister) {
+ if (interval > 0) {
+ IgniteThread checker = new IgniteThread(new
GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
+ @SuppressWarnings("ConstantConditions")
+ @Override protected void body() {
+ long interval0 = interval;
+
+ while (!isCancelled()) {
+ try {
+ U.sleep(interval0);
+ }
+ catch (IgniteInterruptedException ignored) {
+ break;
+ }
+
+ IgniteBiTuple<Collection<Object>, Long> t =
info.checkInterval();
+
+ Collection<Object> toSnd = t.get1();
+
+ if (toSnd != null) {
+ try {
+ sendNotification(nodeId, routineId, null,
toSnd, hnd.orderedTopic());
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send
notification to node (is node alive?): " + nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send notification
to node: " + nodeId, e);
+ }
+ }
+
+ interval0 = t.get2();
+ }
+ }
+ });
+
+ threads.add(checker);
+
+ checker.start();
+ }
+
+ return hnd.register(nodeId, routineId, ctx);
+ }
+
+ return false;
+ }
+
+ /**
+ * @param routineId Routine ID.
+ * @param hnd Handler
+ * @param loc If Handler unregistered on master node.
+ */
+ private void unregisterHandler(UUID routineId, GridContinuousHandler hnd,
boolean loc) {
+ assert routineId != null;
+ assert hnd != null;
+
+ if (loc && hnd.orderedTopic() != null)
+ ctx.io().removeMessageListener(hnd.orderedTopic());
+
+ hnd.unregister(routineId, ctx);
+ }
+
+ /**
+ * @param routineId Routine ID.
+ */
+ @SuppressWarnings("TooBroadScope")
+ private void unregisterRemote(UUID routineId) {
+ RemoteRoutineInfo info;
+
+ stopLock.lock();
+
+ try {
+ info = rmtInfos.remove(routineId);
+
+ if (info == null)
+ stopped.add(routineId);
+ }
+ finally {
+ stopLock.unlock();
+ }
+
+ if (info != null)
+ unregisterHandler(routineId, info.hnd, false);
+ }
+
+ /**
+ * @param nodeId Destination node ID.
+ * @param msg Message.
+ * @param orderedTopic Topic for ordered notifications.
+ * If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendWithRetries(UUID nodeId, GridContinuousMessage msg,
@Nullable Object orderedTopic)
+ throws IgniteCheckedException {
+ assert nodeId != null;
+ assert msg != null;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node != null)
+ sendWithRetries(node, msg, orderedTopic);
+ else
+ throw new ClusterTopologyException("Node for provided ID doesn't
exist (did it leave the grid?): " + nodeId);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param msg Message.
+ * @param orderedTopic Topic for ordered notifications.
+ * If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendWithRetries(ClusterNode node, GridContinuousMessage msg,
@Nullable Object orderedTopic)
+ throws IgniteCheckedException {
+ assert node != null;
+ assert msg != null;
+
+ sendWithRetries(F.asList(node), msg, orderedTopic);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param msg Message.
+ * @param orderedTopic Topic for ordered notifications.
+ * If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendWithRetries(Collection<? extends ClusterNode> nodes,
GridContinuousMessage msg,
+ @Nullable Object orderedTopic) throws IgniteCheckedException {
+ assert !F.isEmpty(nodes);
+ assert msg != null;
+
+ if (msg.data() != null && (nodes.size() > 1 ||
!ctx.localNodeId().equals(F.first(nodes).id())))
+ msg.dataBytes(marsh.marshal(msg.data()));
+
+ boolean first = true;
+
+ for (ClusterNode node : nodes) {
+ msg = first ? msg : (GridContinuousMessage)msg.clone();
+
+ first = false;
+
+ int cnt = 0;
+
+ while (cnt <= retryCnt) {
+ try {
+ cnt++;
+
+ if (orderedTopic != null) {
+ ctx.io().sendOrderedMessage(
+ node,
+ orderedTopic,
- ctx.io().nextMessageId(orderedTopic, node.id()),
+ msg,
+ SYSTEM_POOL,
+ 0,
+ true);
+ }
+ else
+ ctx.io().send(node, TOPIC_CONTINUOUS, msg,
SYSTEM_POOL);
+
+ break;
+ }
+ catch (IgniteInterruptedException e) {
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ if (!ctx.discovery().alive(node.id()))
+ throw new ClusterTopologyException("Node left grid
while sending message to: " + node.id(), e);
+
+ if (cnt == retryCnt)
+ throw e;
+ else if (log.isDebugEnabled())
+ log.debug("Failed to send message to node (will
retry): " + node.id());
+ }
+
+ U.sleep(retryDelay);
+ }
+ }
+ }
+
+ /**
+ * Local routine info.
+ */
+ @SuppressWarnings("PackageVisibleInnerClass")
+ static class LocalRoutineInfo {
+ /** Projection predicate. */
+ private final IgnitePredicate<ClusterNode> prjPred;
+
+ /** Continuous routine handler. */
+ private final GridContinuousHandler hnd;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** Time interval. */
+ private final long interval;
+
+ /**
+ * @param prjPred Projection predicate.
+ * @param hnd Continuous routine handler.
+ * @param bufSize Buffer size.
+ * @param interval Interval.
+ */
+ LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred,
GridContinuousHandler hnd, int bufSize,
+ long interval) {
+ assert hnd != null;
+ assert bufSize > 0;
+ assert interval >= 0;
+
+ this.prjPred = prjPred;
+ this.hnd = hnd;
+ this.bufSize = bufSize;
+ this.interval = interval;
+ }
+
+ /**
+ * @return Handler.
+ */
+ GridContinuousHandler handler() {
+ return hnd;
+ }
+ }
+
+ /**
+ * Remote routine info.
+ */
+ private static class RemoteRoutineInfo {
+ /** Master node ID. */
+ private final UUID nodeId;
+
+ /** Continuous routine handler. */
+ private final GridContinuousHandler hnd;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** Time interval. */
+ private final long interval;
+
+ /** Lock. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Buffer. */
+ private ConcurrentLinkedDeque8<Object> buf;
+
+ /** Last send time. */
+ private long lastSndTime = U.currentTimeMillis();
+
+ /** Automatic unsubscribe flag. */
+ private boolean autoUnsubscribe;
+
+ /**
+ * @param nodeId Master node ID.
+ * @param hnd Continuous routine handler.
+ * @param bufSize Buffer size.
+ * @param interval Interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
+ */
+ RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int
bufSize, long interval,
+ boolean autoUnsubscribe) {
+ assert nodeId != null;
+ assert hnd != null;
+ assert bufSize > 0;
+ assert interval >= 0;
+
+ this.nodeId = nodeId;
+ this.hnd = hnd;
+ this.bufSize = bufSize;
+ this.interval = interval;
+ this.autoUnsubscribe = autoUnsubscribe;
+
+ buf = new ConcurrentLinkedDeque8<>();
+ }
+
+ /**
+ * @param obj Object to add.
+ * @return Object to send or {@code null} if there is nothing to send
for now.
+ */
+ @Nullable Collection<Object> add(@Nullable Object obj) {
+ Collection<Object> toSnd = null;
+
+ if (buf.sizex() >= bufSize - 1) {
+ lock.writeLock().lock();
+
+ try {
+ buf.add(obj);
+
+ toSnd = buf;
+
+ buf = new ConcurrentLinkedDeque8<>();
+
+ if (interval > 0)
+ lastSndTime = U.currentTimeMillis();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+ else {
+ lock.readLock().lock();
+
+ try {
+ buf.add(obj);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ return toSnd != null ? new ArrayList<>(toSnd) : null;
+ }
+
+ /**
+ * @return Tuple with objects to sleep (or {@code null} if there is
nothing to
+ * send for now) and time interval after next check is needed.
+ */
+ @SuppressWarnings("TooBroadScope")
+ IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+ assert interval > 0;
+
+ Collection<Object> toSnd = null;
+ long diff;
+
+ long now = U.currentTimeMillis();
+
+ lock.writeLock().lock();
+
+ try {
+ diff = now - lastSndTime;
+
+ if (diff >= interval && !buf.isEmpty()) {
+ toSnd = buf;
+
+ buf = new ConcurrentLinkedDeque8<>();
+
+ lastSndTime = now;
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ return F.t(toSnd, diff < interval ? interval - diff : interval);
+ }
+ }
+
+ /**
+ * Start request data.
+ */
+ private static class StartRequestData implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Projection predicate. */
+ private IgnitePredicate<ClusterNode> prjPred;
+
+ /** Serialized projection predicate. */
+ private byte[] prjPredBytes;
+
+ /** Deployment class name. */
+ private String clsName;
+
+ /** Deployment info. */
+ private GridDeploymentInfo depInfo;
+
+ /** Handler. */
+ private GridContinuousHandler hnd;
+
+ /** Buffer size. */
+ private int bufSize;
+
+ /** Time interval. */
+ private long interval;
+
+ /** Automatic unsubscribe flag. */
+ private boolean autoUnsubscribe;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public StartRequestData() {
+ // No-op.
+ }
+
+ /**
+ * @param prjPred Serialized projection predicate.
+ * @param hnd Handler.
+ * @param bufSize Buffer size.
+ * @param interval Time interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
+ */
+ StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred,
GridContinuousHandler hnd,
+ int bufSize, long interval, boolean autoUnsubscribe) {
+ assert hnd != null;
+ assert bufSize > 0;
+ assert interval >= 0;
+
+ this.prjPred = prjPred;
+ this.hnd = hnd;
+ this.bufSize = bufSize;
+ this.interval = interval;
+ this.autoUnsubscribe = autoUnsubscribe;
+ }
+
+ /**
+ * @param marsh Marshaller.
+ * @throws IgniteCheckedException In case of error.
+ */
+ void p2pMarshal(IgniteMarshaller marsh) throws IgniteCheckedException
{
+ assert marsh != null;
+
+ prjPredBytes = marsh.marshal(prjPred);
+ }
+
+ /**
+ * @param marsh Marshaller.
+ * @param ldr Class loader.
+ * @throws IgniteCheckedException In case of error.
+ */
+ void p2pUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr)
throws IgniteCheckedException {
+ assert marsh != null;
+
+ assert prjPred == null;
+ assert prjPredBytes != null;
+
+ prjPred = marsh.unmarshal(prjPredBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ boolean b = prjPredBytes != null;
+
+ out.writeBoolean(b);
+
+ if (b) {
+ U.writeByteArray(out, prjPredBytes);
+ U.writeString(out, clsName);
+ out.writeObject(depInfo);
+ }
+ else
+ out.writeObject(prjPred);
+
+ out.writeObject(hnd);
+ out.writeInt(bufSize);
+ out.writeLong(interval);
+ out.writeBoolean(autoUnsubscribe);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws
IOException, ClassNotFoundException {
+ boolean b = in.readBoolean();
+
+ if (b) {
+ prjPredBytes = U.readByteArray(in);
+ clsName = U.readString(in);
+ depInfo = (GridDeploymentInfo)in.readObject();
+ }
+ else
+ prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
+
+ hnd = (GridContinuousHandler)in.readObject();
+ bufSize = in.readInt();
+ interval = in.readLong();
+ autoUnsubscribe = in.readBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StartRequestData.class, this);
+ }
+ }
+
+ /**
+ * Discovery data.
+ */
+ private static class DiscoveryData implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /** Items. */
+ @GridToStringInclude
+ private Collection<DiscoveryDataItem> items;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public DiscoveryData() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ DiscoveryData(UUID nodeId) {
+ assert nodeId != null;
+
+ this.nodeId = nodeId;
+
+ items = new ArrayList<>();
+ }
+
+ /**
+ * @param item Item.
+ */
+ public void addItem(DiscoveryDataItem item) {
+ items.add(item);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ U.writeUuid(out, nodeId);
+ U.writeCollection(out, items);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws
IOException, ClassNotFoundException {
+ nodeId = U.readUuid(in);
+ items = U.readCollection(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DiscoveryData.class, this);
+ }
+ }
+
+ /**
+ * Discovery data item.
+ */
+ private static class DiscoveryDataItem implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Consume ID. */
+ private UUID routineId;
+
+ /** Projection predicate. */
+ private IgnitePredicate<ClusterNode> prjPred;
+
+ /** Handler. */
+ private GridContinuousHandler hnd;
+
+ /** Buffer size. */
+ private int bufSize;
+
+ /** Time interval. */
+ private long interval;
+
+ /** Automatic unsubscribe flag. */
+ private boolean autoUnsubscribe;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public DiscoveryDataItem() {
+ // No-op.
+ }
+
+ /**
+ * @param routineId Consume ID.
+ * @param prjPred Projection predicate.
+ * @param hnd Handler.
+ * @param bufSize Buffer size.
+ * @param interval Time interval.
+ */
+ DiscoveryDataItem(UUID routineId, @Nullable
IgnitePredicate<ClusterNode> prjPred,
+ GridContinuousHandler hnd, int bufSize, long interval) {
+ assert routineId != null;
+ assert hnd != null;
+ assert bufSize > 0;
+ assert interval >= 0;
+
+ this.routineId = routineId;
+ this.prjPred = prjPred;
+ this.hnd = hnd;
+ this.bufSize = bufSize;
+ this.interval = interval;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ U.writeUuid(out, routineId);
+ out.writeObject(prjPred);
+ out.writeObject(hnd);
+ out.writeInt(bufSize);
+ out.writeLong(interval);
+ out.writeBoolean(autoUnsubscribe);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws
IOException, ClassNotFoundException {
+ routineId = U.readUuid(in);
+ prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
+ hnd = (GridContinuousHandler)in.readObject();
+ bufSize = in.readInt();
+ interval = in.readLong();
+ autoUnsubscribe = in.readBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DiscoveryDataItem.class, this);
+ }
+ }
+
+ /**
+ * Future for start routine.
+ */
+ private static class StartFuture extends GridFutureAdapter<UUID> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Consume ID. */
+ private UUID routineId;
+
+ /** Local listener is registered. */
+ private volatile boolean loc;
+
+ /** All remote listeners are registered. */
+ private volatile boolean rmt;
+
+ /** Timeout object. */
+ private volatile GridTimeoutObject timeoutObj;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public StartFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Kernal context.
+ * @param routineId Consume ID.
+ */
+ StartFuture(GridKernalContext ctx, UUID routineId) {
+ super(ctx);
+
+ this.routineId = routineId;
+ }
+
+ /**
+ * Called when local listener is registered.
+ */
+ public void onLocalRegistered() {
+ loc = true;
+
+ if (rmt && !isDone())
+ onDone(routineId);
+ }
+
+ /**
+ * Called when all remote listeners are registered.
+ */
+ public void onRemoteRegistered() {
+ rmt = true;
+
+ if (loc && !isDone())
+ onDone(routineId);
+ }
+
+ /**
+ * @param timeoutObj Timeout object.
+ */
+ public void addTimeoutObject(GridTimeoutObject timeoutObj) {
+ assert timeoutObj != null;
+
+ this.timeoutObj = timeoutObj;
+
+ ctx.timeout().addTimeoutObject(timeoutObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable UUID res, @Nullable
Throwable err) {
+ if (timeoutObj != null)
+ ctx.timeout().removeTimeoutObject(timeoutObj);
+
+ return super.onDone(res, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StartFuture.class, this);
+ }
+ }
+
+ /**
+ * Future for stop routine.
+ */
+ private static class StopFuture extends GridFutureAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Timeout object. */
+ private volatile GridTimeoutObject timeoutObj;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public StopFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Kernal context.
+ */
+ StopFuture(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * @param timeoutObj Timeout object.
+ */
+ public void addTimeoutObject(GridTimeoutObject timeoutObj) {
+ assert timeoutObj != null;
+
+ this.timeoutObj = timeoutObj;
+
+ ctx.timeout().addTimeoutObject(timeoutObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Object res, @Nullable
Throwable err) {
+ if (timeoutObj != null)
+ ctx.timeout().removeTimeoutObject(timeoutObj);
+
+ return super.onDone(res, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StopFuture.class, this);
+ }
+ }
+
+ /**
+ * Synchronous message acknowledgement future.
+ */
+ private static class SyncMessageAckFuture extends
GridFutureAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private UUID nodeId;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public SyncMessageAckFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Kernal context.
+ * @param nodeId Master node ID.
+ */
+ SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
+ super(ctx);
+
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Master node ID.
+ */
+ UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SyncMessageAckFuture.class, this);
+ }
+ }
+
+ /**
+ * Timeout object for stop process.
+ */
+ private class StopTimeoutObject extends GridTimeoutObjectAdapter {
+ /** Timeout. */
+ private final long timeout;
+
+ /** Routine ID. */
+ private final UUID routineId;
+
+ /** Request. */
+ private final GridContinuousMessage req;
+
+ /**
+ * @param timeout Timeout.
+ * @param routineId Routine ID.
+ * @param req Request.
+ */
+ protected StopTimeoutObject(long timeout, UUID routineId,
GridContinuousMessage req) {
+ super(timeout);
+
+ assert routineId != null;
+ assert req != null;
+
+ this.timeout = timeout;
+ this.routineId = routineId;
+ this.req = req;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ Collection<UUID> ids = waitForStopAck.remove(routineId);
+
+ if (ids != null) {
+ U.warn(log, "Failed to get stop acknowledgement from nodes
(timeout expired): " + ids +
+ ". Will retry.");
+
+ StopFuture f = stopFuts.get(routineId);
+
+ if (f != null) {
+ if (!ids.isEmpty()) {
+ waitForStopAck.put(routineId, ids);
+
+ // Resend requests.
+ for (UUID id : ids) {
+ try {
+ sendWithRetries(id, req, null);
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to resend stop request
to node (is node alive?): " + id);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to resend stop request
to node: " + id, e);
+
+ ids.remove(id);
+
+ if (ids.isEmpty())
+ f.onDone(e);
+ }
+ }
+
+ // Reschedule timeout.
+ ctx.timeout().addTimeoutObject(new
StopTimeoutObject(timeout, routineId, req));
+ }
+ else if (stopFuts.remove(routineId) != null)
+ f.onDone();
+ }
+ }
+ }
+ }
+ }