IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2d16d99f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2d16d99f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2d16d99f Branch: refs/heads/ignite-426 Commit: 2d16d99f64fdfbff591124abcb4c5d42ac29d8bf Parents: dad4691 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Jul 30 16:48:30 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Jul 30 16:48:30 2015 -0700 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 2 + .../org/apache/ignite/internal/GridTopic.java | 83 -------- .../managers/communication/GridIoManager.java | 199 ++++++++++++++++++- .../managers/communication/GridIoMessage.java | 48 +++-- .../processors/cache/GridCacheIoManager.java | 99 +++++++-- .../processors/cache/GridCacheUtils.java | 11 - .../dht/atomic/GridAtomicMappingKey.java | 86 ++++++++ .../dht/atomic/GridAtomicRequestTopic.java | 96 +++++++++ .../dht/atomic/GridDhtAtomicCache.java | 33 ++- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 89 ++------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 172 +++++++++++----- .../dht/atomic/GridNearAtomicUpdateRequest.java | 63 ++++-- .../atomic/GridNearAtomicUpdateResponse.java | 36 +++- .../preloader/GridDhtPartitionDemandPool.java | 15 +- .../query/GridCacheDistributedQueryManager.java | 8 +- .../resources/META-INF/classnames.properties | 2 +- 16 files changed, 730 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index c560118..2510d65 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -142,6 +142,8 @@ public class MessageCodeGenerator { MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); +// gen.generateAndWrite(GridIoMessage.class); + // gen.generateAndWrite(GridNearAtomicUpdateRequest.class); // gen.generateAndWrite(GridNearAtomicUpdateResponse.class); // gen.generateAndWrite(GridDhtAtomicUpdateRequest.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index e9da40c..56aea1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -183,15 +183,6 @@ public enum GridTopic { } /** - * @param id1 ID1. - * @param id2 ID2. - * @return Grid message topic with specified IDs. - */ - public Object topic(int id1, int id2) { - return new T9(this, id1, id2); - } - - /** * */ private static class T1 implements Externalizable { @@ -765,78 +756,4 @@ public enum GridTopic { return S.toString(T8.class, this); } } - - /** - */ - private static class T9 implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private GridTopic topic; - - /** */ - private int id1; - - /** */ - private int id2; - - /** - * No-arg constructor needed for {@link Serializable}. - */ - public T9() { - // No-op. - } - - /** - * @param topic Topic. - * @param id1 ID1. - * @param id2 ID2. - */ - private T9(GridTopic topic, int id1, int id2) { - this.topic = topic; - this.id1 = id1; - this.id2 = id2; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = topic.ordinal(); - - res += 31 * res + id1; - res += 31 * res + id2; - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (obj.getClass() == T9.class) { - T9 that = (T9)obj; - - return topic == that.topic && id1 == that.id1 && id2 == that.id2; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(topic.ordinal()); - out.writeInt(id1); - out.writeInt(id2); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - topic = fromOrdinal(in.readByte()); - id1 = in.readInt(); - id2 = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(T9.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index c1fb79a..765ba65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -100,6 +100,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap = new ConcurrentHashMap8<>(); + /** */ + private final ConcurrentMap<Object, SequentialMessageSet> seqMsgs = new ConcurrentHashMap8<>(); + /** Local node ID. */ private final UUID locNodeId; @@ -576,6 +579,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); + else if (msg.isSequential()) + processSequentialMessage(nodeId, msg, plc, msgC); else processRegularMessage(nodeId, msg, plc, msgC); @@ -591,6 +596,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); + else if (msg.isSequential()) + processSequentialMessage(nodeId, msg, plc, msgC); else processRegularMessage(nodeId, msg, plc, msgC); } @@ -963,6 +970,78 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * @param nodeId Node ID. + * @param msg Message. + * @param plc Execution policy. + * @param msgC Closure to call when message processing finished. + */ + private void processSequentialMessage( + final UUID nodeId, + final GridIoMessage msg, + byte plc, + final IgniteRunnable msgC + ) throws IgniteCheckedException { + final GridMessageListener lsnr = lsnrMap.get(msg.topic()); + + if (lsnr == null) { + if (log.isDebugEnabled()) + log.debug("Ignoring message because listener is not found: " + msg); + + if (msgC != null) + msgC.run(); + + return; + } + + SequentialMessageSet msgSet = seqMsgs.get(msg.topic()); + + if (msgSet == null) { + SequentialMessageSet old = seqMsgs.putIfAbsent(msg.topic(), msgSet = new SequentialMessageSet()); + + if (old != null) + msgSet = old; + } + + msgSet.add(nodeId, msg, msgC); + + if (msgC == null) { + assert locNodeId.equals(nodeId); + + msgSet.unwind(lsnr); + } + else { + assert !locNodeId.equals(nodeId); + + final SequentialMessageSet msgSet0 = msgSet; + + Runnable c = new Runnable() { + @Override public void run() { + try { + threadProcessingMessage(true); + + msgSet0.unwind(lsnr); + } + finally { + threadProcessingMessage(false); + } + } + }; + + try { + pool(plc).execute(c); + } + catch (RejectedExecutionException e) { + U.error(log, "Failed to process sequential message due to execution rejection. " + + "Increase the upper bound on executor service provided by corresponding " + + "configuration property. Will attempt to process message in the listener " + + "thread instead [msgPlc=" + plc + ']', e); + + c.run(); + } + } + } + + /** * @param node Destination node. * @param topic Topic to send the message to. * @param topicOrd GridTopic enumeration ordinal. @@ -980,6 +1059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc, boolean ordered, + boolean seq, long timeout, boolean skipOnTimeout ) throws IgniteCheckedException { @@ -987,7 +1067,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert topic != null; assert msg != null; - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); + GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, seq, timeout, skipOnTimeout); if (locNodeId.equals(node.id())) { assert plc != P2P_POOL; @@ -999,6 +1079,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); + else if (seq) + processSequentialMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); } @@ -1050,7 +1132,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false); } /** @@ -1062,7 +1144,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false); + send(node, topic, -1, msg, plc, false, false, 0, false); } /** @@ -1074,7 +1156,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false); } /** @@ -1096,7 +1178,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout); } /** @@ -1123,7 +1205,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout); } /** @@ -1146,7 +1228,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout); + send(nodes, topic, -1, msg, plc, true, false, timeout, skipOnTimeout); } /** @@ -1162,7 +1244,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc ) throws IgniteCheckedException { - send(nodes, topic, -1, msg, plc, false, 0, false); + send(nodes, topic, -1, msg, plc, false, false, 0, false); } /** @@ -1178,7 +1260,48 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc ) throws IgniteCheckedException { - send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false); + send(nodes, topic, topic.ordinal(), msg, plc, false, false, 0, false); + } + + /** + * Sends sequential message. + * + * @param nodeId Destination node ID. + * @param topic Topic. + * @param msg Message. + * @param plc Policy. + * @throws IgniteCheckedException In case of error. + */ + public void sendSequentialMessage( + UUID nodeId, + Object topic, + Message msg, + byte plc + ) throws IgniteCheckedException { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + + sendSequentialMessage(node, topic, msg, plc); + } + + /** + * Sends sequential message. + * + * @param node Destination node. + * @param topic Topic. + * @param msg Message. + * @param plc Policy. + * @throws IgniteCheckedException In case of error. + */ + public void sendSequentialMessage( + ClusterNode node, + Object topic, + Message msg, + byte plc + ) throws IgniteCheckedException { + send(node, topic, -1, msg, plc, false, true, 0, false); } /** @@ -1307,6 +1430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param msg Message to send. * @param plc Type of processing. * @param ordered Ordered flag. + * @param seq Sequential message flag. * @param timeout Message timeout. * @param skipOnTimeout Whether message can be skipped in timeout. * @throws IgniteCheckedException Thrown in case of any errors. @@ -1318,6 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc, boolean ordered, + boolean seq, long timeout, boolean skipOnTimeout ) throws IgniteCheckedException { @@ -1334,7 +1459,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // messages to one node vs. many. if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); + send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + @@ -2216,4 +2341,58 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return S.toString(DelayedMessage.class, this, super.toString()); } } + + /** + */ + private static class SequentialMessageSet { + /** */ + private final Queue<GridTuple3<UUID, GridIoMessage, IgniteRunnable>> queue = new ConcurrentLinkedDeque8<>(); + + /** */ + private final AtomicBoolean reserve = new AtomicBoolean(); + + /** + * @param nodeId Node ID. + * @param msg Message. + * @param msgC Closure to call when message processing finished. + */ + void add(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) { + queue.add(F.t(nodeId, msg, msgC)); + } + + /** + * @param lsnr Message listener. + */ + void unwind(GridMessageListener lsnr) { + assert lsnr != null; + + while (true) { + if (reserve.compareAndSet(false, true)) { + try { + GridTuple3<UUID, GridIoMessage, IgniteRunnable> t; + + while ((t = queue.poll()) != null) { + try { + lsnr.onMessage(t.get1(), t.get2().message()); + } + finally { + IgniteRunnable msgC = t.get3(); + + if (msgC != null) + msgC.run(); + } + } + } + finally { + reserve.set(false); + } + + if (queue.isEmpty()) + return; + } + else + return; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 6cf1ae5..d729f75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -49,6 +49,9 @@ public class GridIoMessage implements Message { /** Message ordered flag. */ private boolean ordered; + /** Sequential message flag. */ + private boolean seq; + /** Message timeout. */ private long timeout; @@ -72,6 +75,7 @@ public class GridIoMessage implements Message { * @param topicOrd Topic ordinal value. * @param msg Message. * @param ordered Message ordered flag. + * @param seq Sequential message flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. */ @@ -81,18 +85,21 @@ public class GridIoMessage implements Message { int topicOrd, Message msg, boolean ordered, + boolean seq, long timeout, boolean skipOnTimeout ) { assert topic != null; assert topicOrd <= Byte.MAX_VALUE; assert msg != null; + assert !ordered || !seq; // Message can't be ordered and sequential at the same time. this.plc = plc; this.msg = msg; this.topic = topic; this.topicOrd = topicOrd; this.ordered = ordered; + this.seq = seq; this.timeout = timeout; this.skipOnTimeout = skipOnTimeout; } @@ -167,6 +174,13 @@ public class GridIoMessage implements Message { return ordered; } + /** + * @return Sequential message flag. + */ + boolean isSequential() { + return seq; + } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { throw new AssertionError(); @@ -208,24 +222,30 @@ public class GridIoMessage implements Message { writer.incrementState(); case 3: - if (!writer.writeBoolean("skipOnTimeout", skipOnTimeout)) + if (!writer.writeBoolean("seq", seq)) return false; writer.incrementState(); case 4: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeBoolean("skipOnTimeout", skipOnTimeout)) return false; writer.incrementState(); case 5: - if (!writer.writeByteArray("topicBytes", topicBytes)) + if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); case 6: + if (!writer.writeByteArray("topicBytes", topicBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeInt("topicOrd", topicOrd)) return false; @@ -261,19 +281,15 @@ public class GridIoMessage implements Message { reader.incrementState(); case 2: - byte plc0; - - plc0 = reader.readByte("plc"); + plc = reader.readByte("plc"); if (!reader.isLastRead()) return false; - plc = plc0; - reader.incrementState(); case 3: - skipOnTimeout = reader.readBoolean("skipOnTimeout"); + seq = reader.readBoolean("seq"); if (!reader.isLastRead()) return false; @@ -281,7 +297,7 @@ public class GridIoMessage implements Message { reader.incrementState(); case 4: - timeout = reader.readLong("timeout"); + skipOnTimeout = reader.readBoolean("skipOnTimeout"); if (!reader.isLastRead()) return false; @@ -289,7 +305,7 @@ public class GridIoMessage implements Message { reader.incrementState(); case 5: - topicBytes = reader.readByteArray("topicBytes"); + timeout = reader.readLong("timeout"); if (!reader.isLastRead()) return false; @@ -297,6 +313,14 @@ public class GridIoMessage implements Message { reader.incrementState(); case 6: + topicBytes = reader.readByteArray("topicBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: topicOrd = reader.readInt("topicOrd"); if (!reader.isLastRead()) @@ -316,7 +340,7 @@ public class GridIoMessage implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 5858424..490a5d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -62,8 +62,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> clsHandlers = new ConcurrentHashMap8<>(); - /** Ordered handler registry. */ - private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = + /** Per topic handler registry. */ + private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> perTopicHandlers = new ConcurrentHashMap8<>(); /** Stopping flag. */ @@ -173,7 +173,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_CACHE); - for (Object ordTopic : orderedHandlers.keySet()) + for (Object ordTopic : perTopicHandlers.keySet()) cctx.gridIO().removeMessageListener(ordTopic); boolean interrupted = false; @@ -394,7 +394,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, - req.futureVersion()); + req.futureVersion(), + req.partition()); res.error(req.classError()); @@ -813,6 +814,64 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param nodeId Destination node ID. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc IO policy. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendSequentialMessage(UUID nodeId, Object topic, GridCacheMessage msg, byte plc) + throws IgniteCheckedException { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n == null) + throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + + nodeId + ", msg=" + msg + ']'); + + sendSequentialMessage(n, topic, msg, plc); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc IO policy. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendSequentialMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc) + throws IgniteCheckedException { + onSend(msg, node.id()); + + int cnt = 0; + + while (cnt <= retryCnt) { + try { + cnt++; + + cctx.gridIO().sendSequentialMessage(node, topic, msg, plc); + + if (log.isDebugEnabled()) + log.debug("Sent sequential cache message [topic=" + topic + ", msg=" + msg + + ", nodeId=" + node.id() + ']'); + + return; + } + catch (IgniteCheckedException e) { + if (cctx.discovery().node(node.id()) == null) + throw new ClusterTopologyCheckedException("Node left grid while sending sequential message [" + + "nodeId=" + node.id() + ", msg=" + msg + ']', e); + + if (cnt == retryCnt) + throw e; + else if (log.isDebugEnabled()) + log.debug("Failed to send message to node (will retry): " + node.id()); + } + + U.sleep(retryDelay); + } + } + + /** * @return ID that auto-grows based on local counter and counters received * from other nodes. */ @@ -940,39 +999,39 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * Adds ordered message handler. + * Adds per topic message handler. * * @param topic Topic. * @param c Handler. */ @SuppressWarnings({"unchecked"}) - public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { - if (orderedHandlers.putIfAbsent(topic, c) == null) { - cctx.gridIO().addMessageListener(topic, new OrderedMessageListener( + public void addPerTopicHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + if (perTopicHandlers.putIfAbsent(topic, c) == null) { + cctx.gridIO().addMessageListener(topic, new PerTopicMessageListener( (IgniteBiInClosure<UUID, GridCacheMessage>)c)); if (log != null && log.isDebugEnabled()) - log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']'); + log.debug("Registered per topic cache communication handler [topic=" + topic + ", handler=" + c + ']'); } else if (log != null) - U.warn(log, "Failed to register ordered cache communication handler because it is already " + + U.warn(log, "Failed to register per topic cache communication handler because it is already " + "registered for this topic [topic=" + topic + ", handler=" + c + ']'); } /** - * Removed ordered message handler. + * Removed per topic message handler. * * @param topic Topic. */ - public void removeOrderedHandler(Object topic) { - if (orderedHandlers.remove(topic) != null) { + public void removePerTopicHandler(Object topic) { + if (perTopicHandlers.remove(topic) != null) { cctx.gridIO().removeMessageListener(topic); if (log != null && log.isDebugEnabled()) - log.debug("Unregistered ordered cache communication handler for topic:" + topic); + log.debug("Unregistered per topic cache communication handler for topic:" + topic); } else if (log != null) - U.warn(log, "Failed to unregister ordered cache communication handler because it was not found " + + U.warn(log, "Failed to unregister per topic cache communication handler because it was not found " + "for topic: " + topic); } @@ -1019,20 +1078,20 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { X.println(">>> "); X.println(">>> Cache IO manager memory stats [grid=" + cctx.gridName() + ']'); X.println(">>> clsHandlersSize: " + clsHandlers.size()); - X.println(">>> orderedHandlersSize: " + orderedHandlers.size()); + X.println(">>> perTopicHandlersSize: " + perTopicHandlers.size()); } /** - * Ordered message listener. + * Per topic message listener. */ - private class OrderedMessageListener implements GridMessageListener { + private class PerTopicMessageListener implements GridMessageListener { /** */ private final IgniteBiInClosure<UUID, GridCacheMessage> c; /** * @param c Handler closure. */ - OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) { + PerTopicMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) { this.c = c; } @@ -1040,7 +1099,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"CatchGenericClass", "unchecked"}) @Override public void onMessage(final UUID nodeId, Object msg) { if (log.isDebugEnabled()) - log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); + log.debug("Received per topic cache message [nodeId=" + nodeId + ", msg=" + msg + ']'); final GridCacheMessage cacheMsg = (GridCacheMessage)msg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index d82acca..a313e3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1733,15 +1733,4 @@ public class GridCacheUtils { } }; } - - /** - * @param ctx Cache context. - * @param part Partition. - * @return Per-partition message topic. - */ - public static Object partitionMessageTopic(GridCacheContext ctx, int part) { - assert part >= 0; - - return TOPIC_CACHE.topic(ctx.cacheId(), part); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java new file mode 100644 index 0000000..52e3c7f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Mapping Key. + */ +class GridAtomicMappingKey { + /** Node ID. */ + private final UUID nodeId; + + /** Partition. */ + private final int part; + + /** + * @param nodeId Node ID. + * @param part Partition. + */ + GridAtomicMappingKey(UUID nodeId, int part) { + assert nodeId != null; + assert part >= -1 : part; + + this.nodeId = nodeId; + this.part = part; + } + + /** + * @return Node ID. + */ + UUID nodeId() { + return nodeId; + } + + /** + * @return Partition. + */ + int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridAtomicMappingKey key = (GridAtomicMappingKey)o; + + return nodeId.equals(key.nodeId) && part == key.part; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + part; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridAtomicMappingKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java new file mode 100644 index 0000000..9feb409 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + */ +class GridAtomicRequestTopic implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int cacheId; + + /** */ + private int part; + + /** */ + private boolean near; + + /** + * For {@link Externalizable}. + */ + public GridAtomicRequestTopic() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param part Partition. + * @param near Near flag. + */ + GridAtomicRequestTopic(int cacheId, int part, boolean near) { + this.cacheId = cacheId; + this.part = part; + this.near = near; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) + return false; + + GridAtomicRequestTopic topic = (GridAtomicRequestTopic)o; + + return cacheId == topic.cacheId && part == topic.part && near == topic.near; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = cacheId; + + res = 31 * res + part; + res = 31 * res + (near ? 1 : 0); + + return res; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(cacheId); + out.writeInt(part); + out.writeBoolean(near); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheId = in.readInt(); + part = in.readInt(); + near = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridAtomicRequestTopic.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index be35d00..a010baa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -181,15 +181,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { - @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { - processNearAtomicUpdateRequest(nodeId, req); - } - }); - if (ctx.config().isAtomicOrderedUpdates()) { for (int part = 0; part < ctx.affinity().partitions(); part++) { - ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() { + Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true); + + ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); + + Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false); + + ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); } @@ -197,6 +201,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } else { + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { + @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { + processNearAtomicUpdateRequest(nodeId, req); + } + }); + ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) { processDhtAtomicUpdateRequest(nodeId, req); @@ -238,8 +248,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { buf.finish(); if (ctx.config().isAtomicOrderedUpdates()) { - for (int part = 0; part < ctx.affinity().partitions(); part++) - ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part)); + for (int part = 0; part < ctx.affinity().partitions(); part++) { + ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true)); + ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false)); + } } } @@ -1033,7 +1045,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, - req.futureVersion()); + req.futureVersion(), + req.partition()); List<KeyCacheObject> keys = req.keys(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 93c20da..c05f4c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); + private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; @@ -142,8 +142,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public Collection<? extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() { - @Override public ClusterNode apply(MappingKey mappingKey) { + return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() { + @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) { return cctx.kernalContext().discovery().node(mappingKey.nodeId()); } }), F.notNull()); @@ -154,15 +154,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (log.isDebugEnabled()) log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); - Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size()); + Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size()); - for (MappingKey mappingKey : mappings.keySet()) { + for (GridAtomicMappingKey mappingKey : mappings.keySet()) { if (mappingKey.nodeId().equals(nodeId)) mappingKeys.add(mappingKey); } if (!mappingKeys.isEmpty()) { - for (MappingKey mappingKey : mappingKeys) + for (GridAtomicMappingKey mappingKey : mappingKeys) mappings.remove(mappingKey); checkComplete(); @@ -234,7 +234,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); - MappingKey mappingKey = new MappingKey(nodeId, part); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); if (!nodeId.equals(cctx.localNodeId())) { GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); @@ -287,7 +287,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1; for (UUID nodeId : readers) { - MappingKey mappingKey = new MappingKey(nodeId, part); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); @@ -345,8 +345,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public void map() { if (!mappings.isEmpty()) { - for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { - MappingKey mappingKey = e.getKey(); + for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { + GridAtomicMappingKey mappingKey = e.getKey(); GridDhtAtomicUpdateRequest req = e.getValue(); UUID nodeId = mappingKey.nodeId(); @@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); if (part >= 0) { - Object topic = CU.partitionMessageTopic(cctx, part); + Object topic = new GridAtomicRequestTopic(cctx.cacheId(), part, false); cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(), 2 * cctx.gridConfig().getNetworkTimeout()); @@ -429,7 +429,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } - mappings.remove(new MappingKey(nodeId, updateRes.partition())); + mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition())); checkComplete(); } @@ -445,7 +445,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); for (Integer part : res.partitions()) - mappings.remove(new MappingKey(nodeId, part)); + mappings.remove(new GridAtomicMappingKey(nodeId, part)); checkComplete(); } @@ -468,67 +468,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return S.toString(GridDhtAtomicUpdateFuture.class, this); } - /** - * Mapping Key. - */ - private static class MappingKey { - /** Node ID. */ - private final UUID nodeId; - - /** Partition. */ - private final int part; - - /** - * @param nodeId Node ID. - * @param part Partition. - */ - MappingKey(UUID nodeId, int part) { - assert nodeId != null; - assert part >= -1 : part; - - this.nodeId = nodeId; - this.part = part; - } - - /** - * @return Node ID. - */ - UUID nodeId() { - return nodeId; - } - - /** - * @return Partition. - */ - int partition() { - return part; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - MappingKey key = (MappingKey)o; - - return nodeId.equals(key.nodeId) && part == key.part; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + part; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MappingKey.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 4c8a161..4642b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings; + private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -246,7 +246,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public Collection<? extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); + return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() { + @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) { + return cctx.kernalContext().discovery().node(mappingKey.nodeId()); + } + }), F.notNull()); } /** @@ -283,13 +287,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return false; } - GridNearAtomicUpdateRequest req = mappings.get(nodeId); + Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size()); + Collection<KeyCacheObject> failedKeys = new ArrayList<>(); + + for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + if (e.getKey().nodeId().equals(nodeId)) { + mappingKeys.add(e.getKey()); + + failedKeys.addAll(e.getValue().keys()); + } + } - if (req != null) { - addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " + - "received: " + nodeId)); + if (!mappingKeys.isEmpty()) { + if (!failedKeys.isEmpty()) + addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " + + "response is received: " + nodeId)); - mappings.remove(nodeId); + for (GridAtomicMappingKey key : mappingKeys) + mappings.remove(key); checkComplete(); @@ -529,7 +544,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } else { - GridNearAtomicUpdateRequest req = mappings.get(nodeId); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition()); + + GridNearAtomicUpdateRequest req = mappings.get(mappingKey); if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. updateNear(req, res); @@ -547,7 +564,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> opRes = ret; } - mappings.remove(nodeId); + mappings.remove(mappingKey); } checkComplete(); @@ -763,7 +780,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + int part = cctx.affinity().partition(cacheKey); + ClusterNode primary = cctx.affinity().primary(part, topVer); + + if (!ccfg.isAtomicOrderedUpdates()) + part = -1; if (primary == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + @@ -789,7 +810,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, - cctx.kernalContext().clientNode()); + cctx.kernalContext().clientNode(), + part); req.addUpdateEntry(cacheKey, val, @@ -805,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } // Optimize mapping for single key. - mapSingle(primary.id(), req); + mapSingle(new GridAtomicMappingKey(primary.id(), part), req); return; } @@ -825,13 +847,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (conflictRmvVals != null) conflictRmvValsIt = conflictRmvVals.iterator(); - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); + Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (oldNodeId != null) - removeMapping(oldNodeId); + if (oldNodeId != null) { + // TODO: IGNITE-104 - Try to avoid iteration. + for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + if (e.getKey().nodeId().equals(oldNodeId)) + mappings.remove(e.getKey()); + } + } // For fastMap mode wait for all responses before remapping. if (remap && fastMap && !mappings.isEmpty()) { @@ -901,7 +928,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap); + + int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1; + Collection<ClusterNode> affNodes = t.get2(); if (affNodes.isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -922,7 +952,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> UUID nodeId = affNode.id(); - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); + + GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey); if (mapped == null) { mapped = new GridNearAtomicUpdateRequest( @@ -942,11 +974,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> subjId, taskNameHash, skipStore, - cctx.kernalContext().clientNode()); + cctx.kernalContext().clientNode(), + part); - pendingMappings.put(nodeId, mapped); + pendingMappings.put(mappingKey, mapped); - GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped); + GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped); assert old == null || (old != null && remap) : "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; @@ -964,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } if ((single == null || single) && pendingMappings.size() == 1) { - Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); + Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); single = true; @@ -987,31 +1020,35 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. * @return Collection of nodes to which key is mapped. */ - private Collection<ClusterNode> mapKey( + private T2<Integer, Collection<ClusterNode>> mapKey( KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap ) { GridCacheAffinityManager affMgr = cctx.affinity(); + int part = affMgr.partition(key); + // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); + Collection<ClusterNode> nodes = fastMap ? + cctx.topology().nodes(part, topVer) : + Collections.singletonList(affMgr.primary(part, topVer)); + + return new T2<>(part, nodes); } /** * Maps future to single node. * - * @param nodeId Node ID. + * @param mappingKey Mapping key. * @param req Request. */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - singleNodeId = nodeId; + private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) { + singleNodeId = mappingKey.nodeId(); singleReq = req; - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, + if (cctx.localNodeId().equals(mappingKey.nodeId())) { + cache.updateAllAsyncInternal(mappingKey.nodeId(), req, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { @@ -1026,7 +1063,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + sendRequest(mappingKey, req); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) onDone(new GridCacheReturn(cctx, true, null, true)); @@ -1042,34 +1079,37 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param mappings Mappings to send. */ - private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { + private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) { UUID locNodeId = cctx.localNodeId(); - GridNearAtomicUpdateRequest locUpdate = null; + Collection<GridNearAtomicUpdateRequest> locUpdates = null; // Send messages to remote nodes first, then run local update. - for (GridNearAtomicUpdateRequest req : mappings.values()) { + for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + GridAtomicMappingKey mappingKey = e.getKey(); + GridNearAtomicUpdateRequest req = e.getValue(); + if (locNodeId.equals(req.nodeId())) { - assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + - ", req=" + req + ']'; + if (locUpdates == null) + locUpdates = new ArrayList<>(mappings.size()); - locUpdate = req; + locUpdates.add(req); } else { try { if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + sendRequest(mappingKey, req); } - catch (IgniteCheckedException e) { - addFailedKeys(req.keys(), e); + catch (IgniteCheckedException ex) { + addFailedKeys(req.keys(), ex); - removeMapping(req.nodeId()); + removeMapping(mappingKey); } if (syncMode == PRIMARY_SYNC && !req.hasPrimary()) - removeMapping(req.nodeId()); + removeMapping(mappingKey); } } @@ -1077,28 +1117,52 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // In FULL_ASYNC mode always return (null, true). opRes = new GridCacheReturn(cctx, true, null, true); - if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res) { - assert res.futureVersion().equals(futVer) : futVer; + if (locUpdates != null) { + for (GridNearAtomicUpdateRequest locUpdate : locUpdates) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicUpdateResponse res) { + assert res.futureVersion().equals(futVer) : futVer; - onResult(res.nodeId(), res); - } - }); + onResult(res.nodeId(), res); + } + }); + } } checkComplete(); } /** + * Sends request. + * + * @param mappingKey Mapping key. + * @param req Update request. + * @throws IgniteCheckedException In case of error. + */ + private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) + throws IgniteCheckedException { + if (mappingKey.partition() >= 0) { + Object topic = new GridAtomicRequestTopic(cctx.cacheId(), mappingKey.partition(), true); + + cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(), + 2 * cctx.gridConfig().getNetworkTimeout()); + } + else { + assert mappingKey.partition() == -1; + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + } + + /** * Removes mapping from future mappings map. * - * @param nodeId Node ID to remove mapping for. + * @param mappingKey Mapping key. */ - private void removeMapping(UUID nodeId) { - mappings.remove(nodeId); + private void removeMapping(GridAtomicMappingKey mappingKey) { + mappings.remove(mappingKey); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 86c5ab8..b3075c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -135,6 +135,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** */ private boolean clientReq; + /** Partition. */ + private int part; + /** * Empty constructor required by {@link Externalizable}. */ @@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param clientReq Client node request flag. + * @param part Partition. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -180,7 +184,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable UUID subjId, int taskNameHash, boolean skipStore, - boolean clientReq + boolean clientReq, + int part ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -200,6 +205,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.taskNameHash = taskNameHash; this.skipStore = skipStore; this.clientReq = clientReq; + this.part = part; keys = new ArrayList<>(); } @@ -315,6 +321,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** * @param key Key to add. * @param val Optional update value. * @param conflictTtl Conflict TTL (optional). @@ -666,54 +679,60 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 16: - if (!writer.writeBoolean("retval", retval)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 17: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); case 18: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 19: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 20: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 21: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 23: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 24: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 25: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -844,7 +863,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 16: - retval = reader.readBoolean("retval"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -852,7 +871,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 17: - skipStore = reader.readBoolean("skipStore"); + retval = reader.readBoolean("retval"); if (!reader.isLastRead()) return false; @@ -860,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 18: - subjId = reader.readUuid("subjId"); + skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) return false; @@ -868,6 +887,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 19: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -879,7 +906,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -887,7 +914,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -895,7 +922,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 22: + case 23: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -903,7 +930,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 23: + case 24: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -911,7 +938,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 24: + case 25: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -931,7 +958,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 26; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 8e1bee2..e2d33d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -92,6 +92,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Near expire times. */ private GridLongList nearExpireTimes; + /** Partition. */ + private int part; + /** * Empty constructor required by {@link Externalizable}. */ @@ -103,11 +106,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param cacheId Cache ID. * @param nodeId Node ID this reply should be sent to. * @param futVer Future version. + * @param part Partition. */ - public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) { + public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) { this.cacheId = cacheId; this.nodeId = nodeId; this.futVer = futVer; + this.part = part; } /** {@inheritDoc} */ @@ -138,7 +143,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * Sets update error. - * @param err Exception. + * @param err */ public void error(IgniteCheckedException err){ this.err = err; @@ -188,6 +193,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** * Adds value to be put in near cache on originating node. * * @param keyIdx Key index. @@ -485,12 +497,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 12: - if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 13: + if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 14: if (!writer.writeMessage("ret", ret)) return false; @@ -585,7 +603,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 12: - remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -593,6 +611,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 13: + remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: ret = reader.readMessage("ret"); if (!reader.isLastRead()) @@ -612,7 +638,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index a6e6c4d..37824eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -599,7 +599,7 @@ public class GridDhtPartitionDemandPool { if (isCancelled() || topologyChanged()) return missed; - cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { + cctx.io().addPerTopicHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { addMessage(new SupplyMessage(nodeId, msg)); } @@ -641,7 +641,7 @@ public class GridDhtPartitionDemandPool { growTimeout(timeout); // Ordered listener was removed if timeout expired. - cctx.io().removeOrderedHandler(d.topic()); + cctx.io().removePerTopicHandler(d.topic()); // Must create copy to be able to work with IO manager thread local caches. d = new GridDhtPartitionDemandMessage(d, remaining); @@ -650,13 +650,12 @@ public class GridDhtPartitionDemandPool { d.topic(topic(++cntr)); // Create new ordered listener. - cctx.io().addOrderedHandler(d.topic(), - new CI2<UUID, GridDhtPartitionSupplyMessage>() { - @Override public void apply(UUID nodeId, - GridDhtPartitionSupplyMessage msg) { + cctx.io().addPerTopicHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { + @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { addMessage(new SupplyMessage(nodeId, msg)); } - }); + } + ); // Resend message with larger timeout. retry = true; @@ -800,7 +799,7 @@ public class GridDhtPartitionDemandPool { return missed; } finally { - cctx.io().removeOrderedHandler(d.topic()); + cctx.io().removePerTopicHandler(d.topic()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 316713f..a530a5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -556,11 +556,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(topic, resHnd); + cctx.io().addPerTopicHandler(topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - cctx.io().removeOrderedHandler(topic); + cctx.io().removePerTopicHandler(topic); } }); @@ -665,11 +665,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(topic, resHnd); + cctx.io().addPerTopicHandler(topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - cctx.io().removeOrderedHandler(topic); + cctx.io().removePerTopicHandler(topic); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index df4873a..3ec1d07 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -529,6 +529,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFu org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$1 org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$2 org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$2$1 +org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridAtomicRequestTopic org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$10 org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$11 @@ -806,7 +807,6 @@ org.apache.ignite.internal.processors.closure.GridClosureProcessor$T5 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T6 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T7 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T8 -org.apache.ignite.internal.processors.closure.GridClosureProcessor$T9 org.apache.ignite.internal.processors.closure.GridClosureProcessor$TaskNoReduceAdapter org.apache.ignite.internal.processors.closure.GridPeerDeployAwareTaskAdapter org.apache.ignite.internal.processors.continuous.GridContinuousHandler