IGNITE-709 Fix CacheListenerTest#testDeregistration()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fbb590f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fbb590f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fbb590f Branch: refs/heads/ignite-929 Commit: 8fbb590f2e659c785c6d89ea49e1e4bb3fdc666f Parents: 0ae75d8 Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Sun May 24 20:23:01 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Sun May 24 20:23:01 2015 +0300 ---------------------------------------------------------------------- .../discovery/CustomMessageWrapper.java | 5 ++ .../discovery/DiscoveryCustomMessage.java | 5 ++ .../cache/DynamicCacheChangeBatch.java | 5 ++ .../continuous/AbstractContinuousMessage.java | 54 ++++++++++++++++++++ .../StartRoutineAckDiscoveryMessage.java | 20 ++------ .../StartRoutineDiscoveryMessage.java | 25 +++------ .../StopRoutineAckDiscoveryMessage.java | 19 +------ .../continuous/StopRoutineDiscoveryMessage.java | 19 +------ .../discovery/DiscoverySpiCustomMessage.java | 5 ++ .../discovery/tcp/TcpClientDiscoverySpi.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 14 ++--- .../TcpDiscoveryCustomEventMessage.java | 31 +++++++++-- 12 files changed, 126 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java index 0afb6cf..23f8bda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -44,6 +44,11 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage { return res == null ? null : new CustomMessageWrapper(res); } + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return delegate.isMutable(); + } + /** * @return Delegate. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java index 13c0b9c..693bbef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -40,4 +40,9 @@ public interface DiscoveryCustomMessage extends Serializable { * @return Ack message or {@code null} if ack is not required. */ @Nullable public DiscoveryCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. + */ + public boolean isMutable(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index ca257a9..5fcd0e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -83,4 +83,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { @Nullable @Override public DiscoveryCustomMessage ackMessage() { return null; } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java new file mode 100644 index 0000000..f375777 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; + +import java.util.*; + +/** + * + */ +public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage { + /** Routine ID. */ + protected final UUID routineId; + + /** + * @param id Id. + */ + protected AbstractContinuousMessage(UUID id) { + routineId = id; + } + + /** + * @return Routine ID. + */ + public UUID routineId() { + return routineId; + } + + /** {@inheritDoc} */ + @Override public boolean incrementMinorTopologyVersion() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index 66892b1..3e3e6fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -26,13 +26,10 @@ import java.util.*; /** * */ -public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { +public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** */ private final Map<UUID, IgniteCheckedException> errs; @@ -41,13 +38,9 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { * @param errs Errs. */ public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) { - this.routineId = routineId; - this.errs = new HashMap<>(errs); - } + super(routineId); - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; + this.errs = new HashMap<>(errs); } /** {@inheritDoc} */ @@ -56,13 +49,6 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { } /** - * @return Routine ID. - */ - public UUID routineId() { - return routineId; - } - - /** * @return Errs. */ public Map<UUID, IgniteCheckedException> errs() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 2199fd0..ec0d36b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -26,13 +26,10 @@ import java.util.*; /** * */ -public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { +public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** */ private final StartRequestData startReqData; @@ -44,13 +41,9 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { * @param startReqData Start request data. */ public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { - this.routineId = routineId; - this.startReqData = startReqData; - } + super(routineId); - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; + this.startReqData = startReqData; } /** @@ -69,13 +62,6 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { } /** - * @return Routine ID. - */ - public UUID routineId() { - return routineId; - } - - /** * @return Errs. */ public Map<UUID, IgniteCheckedException> errs() { @@ -83,6 +69,11 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean isMutable() { + return true; + } + + /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { return new StartRoutineAckDiscoveryMessage(routineId, errs); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index a640222..350f13c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -25,34 +25,19 @@ import java.util.*; /** * */ -public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { +public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** * @param routineId Routine id. */ public StopRoutineAckDiscoveryMessage(UUID routineId) { - this.routineId = routineId; - } - - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; + super(routineId); } /** {@inheritDoc} */ @Nullable @Override public DiscoveryCustomMessage ackMessage() { return null; } - - /** - * @return Routine ID. - */ - public UUID routineId() { - return routineId; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java index e8a43a3..5b0dc5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java @@ -25,30 +25,15 @@ import java.util.*; /** * */ -public class StopRoutineDiscoveryMessage implements DiscoveryCustomMessage { +public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private static final long serialVersionUID = 0L; - /** Routine ID. */ - private final UUID routineId; - /** * @param routineId Routine id. */ public StopRoutineDiscoveryMessage(UUID routineId) { - this.routineId = routineId; - } - - /** {@inheritDoc} */ - @Override public boolean incrementMinorTopologyVersion() { - return false; - } - - /** - * @return Routine ID. - */ - public UUID routineId() { - return routineId; + super(routineId); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 72ba9db..15e943b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -32,4 +32,9 @@ public interface DiscoverySpiCustomMessage extends Serializable { * Called when message passed the ring. */ @Nullable public DiscoverySpiCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. + */ + public boolean isMutable(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 46e9635..22bb49b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -461,7 +461,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp throw new IgniteException("Failed to send custom message: client is disconnected"); try { - sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); + sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt))); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -1481,7 +1481,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (node != null && node.visible()) { try { - DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = msg.message(marsh); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 0164e5c..34e1ca8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1266,7 +1266,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { try { - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); + msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt))); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -4536,7 +4536,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov DiscoverySpiCustomMessage msgObj = null; try { - msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + msgObj = msg.message(marsh); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); @@ -4547,7 +4547,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (nextMsg != null) { try { - addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg))); + addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, + marsh.marshal(nextMsg))); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal discovery custom message.", e); @@ -4584,13 +4585,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion()); if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { - assert msg.messageBytes() != null; - TcpDiscoveryNode node = ring.node(msg.creatorNodeId()); if (node != null) { try { - DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = msg.message(marsh); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), @@ -4599,7 +4598,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov hist, msgObj); - msg.messageBytes(marsh.marshal(msgObj)); + if (msgObj.isMutable()) + msg.message(msgObj, marsh.marshal(msgObj)); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 372aa18..0739c1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -18,6 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.spi.discovery.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -31,15 +34,21 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage private static final long serialVersionUID = 0L; /** */ + private transient volatile DiscoverySpiCustomMessage msg; + + /** */ private byte[] msgBytes; /** * @param creatorNodeId Creator node id. + * @param msg Message. * @param msgBytes Serialized message. */ - public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, byte[] msgBytes) { + public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg, + @NotNull byte[] msgBytes) { super(creatorNodeId); + this.msg = msg; this.msgBytes = msgBytes; } @@ -51,12 +60,28 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage } /** - * @param msgBytes New message bytes. + * @param msg Message. + * @param msgBytes Serialized message. */ - public void messageBytes(byte[] msgBytes) { + public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) { + this.msg = msg; this.msgBytes = msgBytes; } + /** + * @return Deserialized message, + * @throws java.lang.Throwable if unmarshal failed. + */ + @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable { + if (msg == null) { + msg = marsh.unmarshal(msgBytes, U.gridClassLoader()); + + assert msg != null; + } + + return msg; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());