ignite-999 ignore duplicated discovery custom messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/80c6cf0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/80c6cf0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/80c6cf0b Branch: refs/heads/ignite-633 Commit: 80c6cf0b3d763e3fcfab37653de3f2c6dddf30e8 Parents: 14bb076 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jun 9 11:14:09 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jun 9 11:14:09 2015 +0300 ---------------------------------------------------------------------- .../discovery/DiscoveryCustomMessage.java | 6 ++++ .../discovery/GridDiscoveryManager.java | 32 ++++++++++++++++++++ .../cache/DynamicCacheChangeBatch.java | 19 +++++++++--- .../continuous/AbstractContinuousMessage.java | 9 ++++++ .../DataStreamerMultinodeCreateCacheTest.java | 4 +-- 5 files changed, 63 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java index 693bbef..401486d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.io.*; @@ -27,6 +28,11 @@ import java.io.*; */ public interface DiscoveryCustomMessage extends Serializable { /** + * @return Unique custom message ID. + */ + public IgniteUuid id(); + + /** * Whether or not minor version of topology should be increased on message receive. * * @return {@code true} if minor topology version should be increased. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 142dbaa..71fbc61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -178,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** Received custom messages history. */ + private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>(); + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -359,6 +362,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null : ((CustomMessageWrapper)spiCustomMsg).delegate(); + if (skipMessage(type, customMsg)) + return; + final ClusterNode locNode = localNode(); if (snapshots != null) @@ -515,6 +521,32 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param type Message type. + * @param customMsg Custom message. + * @return {@code True} if should not process message. + */ + private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) { + if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + assert customMsg != null && customMsg.id() != null : customMsg; + + if (rcvdCustomMsgs.contains(customMsg.id())) { + if (log.isDebugEnabled()) + log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]"); + + return true; + } + + rcvdCustomMsgs.addLast(customMsg.id()); + + while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE) + rcvdCustomMsgs.pollFirst(); + } + + return false; + } + + /** + * @param msgCls Message class. * @param lsnr Custom event listener. */ public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 5fcd0e2..dfc39c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.util.*; @@ -39,6 +40,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { @GridToStringInclude private Map<String, Map<UUID, Boolean>> clientNodes; + /** Custom message ID. */ + private IgniteUuid id = IgniteUuid.randomUuid(); + /** * @param reqs Requests. */ @@ -48,6 +52,11 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { this.reqs = reqs; } + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + /** * @return Collection of change requests. */ @@ -70,11 +79,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DynamicCacheChangeBatch.class, this); - } - - /** {@inheritDoc} */ @Override public boolean incrementMinorTopologyVersion() { return true; } @@ -88,4 +92,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { @Override public boolean isMutable() { return false; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DynamicCacheChangeBatch.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java index f375777..91768a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.continuous; import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.lang.*; import java.util.*; @@ -28,6 +29,9 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag /** Routine ID. */ protected final UUID routineId; + /** Custom message ID. */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + /** * @param id Id. */ @@ -35,6 +39,11 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag routineId = id; } + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + /** * @return Routine ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80c6cf0b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java index 2d19d6f..d258a33 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultinodeCreateCacheTest.java @@ -42,8 +42,8 @@ public class DataStreamerMultinodeCreateCacheTest extends GridCommonAbstractTest ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(5000); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(5000); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setSocketTimeout(50); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setAckTimeout(50); return cfg; }