http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 0000000,255432c..538fa93
mode 000000,100644..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1,0 -1,2205 +1,2016 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.managers.communication;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.communication.*;
+ import org.apache.ignite.internal.managers.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+ import java.util.Map.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static
org.apache.ignite.internal.util.nio.GridNioBackPressureControl.*;
+ import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy.*;
+
+ /**
+ * Grid communication manager.
+ */
+ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Serializable>> {
+ /** Max closed topics to store. */
+ public static final int MAX_CLOSED_TOPICS = 10240;
+
- /** Ordered messages comparator. */
- private static final Comparator<IgniteBiTuple<GridIoMessage, Long>>
MSG_CMP =
- new Comparator<IgniteBiTuple<GridIoMessage, Long>>() {
- @Override public int compare(IgniteBiTuple<GridIoMessage, Long>
t1, IgniteBiTuple<GridIoMessage, Long> t2) {
- return t1.get1().messageId() < t2.get1().messageId() ? -1 :
- t1.get1().messageId() == t2.get1().messageId() ? 0 : 1;
- }
- };
-
+ /** Listeners by topic. */
+ private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new
ConcurrentHashMap8<>();
+
+ /** Disconnect listeners. */
+ private final Collection<GridDisconnectListener> disconnectLsnrs = new
ConcurrentLinkedQueue<>();
+
+ /** Public pool. */
+ private ExecutorService pubPool;
+
+ /** Internal P2P pool. */
+ private ExecutorService p2pPool;
+
+ /** Internal system pool. */
+ private ExecutorService sysPool;
+
+ /** Internal management pool. */
+ private ExecutorService mgmtPool;
+
+ /** Affinity assignment executor service. */
+ private ExecutorService affPool;
+
+ /** Utility cache pool. */
+ private ExecutorService utilityCachePool;
+
+ /** Discovery listener. */
+ private GridLocalEventListener discoLsnr;
+
+ /** */
+ private final ConcurrentMap<Object, ConcurrentMap<UUID,
GridCommunicationMessageSet>> msgSetMap =
+ new ConcurrentHashMap8<>();
+
- /** Messages ID generator (per topic). */
- private final ConcurrentMap<Object, ConcurrentMap<UUID, AtomicLong>>
msgIdMap =
- new ConcurrentHashMap8<>();
-
+ /** Local node ID. */
+ private final UUID locNodeId;
+
+ /** Discovery delay. */
+ private final long discoDelay;
+
+ /** Cache for messages that were received prior to discovery. */
+ private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>>
waitMap =
+ new ConcurrentHashMap8<>();
+
+ /** Communication message listener. */
+ private CommunicationListener<Serializable> commLsnr;
+
+ /** Grid marshaller. */
+ private final IgniteMarshaller marsh;
+
+ /** Busy lock. */
+ private final GridSpinReadWriteLock busyLock = new
GridSpinReadWriteLock();
+
+ /** Lock to sync maps access. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Message cache. */
+ private ThreadLocal<IgniteBiTuple<Object, byte[]>> cacheMsg =
+ new GridThreadLocal<IgniteBiTuple<Object, byte[]>>() {
+ @Nullable @Override protected IgniteBiTuple<Object, byte[]>
initialValue() {
+ return null;
+ }
+ };
+
+ /** Fully started flag. When set to true, can send and receive messages.
*/
+ private volatile boolean started;
+
+ /** Closed topics. */
+ private final GridBoundedConcurrentLinkedHashSet<Object> closedTopics =
+ new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS,
MAX_CLOSED_TOPICS, 0.75f, 256,
+ PER_SEGMENT_Q_OPTIMIZED_RMV);
+
+ /** Workers count. */
+ private final LongAdder workersCnt = new LongAdder();
+
+ /**
+ * @param ctx Grid kernal context.
+ */
+ @SuppressWarnings("deprecation")
+ public GridIoManager(GridKernalContext ctx) {
+ super(ctx, ctx.config().getCommunicationSpi());
+
+ locNodeId = ctx.localNodeId();
+
+ discoDelay = ctx.config().getDiscoveryStartupDelay();
+
+ marsh = ctx.config().getMarshaller();
+ }
+
+ /**
+ * Resets metrics for this manager.
+ */
+ public void resetMetrics() {
+ getSpi().resetMetrics();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public void start() throws IgniteCheckedException {
+ assertParameter(discoDelay > 0, "discoveryStartupDelay > 0");
+
+ startSpi();
+
+ pubPool = ctx.config().getExecutorService();
+ p2pPool = ctx.config().getPeerClassLoadingExecutorService();
+ sysPool = ctx.config().getSystemExecutorService();
+ mgmtPool = ctx.config().getManagementExecutorService();
+ utilityCachePool = ctx.utilityCachePool();
+ affPool = Executors.newFixedThreadPool(1);
+
+ getSpi().setListener(commLsnr = new
CommunicationListener<Serializable>() {
+ @Override public void onMessage(UUID nodeId, Serializable msg,
IgniteRunnable msgC) {
+ try {
+ onMessage0(nodeId, (GridIoMessage)msg, msgC);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of
unknown type (will ignore): " +
+ msg.getClass().getName() + ". Most likely
GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only
via GridProjection API.");
+ }
+ }
+
+ @Override public void onDisconnected(UUID nodeId) {
+ for (GridDisconnectListener lsnr : disconnectLsnrs)
+ lsnr.onNodeDisconnected(nodeId);
+ }
+ });
+
+ if (log.isDebugEnabled())
+ log.debug(startInfo());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"deprecation",
"SynchronizationOnLocalVariableOrMethodParameter"})
+ @Override public void onKernalStart0() throws IgniteCheckedException {
+ discoLsnr = new GridLocalEventListener() {
+ @SuppressWarnings({"TooBroadScope", "fallthrough"})
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt instanceof IgniteDiscoveryEvent : "Invalid event:
" + evt;
+
+ IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+ UUID nodeId = discoEvt.eventNode().id();
+
+ switch (evt.type()) {
+ case EVT_NODE_JOINED:
- ConcurrentLinkedDeque8<DelayedMessage> delayedMsgs =
null;
-
- lock.writeLock().lock();
-
- try {
- if (started)
- delayedMsgs = waitMap.remove(nodeId);
- }
- finally {
- lock.writeLock().unlock();
- }
-
- if (log.isDebugEnabled())
- log.debug("Processing messages from discovery
startup delay list " +
- "(sender node joined topology): " +
delayedMsgs);
-
- // After write lock released.
- if (delayedMsgs != null)
- for (DelayedMessage msg : delayedMsgs)
- commLsnr.onMessage(msg.nodeId(),
msg.message(), msg.callback());
++ assert waitMap.get(nodeId) == null; // We can't
receive messages from undiscovered nodes.
+
+ break;
+
+ case EVT_NODE_LEFT:
+ case EVT_NODE_FAILED:
+ for (Map.Entry<Object, ConcurrentMap<UUID,
GridCommunicationMessageSet>> e :
+ msgSetMap.entrySet()) {
+ ConcurrentMap<UUID, GridCommunicationMessageSet>
map = e.getValue();
+
+ GridCommunicationMessageSet set;
+
+ boolean empty;
+
+ synchronized (map) {
+ set = map.remove(nodeId);
+
+ empty = map.isEmpty();
+ }
+
+ if (set != null) {
+ if (log.isDebugEnabled())
+ log.debug("Removed message set due to
node leaving grid: " + set);
+
+ // Unregister timeout listener.
+ ctx.timeout().removeTimeoutObject(set);
+
+ // Node may still send stale messages for
this topic
+ // even after discovery notification is done.
+ closedTopics.add(set.topic());
+ }
+
+ if (empty)
+ msgSetMap.remove(e.getKey(), map);
+ }
+
+ // Clean up delayed and ordered messages (need
exclusive lock).
+ lock.writeLock().lock();
+
+ try {
+ ConcurrentLinkedDeque8<DelayedMessage> waitList =
waitMap.remove(nodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Removed messages from discovery
startup delay list " +
+ "(sender node left topology): " +
waitList);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ break;
+
+ default:
+ assert false : "Unexpected event: " + evt;
+ }
+ }
+ };
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_JOINED,
EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ // Make sure that there are no stale messages due to window between
communication
+ // manager start and kernal start.
+ // 1. Process wait list.
+ Collection<Collection<DelayedMessage>> delayedMsgs = new
ArrayList<>();
+
+ lock.writeLock().lock();
+
+ try {
+ started = true;
+
+ for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e :
waitMap.entrySet()) {
+ if (ctx.discovery().node(e.getKey()) != null) {
+ ConcurrentLinkedDeque8<DelayedMessage> waitList =
waitMap.remove(e.getKey());
+
+ if (log.isDebugEnabled())
+ log.debug("Processing messages from discovery startup
delay list: " + waitList);
+
+ if (waitList != null)
+ delayedMsgs.add(waitList);
+ }
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ // After write lock released.
+ if (!delayedMsgs.isEmpty()) {
+ for (Collection<DelayedMessage> col : delayedMsgs)
+ for (DelayedMessage msg : col)
+ commLsnr.onMessage(msg.nodeId(), msg.message(),
msg.callback());
+ }
+
+ // 2. Process messages sets.
+ for (Map.Entry<Object, ConcurrentMap<UUID,
GridCommunicationMessageSet>> e : msgSetMap.entrySet()) {
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map =
e.getValue();
+
+ for (GridCommunicationMessageSet set : map.values()) {
+ if (ctx.discovery().node(set.nodeId()) == null) {
+ // All map modifications should be synced for consistency.
+ boolean rmv;
+
+ synchronized (map) {
+ rmv = map.remove(set.nodeId(), set);
+ }
+
+ if (rmv) {
+ if (log.isDebugEnabled())
+ log.debug("Removed message set due to node
leaving grid: " + set);
+
+ // Unregister timeout listener.
+ ctx.timeout().removeTimeoutObject(set);
+ }
+
+ }
+ }
+
+ boolean rmv;
+
+ synchronized (map) {
+ rmv = map.isEmpty();
+ }
+
+ if (rmv) {
+ msgSetMap.remove(e.getKey(), map);
+
+ // Node may still send stale messages for this topic
+ // even after discovery notification is done.
+ closedTopics.add(e.getKey());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("BusyWait")
+ @Override public void onKernalStop0(boolean cancel) {
+ // No more communication messages.
+ getSpi().setListener(null);
+
+ busyLock.writeLock();
+
+ U.shutdownNow(getClass(), affPool, log);
+
+ boolean interrupted = false;
+
+ while (workersCnt.sum() != 0) {
+ try {
+ Thread.sleep(200);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
+
+ GridEventStorageManager evtMgr = ctx.event();
+
+ if (evtMgr != null && discoLsnr != null)
+ evtMgr.removeLocalEventListener(discoLsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ stopSpi();
+
+ // Clear cache.
+ cacheMsg.set(null);
+
+ if (log.isDebugEnabled())
+ log.debug(stopInfo());
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message bytes.
+ * @param msgC Closure to call when message processing finished.
+ */
+ @SuppressWarnings("fallthrough")
+ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable
msgC) {
+ assert nodeId != null;
+ assert msg != null;
+
+ if (!busyLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Received communication message while stopping
grid.");
+
+ return;
+ }
+
+ try {
+ // Check discovery.
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring message from dead node [senderId=" +
nodeId + ", msg=" + msg + ']');
+
+ return; // We can't receive messages from non-discovered ones.
+ }
+
+ if (msg.topic() == null) {
+ int topicOrd = msg.topicOrdinal();
+
+ msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
marsh.unmarshal(msg.topicBytes(), null));
+ }
+
+ if (!started) {
+ lock.readLock().lock();
+
+ try {
+ if (!started) { // Sets to true in write lock, so double
checking.
+ // Received message before valid context is set to
manager.
+ if (log.isDebugEnabled())
+ log.debug("Adding message to waiting list
[senderId=" + nodeId +
+ ", msg=" + msg + ']');
+
+ ConcurrentLinkedDeque8<DelayedMessage> list =
+ F.addIfAbsent(waitMap, nodeId,
F.<DelayedMessage>newDeque());
+
+ assert list != null;
+
+ list.add(new DelayedMessage(nodeId, msg, msgC));
+
+ return;
+ }
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // If message is P2P, then process in P2P service.
+ // This is done to avoid extra waiting and potential deadlocks
+ // as thread pool may not have any available threads to give.
+ GridIoPolicy plc = msg.policy();
+
+ switch (plc) {
+ case P2P_POOL: {
- processP2PMessage(node, msg, msgC);
++ processP2PMessage(nodeId, msg, msgC);
+
+ break;
+ }
+
+ case PUBLIC_POOL:
+ case SYSTEM_POOL:
+ case MANAGEMENT_POOL:
+ case AFFINITY_POOL:
+ case UTILITY_CACHE_POOL: {
+ if (msg.isOrdered())
- processOrderedMessage(node, msg, plc, msgC);
++ processOrderedMessage(nodeId, msg, plc, msgC);
+ else
- processRegularMessage(node, msg, plc, msgC);
++ processRegularMessage(nodeId, msg, plc, msgC);
+
+ break;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process message (will ignore): " + msg,
e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Gets execution pool for policy.
+ *
+ * @param plc Policy.
+ * @return Execution pool.
+ */
+ private Executor pool(GridIoPolicy plc) {
+ switch (plc) {
+ case P2P_POOL:
+ return p2pPool;
+ case SYSTEM_POOL:
+ return sysPool;
+ case PUBLIC_POOL:
+ return pubPool;
+ case MANAGEMENT_POOL:
+ return mgmtPool;
+ case AFFINITY_POOL:
+ return affPool;
+ case UTILITY_CACHE_POOL:
+ assert utilityCachePool != null : "Utility cache pool is not
configured.";
+
+ return utilityCachePool;
+
+ default: {
+ assert false : "Invalid communication policy: " + plc;
+
+ // Never reached.
+ return null;
+ }
+ }
+ }
+
+ /**
- * @param msg Message bytes.
- * @return Policy.
- */
- private GridIoPolicy policy(byte[] msg) {
- GridIoPolicy plc = GridIoPolicy.fromOrdinal(msg[0]);
-
- if (plc == null)
- throw new IllegalStateException("Failed to parse message policy:
" + Arrays.toString(msg));
-
- return plc;
- }
-
- /**
- * @param msg Message bytes.
- * @return {@code True} if ordered.
- */
- private boolean ordered(byte[] msg) {
- return msg[1] == 1;
- }
-
- /**
- * @param node Node.
++ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param msgC Closure to call when message processing finished.
+ */
- @SuppressWarnings("deprecation")
- private void processP2PMessage(final ClusterNode node, final
GridIoMessage msg, final IgniteRunnable msgC) {
++ private void processP2PMessage(
++ final UUID nodeId,
++ final GridIoMessage msg,
++ final IgniteRunnable msgC
++ ) {
+ workersCnt.increment();
+
+ Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
+ threadProcessingMessage(true);
+
+ GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+ if (lsnr == null)
+ return;
+
+ Object obj = msg.message();
+
+ assert obj != null;
+
- lsnr.onMessage(node.id(), obj);
++ lsnr.onMessage(nodeId, obj);
+ }
+ finally {
+ threadProcessingMessage(false);
+
+ workersCnt.decrement();
+
+ msgC.run();
+ }
+ }
+ };
+
+ try {
+ p2pPool.execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process P2P message due to execution
rejection. Increase the upper bound " +
+ "on 'ExecutorService' provided by
'GridConfiguration.getPeerClassLoadingExecutorService()'. " +
+ "Will attempt to process message in the listener thread
instead.", e);
+
+ c.run();
+ }
+ }
+
+ /**
- * @param node Node.
++ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param plc Execution policy.
+ * @param msgC Closure to call when message processing finished.
+ */
- private void processRegularMessage(final ClusterNode node, final
GridIoMessage msg, GridIoPolicy plc,
- final IgniteRunnable msgC) {
++ private void processRegularMessage(
++ final UUID nodeId,
++ final GridIoMessage msg,
++ GridIoPolicy plc,
++ final IgniteRunnable msgC
++ ) {
+ workersCnt.increment();
+
+ Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
+ threadProcessingMessage(true);
+
- processRegularMessage0(msg, node.id());
++ processRegularMessage0(msg, nodeId);
+ }
+ finally {
+ threadProcessingMessage(false);
+
+ workersCnt.decrement();
+
+ msgC.run();
+ }
+ }
+ };
+
+ try {
+ pool(plc).execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process regular message due to execution
rejection. Increase the upper bound " +
+ "on 'ExecutorService' provided by
'GridConfiguration.getExecutorService()'. " +
+ "Will attempt to process message in the listener thread
instead.", e);
+
+ c.run();
+ }
+ }
+
+ /**
+ * @param msg Message.
+ * @param nodeId Node ID.
+ */
+ @SuppressWarnings("deprecation")
+ private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
+ GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+ if (lsnr == null)
+ return;
+
+ Object obj = msg.message();
+
+ assert obj != null;
+
+ lsnr.onMessage(nodeId, obj);
+ }
+
+ /**
- * @param node Node.
++ * @param nodeId Node ID.
+ * @param msg Ordered message.
+ * @param plc Execution policy.
- * @param msgC Closure to call when message processing finished.
++ * @param msgC Closure to call when message processing finished ({@code
null} for sync processing).
+ */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private void processOrderedMessage(final ClusterNode node, final
GridIoMessage msg, final GridIoPolicy plc,
- final IgniteRunnable msgC) {
++ private void processOrderedMessage(
++ final UUID nodeId,
++ final GridIoMessage msg,
++ final GridIoPolicy plc,
++ @Nullable final IgniteRunnable msgC
++ ) {
+ assert msg != null;
+
- workersCnt.increment();
-
- Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
- @Override protected void body() {
- try {
- threadProcessingMessage(true);
-
- processOrderedMessage0(msg, plc, node.id());
- }
- finally {
- threadProcessingMessage(false);
-
- workersCnt.decrement();
-
- msgC.run();
- }
- }
- };
-
- try {
- pool(plc).execute(c);
- }
- catch (RejectedExecutionException e) {
- U.error(log, "Failed to process ordered message due to execution
rejection. " +
- "Increase the upper bound on executor service provided by
corresponding " +
- "configuration property. Will attempt to process message in
the listener " +
- "thread instead [msgPlc=" + plc + ']', e);
-
- c.run();
- }
- }
-
- /**
- * @param msg Message.
- * @param plc Policy.
- * @param nodeId Node ID.
- */
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private void processOrderedMessage0(GridIoMessage msg, GridIoPolicy plc,
UUID nodeId) {
+ long timeout = msg.timeout();
+ boolean skipOnTimeout = msg.skipOnTimeout();
+
+ boolean isNew = false;
+
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map;
+
+ GridCommunicationMessageSet set = null;
+
+ while (true) {
+ map = msgSetMap.get(msg.topic());
+
+ if (map == null) {
+ set = new GridCommunicationMessageSet(plc, msg.topic(),
nodeId, timeout, skipOnTimeout, msg);
+
+ map = new ConcurrentHashMap0<>();
+
+ map.put(nodeId, set);
+
+ ConcurrentMap<UUID, GridCommunicationMessageSet> old =
msgSetMap.putIfAbsent(
+ msg.topic(), map);
+
+ if (old != null)
+ map = old;
+ else {
+ isNew = true;
+
+ // Put succeeded.
+ break;
+ }
+ }
+
+ boolean rmv = false;
+
+ synchronized (map) {
+ if (map.isEmpty())
+ rmv = true;
+ else {
+ set = map.get(nodeId);
+
+ if (set == null) {
+ GridCommunicationMessageSet old =
map.putIfAbsent(nodeId,
+ set = new GridCommunicationMessageSet(plc,
msg.topic(),
+ nodeId, timeout, skipOnTimeout, msg));
+
+ assert old == null;
+
+ isNew = true;
+
+ // Put succeeded.
+ break;
+ }
+ }
+ }
+
+ if (rmv)
+ msgSetMap.remove(msg.topic(), map);
+ else {
+ assert set != null;
+ assert !isNew;
+
+ set.add(msg);
+
+ break;
+ }
+ }
+
- if (ctx.discovery().node(nodeId) == null) {
++ if (isNew && ctx.discovery().node(nodeId) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Message is ignored as sender has left the grid: "
+ msg);
+
+ assert map != null;
+
+ boolean rmv;
+
+ synchronized (map) {
+ map.remove(nodeId);
+
+ rmv = map.isEmpty();
+ }
+
+ if (rmv)
+ msgSetMap.remove(msg.topic(), map);
+
+ return;
+ }
+
+ if (isNew && set.endTime() != Long.MAX_VALUE)
+ ctx.timeout().addTimeoutObject(set);
+
- GridMessageListener lsnr = lsnrMap.get(msg.topic());
++ if (set.reserved()) {
++ // Set is reserved which means that it is currently processed by
worker thread.
++ msgC.run();
+
- if (lsnr != null)
- unwindMessageSet(set, lsnr, false);
- else if (closedTopics.contains(msg.topic())) {
- if (log.isDebugEnabled())
- log.debug("Message is ignored as it came for the closed
topic: " + msg);
++ return;
++ }
+
- assert map != null;
++ final GridMessageListener lsnr = lsnrMap.get(msg.topic());
++
++ if (lsnr == null) {
++ if (closedTopics.contains(msg.topic())) {
++ if (log.isDebugEnabled())
++ log.debug("Message is ignored as it came for the closed
topic: " + msg);
++
++ assert map != null;
++
++ msgSetMap.remove(msg.topic(), map);
++ }
++ else if (log.isDebugEnabled()) {
++ // Note that we simply keep messages if listener is not
++ // registered yet, until one will be registered.
++ log.debug("Received message for unknown listener (messages
will be kept until a " +
++ "listener is registered): " + msg);
++ }
++
++ // Mark the message as processed.
++ if (msgC != null)
++ msgC.run();
+
- msgSetMap.remove(msg.topic(), map);
++ return;
+ }
- else if (log.isDebugEnabled()) {
- // Note that we simply keep messages if listener is not
- // registered yet, until one will be registered.
- log.debug("Received message for unknown listener (messages will
be kept until a " +
- "listener is registered): " + msg);
++
++ if (msgC == null) {
++ // Message from local node can be processed in sync manner.
++ assert locNodeId.equals(nodeId);
++
++ unwindMessageSet(set, lsnr);
++
++ return;
++ }
++
++ // Set is not reserved and new worker should be submitted.
++ workersCnt.increment();
++
++ final GridCommunicationMessageSet msgSet0 = set;
++
++ Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
++ @Override protected void body() {
++ try {
++ threadProcessingMessage(true);
++
++ unwindMessageSet(msgSet0, lsnr);
++ }
++ finally {
++ threadProcessingMessage(false);
++
++ workersCnt.decrement();
++
++ msgC.run();
++ }
++ }
++ };
++
++ try {
++ pool(plc).execute(c);
++ }
++ catch (RejectedExecutionException e) {
++ U.error(log, "Failed to process ordered message due to execution
rejection. " +
++ "Increase the upper bound on executor service provided by
corresponding " +
++ "configuration property. Will attempt to process message in
the listener " +
++ "thread instead [msgPlc=" + plc + ']', e);
++
++ c.run();
+ }
+ }
+
+ /**
+ * @param msgSet Message set to unwind.
+ * @param lsnr Listener to notify.
- * @param force Whether to force unwind and drop missing
- * ordered messages that are not received yet.
+ */
- @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
"deprecation"})
- private void unwindMessageSet(GridCommunicationMessageSet msgSet,
GridMessageListener lsnr, boolean force) {
++ private void unwindMessageSet(GridCommunicationMessageSet msgSet,
GridMessageListener lsnr) {
+ // Loop until message set is empty or
+ // another thread owns the reservation.
+ while (true) {
+ if (msgSet.reserve()) {
+ try {
- Collection<GridIoMessage> orderedMsgs =
msgSet.unwind(force);
-
- if (!orderedMsgs.isEmpty()) {
- for (GridIoMessage msg : orderedMsgs) {
- Object obj = msg.message();
-
- assert obj != null;
-
- lsnr.onMessage(msgSet.nodeId(), obj);
- }
- }
- else if (log.isDebugEnabled())
- log.debug("No messages were unwound: " + msgSet);
++ msgSet.unwind(lsnr);
+ }
+ finally {
+ msgSet.release();
+ }
+
+ // Check outside of reservation block.
+ if (!msgSet.changed()) {
+ if (log.isDebugEnabled())
+ log.debug("Message set has not been changed: " +
msgSet);
+
+ break;
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Another thread owns reservation: " + msgSet);
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param topicOrd GridTopic enumeration ordinal.
+ * @param msg Message to send.
+ * @param plc Type of processing.
- * @param msgId Message ID.
++ * @param ordered Ordered flag.
+ * @param timeout Timeout.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
- private void send(ClusterNode node, Object topic, int topicOrd,
GridTcpCommunicationMessageAdapter msg,
- GridIoPolicy plc, long msgId, long timeout, boolean skipOnTimeout)
throws IgniteCheckedException {
++ private void send(
++ ClusterNode node,
++ Object topic,
++ int topicOrd,
++ GridTcpCommunicationMessageAdapter msg,
++ GridIoPolicy plc,
++ boolean ordered,
++ long timeout,
++ boolean skipOnTimeout
++ ) throws IgniteCheckedException {
+ assert node != null;
+ assert topic != null;
+ assert msg != null;
+ assert plc != null;
+
- GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg,
msgId, timeout, skipOnTimeout);
++ GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg,
ordered, timeout, skipOnTimeout);
+
+ if (locNodeId.equals(node.id())) {
+ assert plc != P2P_POOL;
+
+ CommunicationListener commLsnr = this.commLsnr;
+
+ if (commLsnr == null)
+ throw new IgniteCheckedException("Trying to send message when
grid is not fully started.");
+
- if (msgId > 0)
- processOrderedMessage0(ioMsg, plc, locNodeId);
++ if (ordered)
++ processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else
+ processRegularMessage0(ioMsg, locNodeId);
+ }
+ else {
+ if (topicOrd < 0)
+ ioMsg.topicBytes(marsh.marshal(topic));
+
+ try {
+ getSpi().sendMessage(node, ioMsg);
+ }
+ catch (IgniteSpiException e) {
+ throw new IgniteCheckedException("Failed to send message
(node may have left the grid or " +
+ "TCP connection cannot be established due to firewall
issues) " +
+ "[node=" + node + ", topic=" + topic +
+ ", msg=" + msg + ", policy=" + plc + ']', e);
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Id of destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(UUID nodeId, Object topic,
GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node
(has node left grid?): " + nodeId);
+
+ send(node, topic, msg, plc);
+ }
+
+ /**
+ * @param nodeId Id of destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ public void send(UUID nodeId, GridTopic topic,
GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node
(has node left grid?): " + nodeId);
+
- send(node, topic, topic.ordinal(), msg, plc, -1, 0, false);
++ send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(ClusterNode node, Object topic,
GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, -1, 0, false);
++ send(node, topic, -1, msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(ClusterNode node, GridTopic topic,
GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
+ throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, -1, 0, false);
- }
-
- /**
- * @param topic Message topic.
- * @param nodeId Node ID.
- * @return Next ordered message ID.
- */
- public long nextMessageId(Object topic, UUID nodeId) {
- ConcurrentMap<UUID, AtomicLong> map = msgIdMap.get(topic);
-
- if (map == null) {
- ConcurrentMap<UUID, AtomicLong> lastMap =
msgIdMap.putIfAbsent(topic,
- map = new ConcurrentHashMap8<>());
-
- if (lastMap != null)
- map = lastMap;
- }
-
- AtomicLong msgId = map.get(nodeId);
-
- if (msgId == null) {
- AtomicLong lastMsgId = map.putIfAbsent(nodeId, msgId = new
AtomicLong(0));
-
- if (lastMsgId != null)
- msgId = lastMsgId;
- }
-
- long id = msgId.incrementAndGet();
-
- if (log.isDebugEnabled())
- log.debug("Got next message ID [topic=" + topic + ", nodeId=" +
nodeId + ", id=" + id + ']');
-
- return id;
- }
-
- /**
- * @param topic Message topic.
- */
- public void removeMessageId(Object topic) {
- if (log.isDebugEnabled())
- log.debug("Remove message ID for topic: " + topic);
-
- msgIdMap.remove(topic);
++ send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
- * @param msgId Ordered message ID.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
- public void sendOrderedMessage(ClusterNode node, Object topic, long
msgId, GridTcpCommunicationMessageAdapter msg,
- GridIoPolicy plc, long timeout, boolean skipOnTimeout) throws
IgniteCheckedException {
++ public void sendOrderedMessage(
++ ClusterNode node,
++ Object topic,
++ GridTcpCommunicationMessageAdapter msg,
++ GridIoPolicy plc,
++ long timeout,
++ boolean skipOnTimeout
++ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
- send(node, topic, (byte)-1, msg, plc, msgId, timeout, skipOnTimeout);
- }
-
- /**
- * @param nodeId Destination node.
- * @param topic Topic to send the message to.
- * @param msgId Ordered message ID.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param timeout Timeout to keep a message on receiving queue.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public void sendOrderedMessage(UUID nodeId, Object topic, long msgId,
GridTcpCommunicationMessageAdapter msg,
- GridIoPolicy plc, long timeout, boolean skipOnTimeout) throws
IgniteCheckedException {
- assert timeout > 0 || skipOnTimeout;
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null)
- throw new IgniteCheckedException("Failed to send message to node
(has node left grid?): " + nodeId);
-
- send(node, topic, (byte)-1, msg, plc, msgId, timeout, skipOnTimeout);
++ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
- * @param msgId Ordered message ID.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
- public void sendOrderedMessage(Collection<? extends ClusterNode> nodes,
Object topic, long msgId,
- GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc, long
timeout, boolean skipOnTimeout)
++ public void sendOrderedMessage(
++ Collection<? extends ClusterNode> nodes,
++ Object topic,
++ GridTcpCommunicationMessageAdapter msg,
++ GridIoPolicy plc,
++ long timeout,
++ boolean skipOnTimeout
++ )
+ throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
- send(nodes, topic, -1, msg, plc, msgId, timeout, skipOnTimeout);
++ send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
- public void send(Collection<? extends ClusterNode> nodes, Object topic,
GridTcpCommunicationMessageAdapter msg,
- GridIoPolicy plc) throws IgniteCheckedException {
- send(nodes, topic, -1, msg, plc, -1, 0, false);
++ public void send(
++ Collection<? extends ClusterNode> nodes,
++ Object topic,
++ GridTcpCommunicationMessageAdapter msg,
++ GridIoPolicy plc
++ ) throws IgniteCheckedException {
++ send(nodes, topic, -1, msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
- public void send(Collection<? extends ClusterNode> nodes, GridTopic
topic, GridTcpCommunicationMessageAdapter msg,
- GridIoPolicy plc) throws IgniteCheckedException {
- send(nodes, topic, topic.ordinal(), msg, plc, -1, 0, false);
++ public void send(
++ Collection<? extends ClusterNode> nodes,
++ GridTopic topic,
++ GridTcpCommunicationMessageAdapter msg,
++ GridIoPolicy plc
++ ) throws IgniteCheckedException {
++ send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+ }
+
+ /**
+ * Sends a peer deployable user message.
+ *
+ * @param nodes Destination nodes.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendUserMessage(Collection<? extends ClusterNode> nodes,
Object msg) throws IgniteCheckedException {
+ sendUserMessage(nodes, msg, null, false, 0);
+ }
+
+ /**
+ * Sends a peer deployable user message.
+ *
+ * @param nodes Destination nodes.
+ * @param msg Message to send.
+ * @param topic Message topic to use.
+ * @param ordered Is message ordered?
+ * @param timeout Message timeout in milliseconds for ordered messages.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void sendUserMessage(Collection<? extends ClusterNode> nodes,
Object msg,
+ @Nullable Object topic, boolean ordered, long timeout) throws
IgniteCheckedException {
+ boolean loc = nodes.size() == 1 &&
F.first(nodes).id().equals(locNodeId);
+
+ byte[] serMsg = null;
+ byte[] serTopic = null;
+
+ if (!loc) {
+ serMsg = marsh.marshal(msg);
+
+ if (topic != null)
+ serTopic = marsh.marshal(topic);
+ }
+
+ GridDeployment dep = null;
+
+ String depClsName = null;
+
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ Class<?> cls0 = U.detectClass(msg);
+
+ if (U.isJdk(cls0) && topic != null)
+ cls0 = U.detectClass(topic);
+
+ dep = ctx.deploy().deploy(cls0, U.detectClassLoader(cls0));
+
+ if (dep == null)
+ throw new IgniteDeploymentException("Failed to deploy user
message: " + msg);
+
+ depClsName = cls0.getName();
+ }
+
+ GridTcpCommunicationMessageAdapter ioMsg = new GridIoUserMessage(
+ msg,
+ serMsg,
+ depClsName,
+ topic,
+ serTopic,
+ dep != null ? dep.classLoaderId() : null,
+ dep != null ? dep.deployMode() : null,
+ dep != null ? dep.userVersion() : null,
+ dep != null ? dep.participants() : null);
+
- if (ordered) {
- long msgId = nextMessageId(TOPIC_COMM_USER, locNodeId);
-
- sendOrderedMessage(nodes, TOPIC_COMM_USER, msgId, ioMsg,
PUBLIC_POOL, timeout, true);
- }
++ if (ordered)
++ sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL,
timeout, true);
+ else if (loc)
+ send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ else {
+ ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
+
+ Collection<? extends ClusterNode> rmtNodes = F.view(nodes,
F.remoteNodes(locNodeId));
+
+ if (locNode != null)
+ send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+
+ if (!rmtNodes.isEmpty())
+ send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ }
+ }
+
+ /**
+ * @param topic Topic to subscribe to.
+ * @param p Message predicate.
+ */
+ public void addUserMessageListener(@Nullable final Object topic,
@Nullable final IgniteBiPredicate<UUID, ?> p) {
+ if (p != null) {
+ try {
+ addMessageListener(TOPIC_COMM_USER,
+ new GridUserMessageListener(topic,
(IgniteBiPredicate<UUID, Object>)p));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * @param topic Topic to unsubscribe from.
+ * @param p Message predicate.
+ */
+ public void removeUserMessageListener(@Nullable Object topic,
IgniteBiPredicate<UUID, ?> p) {
+ try {
+ removeMessageListener(TOPIC_COMM_USER,
+ new GridUserMessageListener(topic, (IgniteBiPredicate<UUID,
Object>)p));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param topicOrd Topic ordinal value.
+ * @param msg Message to send.
+ * @param plc Type of processing.
- * @param msgId Message ID (for ordered messages) or -1 (for unordered
messages).
++ * @param ordered Ordered flag.
+ * @param timeout Message timeout.
+ * @param skipOnTimeout Whether message can be skipped in timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
- private void send(Collection<? extends ClusterNode> nodes, Object topic,
int topicOrd,
- GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc, long msgId,
long timeout, boolean skipOnTimeout)
++ private void send(
++ Collection<? extends ClusterNode> nodes,
++ Object topic,
++ int topicOrd,
++ GridTcpCommunicationMessageAdapter msg,
++ GridIoPolicy plc,
++ boolean ordered,
++ long timeout,
++ boolean skipOnTimeout
++ )
+ throws IgniteCheckedException {
+ assert nodes != null;
+ assert topic != null;
+ assert msg != null;
+ assert plc != null;
+
- if (msgId < 0)
++ if (!ordered)
+ assert F.find(nodes, null, F.localNode(locNodeId)) == null :
+ "Internal GridGain code should never call the method with
local node in a node list.";
+
+ try {
+ // Small optimization, as communication SPIs may have lighter
implementation for sending
+ // messages to one node vs. many.
+ if (!nodes.isEmpty()) {
+ boolean first = true;
+
+ for (ClusterNode node : nodes) {
+ GridTcpCommunicationMessageAdapter msg0 = first ? msg :
msg.clone();
+
+ first = false;
+
- send(node, topic, topicOrd, msg0, plc, msgId, timeout,
skipOnTimeout);
++ send(node, topic, topicOrd, msg0, plc, ordered, timeout,
skipOnTimeout);
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Failed to send message to empty nodes collection
[topic=" + topic + ", msg=" +
+ msg + ", policy=" + plc + ']');
+ }
+ catch (IgniteSpiException e) {
+ throw new IgniteCheckedException("Failed to send message (nodes
may have left the grid or " +
+ "TCP connection cannot be established due to firewall issues)
" +
+ "[nodes=" + nodes + ", topic=" + topic +
+ ", msg=" + msg + ", policy=" + plc + ']', e);
+ }
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to add.
+ */
+ @SuppressWarnings({"TypeMayBeWeakened", "deprecation"})
+ public void addMessageListener(GridTopic topic, GridMessageListener lsnr)
{
+ addMessageListener((Object)topic, lsnr);
+ }
+
+ /**
+ * @param lsnr Listener to add.
+ */
+ public void addDisconnectListener(GridDisconnectListener lsnr) {
+ disconnectLsnrs.add(lsnr);
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to add.
+ */
+ @SuppressWarnings({"deprecation",
"SynchronizationOnLocalVariableOrMethodParameter"})
+ public void addMessageListener(Object topic, final GridMessageListener
lsnr) {
+ assert lsnr != null;
+ assert topic != null;
+
+ // Make sure that new topic is not in the list of closed topics.
+ closedTopics.remove(topic);
+
+ GridMessageListener lsnrs;
+
+ for (;;) {
+ lsnrs = lsnrMap.putIfAbsent(topic, lsnr);
+
+ if (lsnrs == null) {
+ lsnrs = lsnr;
+
+ break;
+ }
+
+ assert lsnrs != null;
+
+ if (!(lsnrs instanceof ArrayListener)) { // We are putting the
second listener, creating array.
+ GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr);
+
+ if (lsnrMap.replace(topic, lsnrs, arrLsnr)) {
+ lsnrs = arrLsnr;
+
+ break;
+ }
+ }
+ else {
+ if (((ArrayListener)lsnrs).add(lsnr))
+ break;
+
+ // Add operation failed because array is already empty and is
about to be removed, helping and retrying.
+ lsnrMap.remove(topic, lsnrs);
+ }
+ }
+
+ Map<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
+
+ Collection<GridCommunicationMessageSet> msgSets = map != null ?
map.values() : null;
+
+ if (msgSets != null) {
+ final GridMessageListener lsnrs0 = lsnrs;
+
+ boolean success = true;
+
+ try {
+ for (final GridCommunicationMessageSet msgSet : msgSets) {
+ success = false;
+
+ workersCnt.increment();
+
+ pool(msgSet.policy()).execute(new
GridWorker(ctx.gridName(), "msg-worker", log) {
+ @Override protected void body() {
+ try {
- unwindMessageSet(msgSet, lsnrs0, false);
++ unwindMessageSet(msgSet, lsnrs0);
+ }
+ finally {
+ workersCnt.decrement();
+ }
+ }
+ });
+
+ success = true;
+ }
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process delayed message due to
execution rejection. Increase the upper bound " +
+ "on executor service provided in
'GridConfiguration.getExecutorService()'). Will attempt to " +
+ "process message in the listener thread instead.", e);
+
+ for (GridCommunicationMessageSet msgSet : msgSets)
- unwindMessageSet(msgSet, lsnr, false);
++ unwindMessageSet(msgSet, lsnr);
+ }
+ finally {
+ // Decrement for last runnable submission of which failed.
+ if (!success)
+ workersCnt.decrement();
+ }
+ }
+ }
+
+ /**
+ * @param topic Message topic.
+ * @return Whether or not listener was indeed removed.
+ */
+ public boolean removeMessageListener(GridTopic topic) {
+ return removeMessageListener((Object)topic);
+ }
+
+ /**
+ * @param topic Message topic.
+ * @return Whether or not listener was indeed removed.
+ */
+ public boolean removeMessageListener(Object topic) {
+ return removeMessageListener(topic, null);
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to remove.
+ * @return Whether or not the lsnr was removed.
+ */
+ @SuppressWarnings("deprecation")
+ public boolean removeMessageListener(GridTopic topic, @Nullable
GridMessageListener lsnr) {
+ return removeMessageListener((Object)topic, lsnr);
+ }
+
+ /**
+ * @param topic Listener's topic.
+ * @param lsnr Listener to remove.
+ * @return Whether or not the lsnr was removed.
+ */
+ @SuppressWarnings({"deprecation",
"SynchronizationOnLocalVariableOrMethodParameter"})
+ public boolean removeMessageListener(Object topic, @Nullable final
GridMessageListener lsnr) {
+ assert topic != null;
+
+ boolean rmv = true;
+
+ Collection<GridCommunicationMessageSet> msgSets = null;
+
+ // If listener is null, then remove all listeners.
+ if (lsnr == null) {
+ closedTopics.add(topic);
+
+ rmv = lsnrMap.remove(topic) != null;
+
+ Map<UUID, GridCommunicationMessageSet> map =
msgSetMap.remove(topic);
+
+ if (map != null)
+ msgSets = map.values();
+ }
+ else {
+ for (;;) {
+ GridMessageListener lsnrs = lsnrMap.get(topic);
+
+ // If removing listener before subscription happened.
+ if (lsnrs == null) {
+ closedTopics.add(topic);
+
+ Map<UUID, GridCommunicationMessageSet> map =
msgSetMap.remove(topic);
+
+ if (map != null)
+ msgSets = map.values();
+
+ rmv = false;
+
+ break;
+ }
+ else {
+ boolean empty = false;
+
+ if (!(lsnrs instanceof ArrayListener)) {
+ if (lsnrs.equals(lsnr)) {
+ if (!lsnrMap.remove(topic, lsnrs))
+ continue; // Retry because it can be packed
to array listener.
+
+ empty = true;
+ }
+ else
+ rmv = false;
+ }
+ else {
+ ArrayListener arrLsnr = (ArrayListener)lsnrs;
+
+ if (arrLsnr.remove(lsnr))
+ empty = arrLsnr.isEmpty();
+ else
+ // Listener was not found.
+ rmv = false;
+
+ if (empty)
+ lsnrMap.remove(topic, lsnrs);
+ }
+
+ // If removing last subscribed listener.
+ if (empty) {
+ closedTopics.add(topic);
+
+ Map<UUID, GridCommunicationMessageSet> map =
msgSetMap.remove(topic);
+
+ if (map != null)
+ msgSets = map.values();
+ }
+
+ break;
+ }
+ }
+ }
+
+ if (msgSets != null)
+ for (GridCommunicationMessageSet msgSet : msgSets)
+ ctx.timeout().removeTimeoutObject(msgSet);
+
+ if (rmv && log.isDebugEnabled())
+ log.debug("Removed message listener [topic=" + topic + ", lsnr="
+ lsnr + ']');
+
+ return rmv;
+ }
+
+ /**
+ * Gets sent messages count.
+ *
+ * @return Sent messages count.
+ */
+ public int getSentMessagesCount() {
+ return getSpi().getSentMessagesCount();
+ }
+
+ /**
+ * Gets sent bytes count.
+ *
+ * @return Sent bytes count.
+ */
+ public long getSentBytesCount() {
+ return getSpi().getSentBytesCount();
+ }
+
+ /**
+ * Gets received messages count.
+ *
+ * @return Received messages count.
+ */
+ public int getReceivedMessagesCount() {
+ return getSpi().getReceivedMessagesCount();
+ }
+
+ /**
+ * Gets received bytes count.
+ *
+ * @return Received bytes count.
+ */
+ public long getReceivedBytesCount() {
+ return getSpi().getReceivedBytesCount();
+ }
+
+ /**
+ * Gets outbound messages queue size.
+ *
+ * @return Outbound messages queue size.
+ */
+ public int getOutboundMessagesQueueSize() {
+ return getSpi().getOutboundMessagesQueueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ X.println(">>>");
+ X.println(">>> IO manager memory stats [grid=" + ctx.gridName() +
']');
+ X.println(">>> lsnrMapSize: " + lsnrMap.size());
+ X.println(">>> msgSetMapSize: " + msgSetMap.size());
- X.println(">>> msgIdMapSize: " + msgIdMap.size());
+ X.println(">>> closedTopicsSize: " + closedTopics.sizex());
+ X.println(">>> discoWaitMapSize: " + waitMap.size());
+ }
+
+ /**
+ * Linked chain of listeners.
+ */
+ private static class ArrayListener implements GridMessageListener {
+ /** */
+ private volatile GridMessageListener[] arr;
+
+ /**
+ * @param arr Array of listeners.
+ */
+ ArrayListener(GridMessageListener... arr) {
+ this.arr = arr;
+ }
+
+ /**
+ * Passes message to the whole chain.
+ *
+ * @param nodeId Node ID.
+ * @param msg Message.
+ */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ GridMessageListener[] arr0 = arr;
+
+ if (arr0 == null)
+ return;
+
+ for (GridMessageListener l : arr0)
+ l.onMessage(nodeId, msg);
+ }
+
+ /**
+ * @return {@code true} If this instance is empty.
+ */
+ boolean isEmpty() {
+ return arr == null;
+ }
+
+ /**
+ * @param l Listener.
+ * @return {@code true} If listener was removed.
+ */
+ synchronized boolean remove(GridMessageListener l) {
+ GridMessageListener[] arr0 = arr;
+
+ if (arr0 == null)
+ return false;
+
+ if (arr0.length == 1) {
+ if (!arr0[0].equals(l))
+ return false;
+
+ arr = null;
+
+ return true;
+ }
+
+ for (int i = 0; i < arr0.length; i++) {
+ if (arr0[i].equals(l)) {
+ int newLen = arr0.length - 1;
+
+ if (i == newLen) // Remove last.
+ arr = Arrays.copyOf(arr0, newLen);
+ else {
+ GridMessageListener[] arr1 = new
GridMessageListener[newLen];
+
+ if (i != 0) // Not remove first.
+ System.arraycopy(arr0, 0, arr1, 0, i);
+
+ System.arraycopy(arr0, i + 1, arr1, i, newLen - i);
+
+ arr = arr1;
+ }
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param l Listener.
+ * @return {@code true} if listener was added. Add can fail if this
instance is empty and is about to be removed
+ * from map.
+ */
+ synchronized boolean add(GridMessageListener l) {
+ GridMessageListener[] arr0 = arr;
+
+ if (arr0 == null)
+ return false;
+
+ int oldLen = arr0.length;
+
+ arr0 = Arrays.copyOf(arr0, oldLen + 1);
+
+ arr0[oldLen] = l;
+
+ arr = arr0;
+
+ return true;
+ }
+ }
+
+ /**
+ * This class represents a message listener wrapper that knows about peer
deployment.
+ */
+ private class GridUserMessageListener implements GridMessageListener {
+ /** Predicate listeners. */
+ private final IgniteBiPredicate<UUID, Object> predLsnr;
+
+ /** User message topic. */
+ private final Object topic;
+
+ /**
+ * @param topic User topic.
+ * @param predLsnr Predicate listener.
+ * @throws IgniteCheckedException If failed to inject resources to
predicates.
+ */
+ GridUserMessageListener(@Nullable Object topic, @Nullable
IgniteBiPredicate<UUID, Object> predLsnr)
+ throws IgniteCheckedException {
+ this.topic = topic;
+ this.predLsnr = predLsnr;
+
+ if (predLsnr != null)
+ ctx.resource().injectGeneric(predLsnr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
"ConstantConditions",
+ "OverlyStrongTypeCast"})
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ if (!(msg instanceof GridIoUserMessage)) {
+ U.error(log, "Received unknown message (potentially fatal
problem): " + msg);
+
+ return;
+ }
+
+ GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ U.warn(log, "Failed to resolve sender node (did the node left
grid?): " + nodeId);
+
+ return;
+ }
+
+ Object msgBody = ioMsg.body();
+
+ assert msgBody != null || ioMsg.bodyBytes() != null;
+
+ try {
+ byte[] msgTopicBytes = ioMsg.topicBytes();
+
+ Object msgTopic = ioMsg.topic();
+
+ GridDeployment dep = ioMsg.deployment();
+
+ if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+ ioMsg.deploymentClassName() != null) {
+ dep = ctx.deploy().getGlobalDeployment(
+ ioMsg.deploymentMode(),
+ ioMsg.deploymentClassName(),
+ ioMsg.deploymentClassName(),
+ ioMsg.userVersion(),
+ nodeId,
+ ioMsg.classLoaderId(),
+ ioMsg.loaderParticipants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentException(
+ "Failed to obtain deployment information for user
message. " +
+ "If you are using custom message or topic class,
try implementing " +
+ "GridPeerDeployAware interface. [msg=" + ioMsg +
']');
+
+ ioMsg.deployment(dep); // Cache deployment.
+ }
+
+ // Unmarshall message topic if needed.
+ if (msgTopic == null && msgTopicBytes != null) {
+ msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ?
dep.classLoader() : null);
+
+ ioMsg.topic(msgTopic); // Save topic to avoid future
unmarshallings.
+ }
+
+ if (!F.eq(topic, msgTopic))
+ return;
+
+ if (msgBody == null) {
+ msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null
? dep.classLoader() : null);
+
+ ioMsg.body(msgBody); // Save body to avoid future
unmarshallings.
+ }
+
+ // Resource injection.
+ if (dep != null)
+ ctx.resource().inject(dep,
dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal user message [node=" +
nodeId + ", message=" +
+ msg + ']', e);
+ }
+
+ if (msgBody != null) {
+ if (predLsnr != null) {
+ if (!predLsnr.apply(nodeId, msgBody))
+ removeMessageListener(TOPIC_COMM_USER, this);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GridUserMessageListener l = (GridUserMessageListener)o;
+
+ return F.eq(predLsnr, l.predLsnr) && F.eq(topic, l.topic);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = predLsnr != null ? predLsnr.hashCode() : 0;
+
+ res = 31 * res + (topic != null ? topic.hashCode() : 0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridUserMessageListener.class, this);
+ }
+ }
+
+ /**
+ * Ordered communication message set.
+ */
+ private class GridCommunicationMessageSet implements GridTimeoutObject {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private long endTime;
+
+ /** */
+ private final IgniteUuid timeoutId;
+
+ /** */
++ @GridToStringInclude
+ private final Object topic;
+
+ /** */
+ private final GridIoPolicy plc;
+
+ /** */
+ @GridToStringInclude
- private final List<IgniteBiTuple<GridIoMessage, Long>> msgs = new
ArrayList<>();
-
- /** */
- private long nextMsgId = 1;
++ private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new
ConcurrentLinkedDeque<>();
+
+ /** */
+ private final AtomicBoolean reserved = new AtomicBoolean();
+
+ /** */
+ private final long timeout;
+
+ /** */
+ private final boolean skipOnTimeout;
+
+ /** */
+ private long lastTs;
+
- /** */
- private volatile boolean changed;
-
+ /**
+ * @param plc Communication policy.
+ * @param topic Communication topic.
+ * @param nodeId Node ID.
+ * @param timeout Timeout.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param msg Message to add immediately.
+ */
- GridCommunicationMessageSet(GridIoPolicy plc, Object topic, UUID
nodeId, long timeout, boolean skipOnTimeout,
- GridIoMessage msg) {
++ GridCommunicationMessageSet(
++ GridIoPolicy plc,
++ Object topic,
++ UUID nodeId,
++ long timeout,
++ boolean skipOnTimeout,
++ GridIoMessage msg
++ ) {
+ assert nodeId != null;
+ assert topic != null;
+ assert plc != null;
+ assert msg != null;
+
+ this.plc = plc;
+ this.nodeId = nodeId;
+ this.topic = topic;
+ this.timeout = timeout == 0 ? ctx.config().getNetworkTimeout() :
timeout;
+ this.skipOnTimeout = skipOnTimeout;
+
+ endTime = endTime(timeout);
+
+ timeoutId = IgniteUuid.randomUuid();
+
+ lastTs = U.currentTimeMillis();
+
+ msgs.add(F.t(msg, lastTs));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return timeoutId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ @Override public void onTimeout() {
+ GridMessageListener lsnr = lsnrMap.get(topic);
+
+ if (lsnr != null) {
+ long delta = 0;
+
+ if (skipOnTimeout) {
+ while (true) {
+ delta = 0;
+
+ boolean unwind = false;
+
+ synchronized (this) {
+ if (!msgs.isEmpty()) {
+ delta = U.currentTimeMillis() - lastTs;
+
+ if (delta >= timeout)
+ unwind = true;
+ }
+ }
+
+ if (unwind)
- unwindMessageSet(this, lsnr, true);
++ unwindMessageSet(this, lsnr);
+ else
+ break;
+ }
+ }
+
+ // Someone is still listening to messages, so delay set
removal.
+ endTime = endTime(timeout - delta);
+
+ ctx.timeout().addTimeoutObject(this);
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Removing message set due to timeout: " + this);
+
+ ConcurrentMap<UUID, GridCommunicationMessageSet> map =
msgSetMap.get(topic);
+
+ if (map != null) {
+ boolean rmv;
+
+ synchronized (map) {
+ rmv = map.remove(nodeId, this) && map.isEmpty();
+ }
+
+ if (rmv)
+ msgSetMap.remove(topic, map);
+ }
+ }
+
+ /**
+ * @return ID of node that sent the messages in the set.
+ */
+ UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Communication policy.
+ */
+ GridIoPolicy policy() {
+ return plc;
+ }
+
+ /**
+ * @return Message topic.
+ */
+ Object topic() {
+ return topic;
+ }
+
+ /**
+ * @return {@code True} if successful.
+ */
+ boolean reserve() {
+ return reserved.compareAndSet(false, true);
+ }
+
+ /**
++ * @return {@code True} if set is reserved.
++ */
++ boolean reserved() {
++ return reserved.get();
++ }
++
++ /**
+ * Releases reservation.
+ */
+ void release() {
+ assert reserved.get() : "Message set was not reserved: " + this;
+
+ reserved.set(false);
+ }
+
+ /**
- * @param force Whether to force unwind and drop missing
- * ordered messages that are not received yet.
- * @return Session request.
++ * @param lsnr Listener to notify.
+ */
- synchronized Collection<GridIoMessage> unwind(boolean force) {
++ void unwind(GridMessageListener lsnr) {
+ assert reserved.get();
+
- changed = false;
-
- if (msgs.isEmpty())
- return Collections.emptyList();
-
- if (msgs.size() == 1) {
- IgniteBiTuple<GridIoMessage, Long> t = msgs.get(0);
-
- GridIoMessage msg = t.get1();
-
- if (force || msg.messageId() == nextMsgId) {
- if (msg.messageId() != nextMsgId) {
- for (long skipped = nextMsgId; skipped <
msg.messageId(); skipped++) {
- U.warn(log, "Skipped ordered message due to
timeout, consider increasing " +
- "networkTimeout configuration property
[topic=" + topic + ", msgId=" +
- skipped + ", timeout=" + timeout + ']');
- }
- }
-
- nextMsgId = msg.messageId() + 1;
-
- lastTs = t.get2();
-
- msgs.clear();
-
- return Collections.singleton(msg);
- }
-
- return Collections.emptyList();
- }
-
- // Sort before unwinding.
- Collections.sort(msgs, MSG_CMP);
-
- Collection<GridIoMessage> orderedMsgs = new LinkedList<>();
-
- for (Iterator<IgniteBiTuple<GridIoMessage, Long>> iter =
msgs.iterator(); iter.hasNext();) {
- IgniteBiTuple<GridIoMessage, Long> t = iter.next();
-
- GridIoMessage msg = t.get1();
-
- if (force || msg.messageId() == nextMsgId) {
- if (msg.messageId() != nextMsgId) {
- for (long skipped = nextMsgId; skipped <
msg.messageId(); skipped++) {
- U.warn(log, "Skipped ordered message due to
timeout, consider increasing " +
- "networkTimeout configuration property
[topic=" + topic + ", msgId=" +
- skipped + ", timeout=" + timeout + ']');
- }
- }
-
- force = false;
-
- orderedMsgs.add(msg);
-
- nextMsgId = msg.messageId() + 1;
-
- lastTs = t.get2();
-
- iter.remove();
- }
- else
- break;
- }
-
- return orderedMsgs;
++ for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t !=
null; t = msgs.poll())
++ lsnr.onMessage(nodeId, t.get1().message());
+ }
+
+ /**
+ * @param msg Message to add.
+ */
- synchronized void add(GridIoMessage msg) {
- if (msg.messageId() >= nextMsgId) {
- msgs.add(F.t(msg, U.currentTimeMillis()));
-
- changed = true;
- }
- else {
- U.warn(log, "Received previously skipped ordered message
(will be dropped) [topic=" + topic +
- ", msgId=" + msg.messageId() + ", timeout=" + timeout +
']');
- }
++ void add(GridIoMessage msg) {
++ msgs.add(F.t(msg, U.currentTimeMillis()));
+ }
+
+ /**
+ * @return {@code True} if set has messages to unwind.
+ */
+ boolean changed() {
- return changed;
++ return !msgs.isEmpty();
+ }
+
+ /**
+ * Calculates end time with overflow check.
+ *
+ * @param timeout Timeout in milliseconds.
+ * @return End time in milliseconds.
+ */
+ private long endTime(long timeout) {
+ long endTime = U.currentTimeMillis() + timeout;
+
+ // Account for overflow.
+ if (endTime < 0)
+ endTime = Long.MAX_VALUE;
+
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
- @Override public synchronized String toString() {
++ @Override public String toString() {
+ return S.toString(GridCommunicationMessageSet.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ConcurrentHashMap0<K, V> extends
ConcurrentHashMap8<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int hash;
+
+ /**
+ * @param o Object to be compared for equality with this map.
+ * @return {@code True} only for {@code this}.
+ */
+ @Override public boolean equals(Object o) {
+ return o == this;
+ }
+
+ /**
+ * @return Identity hash code.
+ */
+ @Override public int hashCode() {
+ if (hash == 0) {
+ int hash0 = System.identityHashCode(this);
+
+ hash = hash0 != 0 ? hash0 : -1;
+ }
+
+ return hash;
+ }
+ }
+
+ /**
+ *
+ */
- private static class ConcurrentHashSet0<E> extends
GridConcurrentHashSet<E> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int hash;
-
- /**
- *
- */
- private ConcurrentHashSet0() {
- super(1, 1, 1);
- }
-
- /**
- * @param o Object to be compared for equality with this map.
- * @return {@code True} only for {@code this}.
- */
- @Override public boolean equals(Object o) {
- return o == this;
- }
-
- /**
- * @return Identity hash code.
- */
- @Override public int hashCode() {
- if (hash == 0) {
- int hash0 = System.identityHashCode(this);
-
- hash = hash0 != 0 ? hash0 : -1;
- }
-
- return hash;
- }
- }
-
- /**
- *
- */
+ private static class DelayedMessage {
+ /** */
+ private final UUID nodeId;
+
+ /** */
+ private final GridIoMessage msg;
+
+ /** */
+ private final IgniteRunnable msgC;
+
- /** */
- private final long rcvTime = U.currentTimeMillis();
-
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param msgC Callback.
+ */
+ private DelayedMessage(UUID nodeId, GridIoMessage msg, IgniteRunnable
msgC) {
+ this.nodeId = nodeId;
+ this.msg = msg;
+ this.msgC = msgC;
+ }
+
+ /**
- * @return Receive time.
- */
- public long receiveTime() {
- return rcvTime;
- }
-
- /**
+ * @return Message char.
+ */
+ public IgniteRunnable callback() {
+ return msgC;
+ }
+
+ /**
+ * @return Message.
+ */
+ public GridIoMessage message() {
+ return msg;
+ }
+
+ /**
+ * @return Node id.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DelayedMessage.class, this, super.toString());
+ }
+ }
+ }