Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-426 [created] 9fe0a8d56


IGNITE-426 - Continuous queries failover


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9fe0a8d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9fe0a8d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9fe0a8d5

Branch: refs/heads/ignite-426
Commit: 9fe0a8d562084e15c77e6023c5162c1741b7bc93
Parents: 19fb305
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Mon Aug 10 18:05:58 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Mon Aug 10 18:05:58 2015 -0700

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |  14 +-
 .../internal/GridMessageListenHandler.java      |  10 ++
 .../communication/GridIoMessageFactory.java     |   7 +-
 .../processors/cache/GridCacheMapEntry.java     |  18 +--
 .../distributed/dht/GridDhtLocalPartition.java  |  10 ++
 .../CacheContinuousQueryBatchAck.java           | 151 +++++++++++++++++++
 .../continuous/CacheContinuousQueryEntry.java   |  58 ++++++-
 .../continuous/CacheContinuousQueryHandler.java |  96 ++++++++++--
 .../CacheContinuousQueryListener.java           |   9 ++
 .../continuous/CacheContinuousQueryManager.java |  40 ++++-
 .../continuous/GridContinuousQueryBatch.java    |  47 ++++++
 .../continuous/GridContinuousBatch.java         |  44 ++++++
 .../continuous/GridContinuousBatchAdapter.java  |  47 ++++++
 .../continuous/GridContinuousHandler.java       |  18 +++
 .../continuous/GridContinuousProcessor.java     | 123 +++++++++------
 15 files changed, 615 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index f33fa39..0867c6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -197,8 +197,8 @@ class GridEventConsumeHandler implements 
GridContinuousHandler {
                                                     }
                                                 }
 
-                                                
ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
-                                                    false);
+                                                
ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+                                                    false, false);
                                             }
                                             catch 
(ClusterTopologyCheckedException ignored) {
                                                 // No-op.
@@ -361,6 +361,16 @@ class GridEventConsumeHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, 
GridContinuousBatch batch, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 4bfb57b..e1ba4e8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -162,6 +162,16 @@ public class GridMessageListenHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, 
GridContinuousBatch batch, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7fe8da8..6103a46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -600,7 +600,12 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
-            // [-3..112] - this
+            case 113:
+                msg = new CacheContinuousQueryBatchAck();
+
+                break;
+
+            // [-3..113] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f85a18b..8a34441 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1061,8 +1061,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     subjId, null, taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, old, 
false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, val, old, 
tx.local(), false, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1219,8 +1219,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, null, old, 
false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, null, old, 
tx.local(), false, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, true);
         }
@@ -1557,7 +1557,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, old, 
false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, val, old, 
true, false, AffinityTopologyVersion.NONE);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -2167,8 +2168,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, 
oldVal, false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
oldVal, primary, false, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -2984,8 +2985,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 drReplicate(drType, val, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.isLocal() || cctx.isReplicated() || 
cctx.affinity().primary(cctx.localNode(), key, topVer))
-                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, null, preload);
+                    cctx.continuousQueries().onEntryUpdated(this, key, val, 
null, true, preload, topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 87c7f0e..d5d65b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -95,6 +95,9 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     /** Group reservations. */
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> 
reservations = new CopyOnWriteArrayList<>();
 
+    /** Continuous query update index. */
+    private final AtomicLong contQueryUpdIdx = new AtomicLong();
+
     /**
      * @param cctx Context.
      * @param id Partition ID.
@@ -579,6 +582,13 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     }
 
     /**
+     * @return Next update index.
+     */
+    public long nextContinuousQueryUpdateIndex() {
+        return contQueryUpdIdx.incrementAndGet();
+    }
+
+    /**
      * Clears values for this partition.
      */
     private void clearAll() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
new file mode 100644
index 0000000..49c1a3f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Batch acknowledgement.
+ */
+public class CacheContinuousQueryBatchAck extends GridCacheMessage {
+    /** Routine ID. */
+    private UUID routineId;
+
+    /** Update indexes. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = Integer.class, valueType = Long.class)
+    private Map<Integer, Long> updateIdxs;
+
+    /**
+     * Default constructor.
+     */
+    public CacheContinuousQueryBatchAck() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param routineId Routine ID.
+     * @param updateIdxs Update indexes.
+     */
+    CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, 
Long> updateIdxs) {
+        this.cacheId = cacheId;
+        this.routineId = routineId;
+        this.updateIdxs = updateIdxs;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Update indexes.
+     */
+    Map<Integer, Long> updateIndexes() {
+        return updateIdxs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeUuid("routineId", routineId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("updateIdxs", updateIdxs, 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                routineId = reader.readUuid("routineId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                updateIdxs = reader.readMap("updateIdxs", 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 113;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryBatchAck.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 060afb9..956a99b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -70,6 +70,12 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     @GridDirectTransient
     private GridDeploymentInfo depInfo;
 
+    /** Partition. */
+    private int part;
+
+    /** Update index. */
+    private long updateIdx;
+
     /**
      * Required by {@link 
org.apache.ignite.plugin.extensions.communication.Message}.
      */
@@ -83,18 +89,24 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param part Partition.
+     * @param updateIdx Update index.
      */
     CacheContinuousQueryEntry(
         int cacheId,
         EventType evtType,
         KeyCacheObject key,
         @Nullable CacheObject newVal,
-        @Nullable CacheObject oldVal) {
+        @Nullable CacheObject oldVal,
+        int part,
+        long updateIdx) {
         this.cacheId = cacheId;
         this.evtType = evtType;
         this.key = key;
         this.newVal = newVal;
         this.oldVal = oldVal;
+        this.part = part;
+        this.updateIdx = updateIdx;
     }
 
     /**
@@ -112,6 +124,20 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     }
 
     /**
+     * @return Partition.
+     */
+    int partition() {
+        return part;
+    }
+
+    /**
+     * @return Update index.
+     */
+    long updateIndex() {
+        return updateIdx;
+    }
+
+    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */
@@ -220,6 +246,18 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
 
                 writer.incrementState();
 
+            case 5:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeLong("updateIdx", updateIdx))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -277,6 +315,22 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
 
                 reader.incrementState();
 
+            case 5:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                updateIdx = reader.readLong("updateIdx");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return true;
@@ -284,7 +338,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 879c30c..88ae39b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -23,13 +23,16 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 import javax.cache.event.*;
 import javax.cache.event.EventType;
@@ -81,6 +84,9 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
+    /** Backup queue. */
+    private transient Queue<CacheContinuousQueryEntry> backupQueue;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -164,6 +170,8 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         if (rmtFilter != null)
             ctx.resource().injectGeneric(rmtFilter);
 
+        backupQueue = new ConcurrentLinkedDeque8<>();
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         CacheContinuousQueryListener<K, V> lsnr = new 
CacheContinuousQueryListener<K, V>() {
@@ -212,20 +220,24 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                         locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt));
                     else {
                         try {
-                            ClusterNode node = ctx.discovery().node(nodeId);
+                            final CacheContinuousQueryEntry entry = 
evt.entry();
+
+                            if (primary) {
+                                if (ctx.config().isPeerClassLoadingEnabled() 
&& ctx.discovery().node(nodeId) != null) {
+                                    entry.prepareMarshal(cctx);
 
-                            if (ctx.config().isPeerClassLoadingEnabled() && 
node != null) {
-                                evt.entry().prepareMarshal(cctx);
+                                    GridCacheDeploymentManager depMgr =
+                                        
ctx.cache().internalCache(cacheName).context().deploy();
 
-                                GridCacheDeploymentManager depMgr =
-                                    
ctx.cache().internalCache(cacheName).context().deploy();
+                                    depMgr.prepare(entry);
+                                }
+                                else
+                                    entry.prepareMarshal(cctx);
 
-                                depMgr.prepare(evt.entry());
+                                ctx.continuous().addNotification(nodeId, 
routineId, entry, topic, sync, true);
                             }
                             else
-                                evt.entry().prepareMarshal(cctx);
-
-                            ctx.continuous().addNotification(nodeId, 
routineId, evt.entry(), topic, sync, true);
+                                backupQueue.add(entry);
                         }
                         catch (ClusterTopologyCheckedException ex) {
                             IgniteLogger log = ctx.log(getClass());
@@ -267,6 +279,31 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                     
((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister();
             }
 
+            @Override public void cleanupBackupQueue(Map<Integer, Long> 
updateIdxs) {
+                Iterator<CacheContinuousQueryEntry> it = 
backupQueue.iterator();
+
+                while (it.hasNext()) {
+                    CacheContinuousQueryEntry backupEntry = it.next();
+
+                    assert backupEntry != null;
+
+                    Long updateIdx = updateIdxs.get(backupEntry.partition());
+
+                    if (updateIdx != null) {
+                        assert backupEntry.updateIndex() <= updateIdx;
+
+                        it.remove();
+
+                        if (backupEntry.updateIndex() == updateIdx) {
+                            updateIdxs.remove(backupEntry.partition());
+
+                            if (updateIdxs.isEmpty())
+                                break;
+                        }
+                    }
+                }
+            }
+
             @Override public boolean oldValueRequired() {
                 return oldValRequired;
             }
@@ -298,7 +335,7 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         assert routineId != null;
         assert ctx != null;
 
-        GridCacheAdapter<K, V> cache = ctx.cache().<K, 
V>internalCache(cacheName);
+        GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
 
         if (cache != null)
             cache.context().continuousQueries().unregisterListener(internal, 
routineId);
@@ -381,6 +418,43 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousQueryBatch();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, 
GridContinuousBatch batch, GridKernalContext ctx)
+        throws IgniteCheckedException {
+        GridContinuousQueryBatch qryBatch = (GridContinuousQueryBatch)batch;
+
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        Collection<ClusterNode> nodes = new HashSet<>();
+
+        cctx.topology().readLock();
+
+        try {
+            AffinityTopologyVersion topVer = cctx.topology().topologyVersion();
+
+            for (Integer part : qryBatch.updateIndexes().keySet()) {
+                for (ClusterNode node : cctx.dht().topology().nodes(part, 
topVer)) {
+                    if (!node.equals(cctx.localNode()))
+                        nodes.add(node);
+                }
+            }
+        }
+        finally {
+            cctx.topology().readUnlock();
+        }
+
+        CacheContinuousQueryBatchAck msg = new 
CacheContinuousQueryBatchAck(cctx.cacheId(), routineId,
+            qryBatch.updateIndexes());
+
+        for (ClusterNode node : nodes)
+            cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return topic;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a21213f..d5d5ff8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.util.*;
+
 /**
  * Continuous query listener.
  */
@@ -41,6 +43,13 @@ interface CacheContinuousQueryListener<K, V> {
     public void onUnregister();
 
     /**
+     * Cleans backup queue.
+     *
+     * @param updateIdxs Update indexes map.
+     */
+    public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
+
+    /**
      * @return Whether old value is required.
      */
     public boolean oldValueRequired();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6277c5d..c6a16c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -83,6 +84,16 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         // Append cache name to the topic.
         topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + 
cctx.name());
+
+        cctx.io().addHandler(cctx.cacheId(), 
CacheContinuousQueryBatchAck.class,
+            new CI2<UUID, CacheContinuousQueryBatchAck>() {
+                @Override public void apply(UUID uuid, 
CacheContinuousQueryBatchAck msg) {
+                    CacheContinuousQueryListener lsnr = 
lsnrs.get(msg.routineId());
+
+                    if (lsnr != null)
+                        lsnr.cleanupBackupQueue(msg.updateIndexes());
+                }
+            });
     }
 
     /** {@inheritDoc} */
@@ -123,12 +134,16 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         KeyCacheObject key,
         CacheObject newVal,
         CacheObject oldVal,
-        boolean preload)
+        boolean primary,
+        boolean preload,
+        AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
         assert e != null;
         assert key != null;
 
+        assert Thread.holdsLock(e);
+
         boolean internal = e.isInternal() || !e.context().userCache();
 
         if (preload && !internal)
@@ -150,11 +165,16 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         if (!hasNewVal && !hasOldVal)
             return;
 
+        GridDhtLocalPartition locPart = 
cctx.topology().localPartition(e.partition(), topVer, false);
+
+        assert locPart != null;
+
+        long updateIdx = locPart.nextContinuousQueryUpdateIndex();
+
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : 
UPDATED;
 
         boolean initialized = false;
 
-        boolean primary = cctx.affinity().primary(cctx.localNode(), key, 
AffinityTopologyVersion.NONE);
         boolean recordIgniteEvt = !internal && 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
@@ -180,7 +200,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 evtType,
                 key,
                 newVal,
-                lsnr.oldValueRequired() ? oldVal : null);
+                lsnr.oldValueRequired() ? oldVal : null,
+                e.partition(),
+                updateIdx);
 
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -200,6 +222,8 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         assert e != null;
         assert key != null;
 
+        assert Thread.holdsLock(e);
+
         if (e.isInternal())
             return;
 
@@ -230,7 +254,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                    EXPIRED,
                    key,
                    null,
-                   lsnr.oldValueRequired() ? oldVal : null);
+                   lsnr.oldValueRequired() ? oldVal : null,
+                   e.partition(),
+                   0);
 
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, 
e0);
@@ -466,10 +492,12 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
 
                                 GridCacheEntryEx e = it.next();
 
+                                CacheContinuousQueryEntry entry = new 
CacheContinuousQueryEntry(
+                                    cctx.cacheId(), CREATED, e.key(), 
e.rawGet(), null, 0, 0);
+
                                 next = new CacheContinuousQueryEvent<>(
                                     
cctx.kernalContext().cache().jcache(cctx.name()),
-                                    cctx,
-                                    new 
CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
+                                    cctx, entry);
 
                                 if (rmtFilter != null && 
!rmtFilter.evaluate(next))
                                     next = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java
new file mode 100644
index 0000000..9e47d1d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridContinuousQueryBatch.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.internal.processors.continuous.*;
+import org.jsr166.*;
+
+import java.util.*;
+
+/**
+ * Continuous query batch that also records an update index per partition for the entries added to it.
+ */
+class GridContinuousQueryBatch extends GridContinuousBatchAdapter {
+    /** Partition -> update index of the entry most recently added for that partition. */
+    private final Map<Integer, Long> updateIdxs = new ConcurrentHashMap8<>();
+
+    /**
+     * @return Live (not copied) map of partition to the update index last recorded for it.
+     */
+    Map<Integer, Long> updateIndexes() {
+        return updateIdxs;
+    }
+
+    /** {@inheritDoc} Expects a {@link CacheContinuousQueryEntry}. NOTE(review): put() replaces a larger index too - confirm. */
+    @Override public void add(Object obj) {
+        super.add(obj);
+
+        CacheContinuousQueryEntry entry = (CacheContinuousQueryEntry)obj;
+
+        updateIdxs.put(entry.partition(), entry.updateIndex());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
new file mode 100644
index 0000000..ded7e87
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.*;
+
+/**
+ * Batch of objects accumulated for a continuous routine; supports adding, collecting and sizing.
+ */
+public interface GridContinuousBatch {
+    /**
+     * Adds element to this batch.
+     *
+     * @param obj Element to add.
+     */
+    public void add(Object obj);
+
+    /**
+     * Collects elements that are currently in this batch.
+     *
+     * @return Elements in this batch (whether this is a copy or a live view is up to the implementation).
+     */
+    public Collection<Object> collect();
+
+    /**
+     * @return Number of elements currently in this batch.
+     */
+    public int size();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
new file mode 100644
index 0000000..ef4c069
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.jsr166.*;
+
+import java.util.*;
+
+/**
+ * Base {@link GridContinuousBatch} implementation that keeps its elements in a {@code ConcurrentLinkedDeque8}.
+ */
+public class GridContinuousBatchAdapter implements GridContinuousBatch {
+    /** Buffer holding every element added to this batch. */
+    private final ConcurrentLinkedDeque8<Object> buf = new 
ConcurrentLinkedDeque8<>();
+
+    /** {@inheritDoc} The element must not be {@code null} (checked by assertion only). */
+    @Override public void add(Object obj) {
+        assert obj != null;
+
+        buf.add(obj);
+    }
+
+    /** {@inheritDoc} Returns the internal buffer itself, not a copy. */
+    @Override public Collection<Object> collect() {
+        return buf;
+    }
+
+    /** {@inheritDoc} Delegates to the buffer's {@code sizex()}. */
+    @Override public int size() {
+        return buf.sizex();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 79020da..72aaf0e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -98,6 +98,24 @@ public interface GridContinuousHandler extends 
Externalizable, Cloneable {
     public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws 
IgniteCheckedException;
 
     /**
+     * Creates new batch.
+     *
+     * @return New batch.
+     */
+    public GridContinuousBatch createBatch();
+
+    /**
+     * Called when ack for a batch is received from client.
+     *
+     * @param routineId Routine ID.
+     * @param batch Acknowledged batch.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, 
GridKernalContext ctx)
+        throws IgniteCheckedException;
+
+    /**
      * @return Topic for ordered notifications. If {@code null}, notifications
      * will be sent in non-ordered messages.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fe0a8d5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..457f150 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -600,8 +600,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void addNotification(UUID nodeId,
-        UUID routineId,
-        @Nullable Object obj,
+        final UUID routineId,
+        Object obj,
         @Nullable Object orderedTopic,
         boolean sync,
         boolean msg)
@@ -615,7 +615,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         if (processorStopped)
             return;
 
-        RemoteRoutineInfo info = rmtInfos.get(routineId);
+        final RemoteRoutineInfo info = rmtInfos.get(routineId);
 
         if (info != null) {
             assert info.interval == 0 || !sync;
@@ -628,7 +628,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), 
orderedTopic, msg);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), 
orderedTopic, msg, null);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -639,10 +639,24 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 fut.get();
             }
             else {
-                Collection<Object> toSnd = info.add(obj);
+                final GridContinuousBatch batch = info.add(obj);
+
+                if (batch != null) {
+                    CI1<IgniteException> ackClosure = new 
CI1<IgniteException>() {
+                        @Override public void apply(IgniteException e) {
+                            if (e == null) {
+                                try {
+                                    info.hnd.onBatchAcknowledged(routineId, 
batch, ctx);
+                                }
+                                catch (IgniteCheckedException ex) {
+                                    U.error(log, "Failed to acknowledge batch: 
" + batch, ex);
+                                }
+                            }
+                        }
+                    };
 
-                if (toSnd != null)
-                    sendNotification(nodeId, routineId, null, toSnd, 
orderedTopic, msg);
+                    sendNotification(nodeId, routineId, null, batch.collect(), 
orderedTopic, msg, ackClosure);
+                }
             }
         }
     }
@@ -667,6 +681,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
      * @param msg If {@code true} then sent data is collection of messages.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendNotification(UUID nodeId,
@@ -674,7 +689,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         @Nullable IgniteUuid futId,
         Collection<Object> toSnd,
         @Nullable Object orderedTopic,
-        boolean msg) throws IgniteCheckedException {
+        boolean msg,
+        IgniteInClosure<IgniteException> ackClosure) throws 
IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
         assert toSnd != null;
@@ -682,7 +698,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         sendWithRetries(nodeId,
             new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, 
toSnd, msg),
-            orderedTopic);
+            orderedTopic,
+            ackClosure);
     }
 
     /**
@@ -800,6 +817,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 try {
                     sendWithRetries(nodeId,
                         new GridContinuousMessage(MSG_EVT_ACK, null, 
msg.futureId(), null, false),
+                        null,
                         null);
                 }
                 catch (IgniteCheckedException e) {
@@ -863,15 +881,31 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                                 break;
                             }
 
-                            IgniteBiTuple<Collection<Object>, Long> t = 
info.checkInterval();
+                            IgniteBiTuple<GridContinuousBatch, Long> t = 
info.checkInterval();
 
-                            Collection<Object> toSnd = t.get1();
+                            final GridContinuousBatch batch = t.get1();
 
-                            if (toSnd != null && !toSnd.isEmpty()) {
+                            if (batch != null && batch.size() > 0) {
                                 try {
+                                    Collection<Object> toSnd = batch.collect();
+
                                     boolean msg = toSnd.iterator().next() 
instanceof Message;
 
-                                    sendNotification(nodeId, routineId, null, 
toSnd, hnd.orderedTopic(), msg);
+                                    CI1<IgniteException> ackClosure = new 
CI1<IgniteException>() {
+                                        @Override public void 
apply(IgniteException e) {
+                                            if (e == null) {
+                                                try {
+                                                    
info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+                                                }
+                                                catch (IgniteCheckedException 
ex) {
+                                                    U.error(log, "Failed to 
acknowledge batch: " + batch, ex);
+                                                }
+                                            }
+                                        }
+                                    };
+
+                                    sendNotification(nodeId, routineId, null, 
toSnd, hnd.orderedTopic(), msg,
+                                        ackClosure);
                                 }
                                 catch (ClusterTopologyCheckedException 
ignored) {
                                     if (log.isDebugEnabled())
@@ -949,9 +983,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, 
@Nullable Object orderedTopic)
+    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, 
@Nullable Object orderedTopic,
+        IgniteInClosure<IgniteException> ackClosure)
         throws IgniteCheckedException {
         assert nodeId != null;
         assert msg != null;
@@ -959,7 +995,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node != null)
-            sendWithRetries(node, msg, orderedTopic);
+            sendWithRetries(node, msg, orderedTopic, ackClosure);
         else
             throw new ClusterTopologyCheckedException("Node for provided ID 
doesn't exist (did it leave the grid?): " + nodeId);
     }
@@ -969,14 +1005,15 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, 
@Nullable Object orderedTopic)
-        throws IgniteCheckedException {
+    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, 
@Nullable Object orderedTopic,
+        IgniteInClosure<IgniteException> ackClosure) throws 
IgniteCheckedException {
         assert node != null;
         assert msg != null;
 
-        sendWithRetries(F.asList(node), msg, orderedTopic);
+        sendWithRetries(F.asList(node), msg, orderedTopic, ackClosure);
     }
 
     /**
@@ -984,10 +1021,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendWithRetries(Collection<? extends ClusterNode> nodes, 
GridContinuousMessage msg,
-        @Nullable Object orderedTopic) throws IgniteCheckedException {
+        @Nullable Object orderedTopic, IgniteInClosure<IgniteException> 
ackClosure) throws IgniteCheckedException {
         assert !F.isEmpty(nodes);
         assert msg != null;
 
@@ -1013,7 +1051,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                             true);
                     }
                     else
-                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, 
SYSTEM_POOL);
+                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, 
SYSTEM_POOL, ackClosure);
 
                     break;
                 }
@@ -1114,8 +1152,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         /** Lock. */
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-        /** Buffer. */
-        private ConcurrentLinkedDeque8<Object> buf;
+        /** Batch. */
+        private GridContinuousBatch batch;
 
         /** Last send time. */
         private long lastSndTime = U.currentTimeMillis();
@@ -1146,7 +1184,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             this.interval = interval;
             this.autoUnsubscribe = autoUnsubscribe;
 
-            buf = new ConcurrentLinkedDeque8<>();
+            batch = hnd.createBatch();
         }
 
         /**
@@ -1175,20 +1213,22 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         /**
          * @param obj Object to add.
-         * @return Object to send or {@code null} if there is nothing to send 
for now.
+         * @return Batch to send or {@code null} if there is nothing to send 
for now.
          */
-        @Nullable Collection<Object> add(@Nullable Object obj) {
-            ConcurrentLinkedDeque8 buf0 = null;
+        @Nullable GridContinuousBatch add(Object obj) {
+            assert obj != null;
+
+            GridContinuousBatch toSnd = null;
 
-            if (buf.sizex() >= bufSize - 1) {
+            if (batch.size() >= bufSize - 1) {
                 lock.writeLock().lock();
 
                 try {
-                    buf.add(obj);
+                    batch.add(obj);
 
-                    buf0 = buf;
+                    toSnd = batch;
 
-                    buf = new ConcurrentLinkedDeque8<>();
+                    batch = hnd.createBatch();
 
                     if (interval > 0)
                         lastSndTime = U.currentTimeMillis();
@@ -1201,34 +1241,25 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 lock.readLock().lock();
 
                 try {
-                    buf.add(obj);
+                    batch.add(obj);
                 }
                 finally {
                     lock.readLock().unlock();
                 }
             }
 
-            Collection<Object> toSnd = null;
-
-            if (buf0 != null) {
-                toSnd = new ArrayList<>(buf0.sizex());
-
-                for (Object o : buf0)
-                    toSnd.add(o);
-            }
-
             return toSnd;
         }
 
         /**
-         * @return Tuple with objects to sleep (or {@code null} if there is 
nothing to
+         * @return Tuple with batch to send (or {@code null} if there is 
nothing to
          *      send for now) and time interval after next check is needed.
          */
         @SuppressWarnings("TooBroadScope")
-        IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+        IgniteBiTuple<GridContinuousBatch, Long> checkInterval() {
             assert interval > 0;
 
-            Collection<Object> toSnd = null;
+            GridContinuousBatch toSnd = null;
             long diff;
 
             long now = U.currentTimeMillis();
@@ -1238,10 +1269,10 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             try {
                 diff = now - lastSndTime;
 
-                if (diff >= interval && !buf.isEmpty()) {
-                    toSnd = buf;
+                if (diff >= interval && batch.size() > 0) {
+                    toSnd = batch;
 
-                    buf = new ConcurrentLinkedDeque8<>();
+                    batch = hnd.createBatch();
 
                     lastSndTime = now;
                 }

Reply via email to