# IGNITE-831 Support listeners from clients.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37c60075 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37c60075 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37c60075 Branch: refs/heads/ignite-709_3 Commit: 37c60075d234480b9217a6456cbfc94e380c3b3a Parents: e028de8 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue May 5 19:38:29 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue May 5 19:38:29 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 109 ++++++++++++------- 1 file changed, 67 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37c60075/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d71609b..d1923d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -59,6 +60,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** 
Local infos. */ private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<>(); + /** Client infos (routine infos keyed by client node ID, then by routine ID). */ + private final ConcurrentMap<UUID, Map<UUID, LocalRoutineInfo>> clientInfos = new ConcurrentHashMap8<>(); + /** Remote infos. */ private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<>(); @@ -77,9 +81,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Stopped IDs. */ private final Collection<UUID> stopped = new HashSet<>(); - /** Lock for pending requests. */ - private final Lock pendingLock = new ReentrantLock(); - /** Lock for stop process. */ private final Lock stopLock = new ReentrantLock(); @@ -113,10 +114,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @SuppressWarnings({"fallthrough", "TooBroadScope"}) @Override public void onEvent(Event evt) { assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + clientInfos.remove(nodeId); // Unregister handlers created by left node. 
for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { @@ -148,7 +150,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { new CustomEventListener<StartRoutineDiscoveryMessage>() { @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) { if (!snd.id().equals(ctx.localNodeId())) - processStartRequest(snd.id(), msg); + processStartRequest(snd, msg); } }); @@ -179,6 +181,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID routineId = msg.routineId(); unregisterRemote(routineId); + + if (((TcpDiscoveryNode)snd).clientRouterNodeId() != null) { + Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id()); + + if (infoMap != null) + infoMap.remove(msg.routineId()); + } } } }); @@ -251,26 +260,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { if (!nodeId.equals(ctx.localNodeId())) { - pendingLock.lock(); - - try { - DiscoveryData data = new DiscoveryData(ctx.localNodeId()); + DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); - // Collect listeners information (will be sent to - // joining node during discovery process). - for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - data.addItem(new DiscoveryDataItem(routineId, info.prjPred, - info.hnd, info.bufSize, info.interval)); - } + // Collect listeners information (will be sent to + // joining node during discovery process). 
+ for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); - return data; - } - finally { - pendingLock.unlock(); + data.addItem(new DiscoveryDataItem(routineId, info.prjPred, + info.hnd, info.bufSize, info.interval)); } + + return data; } else return null; @@ -294,6 +296,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } } + + for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) { + Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey()); + + if (map == null) { + map = new HashMap<>(); + + clientInfos.put(entry.getKey(), map); + } + + map.putAll(entry.getValue()); + } } } @@ -414,15 +428,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { }); } - pendingLock.lock(); - - try { - // Register routine locally. - locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); - } - finally { - pendingLock.unlock(); - } + // Register routine locally. + locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); StartFuture fut = new StartFuture(ctx, routineId); @@ -581,13 +588,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * @param nodeId Sender ID. + * @param node Sender. * @param req Start request. 
*/ - private void processStartRequest(UUID nodeId, StartRoutineDiscoveryMessage req) { - assert nodeId != null; - assert req != null; - + private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) { UUID routineId = req.routineId(); StartRequestData data = req.startRequestData(); @@ -603,7 +607,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridDeploymentInfo depInfo = data.deploymentInfo(); GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, - depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + depInfo.userVersion(), node.id(), depInfo.classLoaderId(), depInfo.participants(), null); if (dep == null) throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); @@ -611,13 +615,28 @@ public class GridContinuousProcessor extends GridProcessorAdapter { data.p2pUnmarshal(marsh, dep.classLoader()); } - hnd.p2pUnmarshal(nodeId, ctx); + hnd.p2pUnmarshal(node.id(), ctx); } } catch (IgniteCheckedException e) { err = e; - U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); + U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); + } + + if (((TcpDiscoveryNode)node).clientRouterNodeId() != null) { + Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(node.id()); + + if (clientRouteMap == null) { + clientRouteMap = new HashMap<>(); + + Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRouteMap); + + assert old == null; + } + + clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(), + data.interval())); } boolean registered = false; @@ -627,14 +646,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); if (prjPred == null || 
prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { - registered = registerHandler(nodeId, routineId, hnd, data.bufferSize(), data.interval(), + registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe(), false); } } catch (IgniteCheckedException e) { err = e; - U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); + U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); } } @@ -1129,6 +1148,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @GridToStringInclude private Collection<DiscoveryDataItem> items; + private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos; + /** * Required by {@link Externalizable}. */ @@ -1139,11 +1160,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId Node ID. */ - DiscoveryData(UUID nodeId) { + DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { assert nodeId != null; this.nodeId = nodeId; + this.clientInfos = clientInfos; + items = new ArrayList<>(); } @@ -1158,12 +1181,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); U.writeCollection(out, items); + U.writeMap(out, clientInfos); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { nodeId = U.readUuid(in); items = U.readCollection(in); + clientInfos = U.readMap(in); } /** {@inheritDoc} */