# IGNITE-831 Support listeners from clients.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37c60075
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37c60075
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37c60075

Branch: refs/heads/ignite-709_3
Commit: 37c60075d234480b9217a6456cbfc94e380c3b3a
Parents: e028de8
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Tue May 5 19:38:29 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Tue May 5 19:38:29 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 109 ++++++++++++-------
 1 file changed, 67 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c60075/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d71609b..d1923d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
@@ -59,6 +60,9 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /** Local infos. */
     private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new 
ConcurrentHashMap8<>();
 
+    /** Routine infos started by client nodes: client node ID -> (routine ID -> routine info). */
+    private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos 
= new ConcurrentHashMap8<>();
+
     /** Remote infos. */
     private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new 
ConcurrentHashMap8<>();
 
@@ -77,9 +81,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /** Stopped IDs. */
     private final Collection<UUID> stopped = new HashSet<>();
 
-    /** Lock for pending requests. */
-    private final Lock pendingLock = new ReentrantLock();
-
     /** Lock for stop process. */
     private final Lock stopLock = new ReentrantLock();
 
@@ -113,10 +114,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             @SuppressWarnings({"fallthrough", "TooBroadScope"})
             @Override public void onEvent(Event evt) {
                 assert evt instanceof DiscoveryEvent;
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
 
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
+                clientInfos.remove(nodeId);
 
                 // Unregister handlers created by left node.
                 for (Map.Entry<UUID, RemoteRoutineInfo> e : 
rmtInfos.entrySet()) {
@@ -148,7 +150,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             new CustomEventListener<StartRoutineDiscoveryMessage>() {
                 @Override public void onCustomEvent(ClusterNode snd, 
StartRoutineDiscoveryMessage msg) {
                     if (!snd.id().equals(ctx.localNodeId()))
-                        processStartRequest(snd.id(), msg);
+                        processStartRequest(snd, msg);
                 }
             });
 
@@ -179,6 +181,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                         UUID routineId = msg.routineId();
 
                         unregisterRemote(routineId);
+
+                        if (((TcpDiscoveryNode)snd).clientRouterNodeId() != 
null) {
+                            Map<UUID, LocalRoutineInfo> infoMap = 
clientInfos.get(snd.id());
+
+                            if (infoMap != null)
+                                infoMap.remove(msg.routineId());
+                        }
                     }
                 }
             });
@@ -251,26 +260,19 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
         if (!nodeId.equals(ctx.localNodeId())) {
-            pendingLock.lock();
-
-            try {
-                DiscoveryData data = new DiscoveryData(ctx.localNodeId());
+            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), 
clientInfos);
 
-                // Collect listeners information (will be sent to
-                // joining node during discovery process).
-                for (Map.Entry<UUID, LocalRoutineInfo> e : 
locInfos.entrySet()) {
-                    UUID routineId = e.getKey();
-                    LocalRoutineInfo info = e.getValue();
-
-                    data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
-                        info.hnd, info.bufSize, info.interval));
-                }
+            // Collect listeners information (will be sent to
+            // joining node during discovery process).
+            for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
+                UUID routineId = e.getKey();
+                LocalRoutineInfo info = e.getValue();
 
-                return data;
-            }
-            finally {
-                pendingLock.unlock();
+                data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
+                    info.hnd, info.bufSize, info.interval));
             }
+
+            return data;
         }
         else
             return null;
@@ -294,6 +296,18 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     }
                 }
             }
+
+            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : 
data.clientInfos.entrySet()) {
+                Map<UUID, LocalRoutineInfo> map = 
clientInfos.get(entry.getKey());
+
+                if (map == null) {
+                    map = new HashMap<>();
+
+                    clientInfos.put(entry.getKey(), map);
+                }
+
+                map.putAll(entry.getValue());
+            }
         }
     }
 
@@ -414,15 +428,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             });
         }
 
-        pendingLock.lock();
-
-        try {
-            // Register routine locally.
-            locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, 
bufSize, interval));
-        }
-        finally {
-            pendingLock.unlock();
-        }
+        // Register routine locally.
+        locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, 
interval));
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
@@ -581,13 +588,10 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param nodeId Sender ID.
+     * @param node Sender.
      * @param req Start request.
      */
-    private void processStartRequest(UUID nodeId, StartRoutineDiscoveryMessage 
req) {
-        assert nodeId != null;
-        assert req != null;
-
+    private void processStartRequest(ClusterNode node, 
StartRoutineDiscoveryMessage req) {
         UUID routineId = req.routineId();
         StartRequestData data = req.startRequestData();
 
@@ -603,7 +607,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     GridDeploymentInfo depInfo = data.deploymentInfo();
 
                     GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-                        depInfo.userVersion(), nodeId, 
depInfo.classLoaderId(), depInfo.participants(), null);
+                        depInfo.userVersion(), node.id(), 
depInfo.classLoaderId(), depInfo.participants(), null);
 
                     if (dep == null)
                         throw new IgniteDeploymentCheckedException("Failed to 
obtain deployment for class: " + clsName);
@@ -611,13 +615,28 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     data.p2pUnmarshal(marsh, dep.classLoader());
                 }
 
-                hnd.p2pUnmarshal(nodeId, ctx);
+                hnd.p2pUnmarshal(node.id(), ctx);
             }
         }
         catch (IgniteCheckedException e) {
             err = e;
 
-            U.error(log, "Failed to register handler [nodeId=" + nodeId + ", 
routineId=" + routineId + ']', e);
+            U.error(log, "Failed to register handler [nodeId=" + node.id() + 
", routineId=" + routineId + ']', e);
+        }
+
+        if (((TcpDiscoveryNode)node).clientRouterNodeId() != null) {
+            Map<UUID, LocalRoutineInfo> clientRouteMap = 
clientInfos.get(node.id());
+
+            if (clientRouteMap == null) {
+                clientRouteMap = new HashMap<>();
+
+                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), 
clientRouteMap);
+
+                assert old == null;
+            }
+
+            clientRouteMap.put(routineId, new 
LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(),
+                data.interval()));
         }
 
         boolean registered = false;
@@ -627,14 +646,14 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 IgnitePredicate<ClusterNode> prjPred = 
data.projectionPredicate();
 
                 if (prjPred == null || 
prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
-                    registered = registerHandler(nodeId, routineId, hnd, 
data.bufferSize(), data.interval(),
+                    registered = registerHandler(node.id(), routineId, hnd, 
data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
                 }
             }
             catch (IgniteCheckedException e) {
                 err = e;
 
-                U.error(log, "Failed to register handler [nodeId=" + nodeId + 
", routineId=" + routineId + ']', e);
+                U.error(log, "Failed to register handler [nodeId=" + node.id() 
+ ", routineId=" + routineId + ']', e);
             }
         }
 
@@ -1129,6 +1148,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         @GridToStringInclude
         private Collection<DiscoveryDataItem> items;
 
+        private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
+
         /**
          * Required by {@link Externalizable}.
          */
@@ -1139,11 +1160,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         /**
          * @param nodeId Node ID.
          */
-        DiscoveryData(UUID nodeId) {
+        DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> 
clientInfos) {
             assert nodeId != null;
 
             this.nodeId = nodeId;
 
+            this.clientInfos = clientInfos;
+
             items = new ArrayList<>();
         }
 
@@ -1158,12 +1181,14 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
             U.writeUuid(out, nodeId);
             U.writeCollection(out, items);
+            U.writeMap(out, clientInfos);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
             nodeId = U.readUuid(in);
             items = U.readCollection(in);
+            clientInfos = U.readMap(in);
         }
 
         /** {@inheritDoc} */

Reply via email to