IGNITE-143 - Continuous queries refactoring (manual merge)

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f649be2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f649be2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f649be2

Branch: refs/heads/ignite-nio
Commit: 4f649be29a298c3950cd09b641fba450d7fd0241
Parents: 3b8f9a6
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Fri Feb 13 16:47:03 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Fri Feb 13 16:47:03 2015 -0800

----------------------------------------------------------------------
 .../datagrid/CacheContinuousQueryExample.java   |  69 +-
 .../cache/query/CacheContinuousQuery.java       | 285 ------
 .../cache/query/CacheContinuousQueryEntry.java  |  49 -
 .../ignite/cache/query/ContinuousQuery.java     | 234 +++--
 .../org/apache/ignite/cache/query/Query.java    |   9 +
 .../ignite/events/CacheQueryExecutedEvent.java  |   8 +-
 .../ignite/events/CacheQueryReadEvent.java      |   8 +-
 .../processors/cache/CacheEntryEvent.java       |  78 --
 .../processors/cache/GridCacheContext.java      |   6 +-
 .../processors/cache/GridCacheMapEntry.java     |  26 +-
 .../processors/cache/GridCacheProcessor.java    |   8 +-
 .../processors/cache/GridCacheProjectionEx.java |   5 +
 .../processors/cache/IgniteCacheProxy.java      |  72 +-
 .../CacheDataStructuresManager.java             |  81 +-
 .../processors/cache/query/CacheQueries.java    |  10 -
 .../cache/query/GridCacheQueriesImpl.java       |   5 -
 .../cache/query/GridCacheQueriesProxy.java      |  12 -
 .../continuous/CacheContinuousQueryEntry.java   | 234 +++++
 .../continuous/CacheContinuousQueryEvent.java   |  87 ++
 .../CacheContinuousQueryFilterEx.java           |  31 +
 .../continuous/CacheContinuousQueryHandler.java | 490 ++++++++++
 .../CacheContinuousQueryListener.java           |  47 +
 .../continuous/CacheContinuousQueryManager.java | 664 ++++++++++++++
 .../GridCacheContinuousQueryAdapter.java        | 319 -------
 .../GridCacheContinuousQueryEntry.java          | 344 -------
 .../GridCacheContinuousQueryFilterEx.java       |  33 -
 .../GridCacheContinuousQueryHandler.java        | 571 ------------
 .../GridCacheContinuousQueryListener.java       |  41 -
 .../GridCacheContinuousQueryManager.java        | 784 ----------------
 .../processors/hadoop/GridHadoopJobId.java      |   3 +-
 .../service/GridServiceProcessor.java           |  75 +-
 .../optimized/optimized-classnames.properties   |   4 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java | 888 ++++---------------
 ...dCacheContinuousQueryReplicatedSelfTest.java |  95 +-
 .../GridContinuousOperationsLoadTest.java       |  54 +-
 .../loadtests/hashmap/GridCacheTestContext.java |   2 +-
 .../hadoop/jobtracker/GridHadoopJobTracker.java |  31 +-
 37 files changed, 2181 insertions(+), 3581 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index ce05988..ec7a040 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.examples.datagrid;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.lang.*;
 
-import java.util.*;
+import javax.cache.*;
+import javax.cache.event.*;
 
 /**
  * This examples demonstrates continuous query API.
@@ -48,46 +48,51 @@ public class CacheContinuousQueryExample {
             System.out.println();
             System.out.println(">>> Cache continuous query example started.");
 
-            GridCache<Integer, String> cache = ignite.cache(CACHE_NAME);
+            IgniteCache<Integer, String> cache = ignite.jcache(CACHE_NAME);
 
             // Clean up caches on all nodes before run.
-            cache.clear(0);
+            cache.clear();
 
             int keyCnt = 20;
 
+            // These entries will be queried by initial predicate.
             for (int i = 0; i < keyCnt; i++)
-                cache.putx(i, Integer.toString(i));
+                cache.put(i, Integer.toString(i));
 
             // Create new continuous query.
-            try (CacheContinuousQuery<Integer, String> qry = 
cache.queries().createContinuousQuery()) {
-                // Callback that is called locally when update notifications 
are received.
-                qry.localCallback(
-                    new IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<Integer, String>>>() {
-                        @Override public boolean apply(
-                            UUID nodeId,
-                            Collection<CacheContinuousQueryEntry<Integer, 
String>> entries
-                        ) {
-                            for (CacheContinuousQueryEntry<Integer, String> e 
: entries)
-                                System.out.println("Queried entry [key=" + 
e.getKey() + ", val=" + e.getValue() + ']');
-
-                            return true; // Return true to continue listening.
-                        }
-                    });
-
-                // This filter will be evaluated remotely on all nodes
-                // Entry that pass this filter will be sent to the caller.
-                qry.remoteFilter(new 
IgnitePredicate<CacheContinuousQueryEntry<Integer, String>>() {
-                    @Override public boolean 
apply(CacheContinuousQueryEntry<Integer, String> e) {
-                        return e.getKey() > 15;
-                    }
-                });
-
-                // Execute query.
-                qry.execute();
+            ContinuousQuery<Integer, String> qry = Query.continuous();
+
+            qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<Integer, 
String>() {
+                @Override public boolean apply(Integer key, String val) {
+                    return key > 10;
+                }
+            }));
+
+            // Callback that is called locally when update notifications are 
received.
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, 
String>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends Integer, ? extends String>> evts) {
+                    for (CacheEntryEvent<? extends Integer, ? extends String> 
e : evts)
+                        System.out.println("Updated entry [key=" + e.getKey() 
+ ", val=" + e.getValue() + ']');
+                }
+            });
+
+            // This filter will be evaluated remotely on all nodes.
+            // Entry that pass this filter will be sent to the caller.
+            qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
+                @Override public boolean evaluate(CacheEntryEvent<? extends 
Integer, ? extends String> e) {
+                    return e.getKey() > 25;
+                }
+            });
+
+            // Execute query.
+            try (QueryCursor<Cache.Entry<Integer, String>> cur = 
cache.query(qry)) {
+                // Iterate through existing data.
+                for (Cache.Entry<Integer, String> e : cur)
+                    System.out.println("Queried existing entry [key=" + 
e.getKey() + ", val=" + e.getValue() + ']');
 
                 // Add a few more keys and watch more query notifications.
-                for (int i = keyCnt; i < keyCnt + 5; i++)
-                    cache.putx(i, Integer.toString(i));
+                for (int i = keyCnt; i < keyCnt + 10; i++)
+                    cache.put(i, Integer.toString(i));
 
                 // Wait for a while while callback is notified about remaining 
puts.
                 Thread.sleep(2000);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
deleted file mode 100644
index eaac9b8..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.query;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * API for configuring and executing continuous cache queries.
- * <p>
- * Continuous queries are executed as follows:
- * <ol>
- * <li>
- *  Query is sent to requested grid nodes. Note that for {@link 
org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
- *  and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches 
query will be always executed
- *  locally.
- * </li>
- * <li>
- *  Each node iterates through existing cache data and registers listeners 
that will
- *  notify about further updates.
- * <li>
- *  Each key-value pair is passed through optional filter and if this filter 
returns
- *  true, key-value pair is sent to the master node (the one that executed 
query).
- *  If filter is not provided, all pairs are sent.
- * </li>
- * <li>
- *  When master node receives key-value pairs, it notifies the local callback.
- * </li>
- * </ol>
- * <h2 class="header">NOTE</h2>
- * Under some concurrent circumstances callback may get several notifications
- * for one cache update. This should be taken into account when implementing 
callback.
- * <h1 class="header">Query usage</h1>
- * As an example, suppose we have cache with {@code 'Person'} objects and we 
need
- * to query all persons with salary above 1000.
- * <p>
- * Here is the {@code Person} class:
- * <pre name="code" class="java">
- * public class Person {
- *     // Name.
- *     private String name;
- *
- *     // Salary.
- *     private double salary;
- *
- *     ...
- * }
- * </pre>
- * <p>
- * You can create and execute continuous query like so:
- * <pre name="code" class="java">
- * // Create new continuous query.
- * qry = cache.createContinuousQuery();
- *
- * // Callback that is called locally when update notifications are received.
- * // It simply prints out information about all created persons.
- * qry.callback(new GridPredicate2&lt;UUID, Collection&lt;Map.Entry&lt;UUID, 
Person&gt;&gt;&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, 
Collection&lt;Map.Entry&lt;UUID, Person&gt;&gt; entries) {
- *         for (Map.Entry&lt;UUID, Person&gt; e : entries) {
- *             Person p = e.getValue();
- *
- *             X.println("&gt;&gt;&gt;");
- *             X.println("&gt;&gt;&gt; " + p.getFirstName() + " " + 
p.getLastName() +
- *                 "'s salary is " + p.getSalary());
- *             X.println("&gt;&gt;&gt;");
- *         }
- *
- *         return true;
- *     }
- * });
- *
- * // This query will return persons with salary above 1000.
- * qry.filter(new GridPredicate2&lt;UUID, Person&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, Person person) {
- *         return person.getSalary() &gt; 1000;
- *     }
- * });
- *
- * // Execute query.
- * qry.execute();
- * </pre>
- * This will execute query on all nodes that have cache you are working with 
and notify callback
- * with both data that already exists in cache and further updates.
- * <p>
- * To stop receiving updates call {@link #close()} method:
- * <pre name="code" class="java">
- * qry.cancel();
- * </pre>
- * Note that one query instance can be executed only once. After it's 
cancelled, it's non-operational.
- * If you need to repeat execution, use {@link 
CacheQueries#createContinuousQuery()} method to create
- * new query.
- */
-public interface CacheContinuousQuery<K, V> extends AutoCloseable {
-    /**
-     * Default buffer size. Size of {@code 1} means that all entries
-     * will be sent to master node immediately (buffering is disabled).
-     */
-    public static final int DFLT_BUF_SIZE = 1;
-
-    /** Maximum default time interval after which buffer will be flushed (if 
buffering is enabled). */
-    public static final long DFLT_TIME_INTERVAL = 0;
-
-    /**
-     * Default value for automatic unsubscription flag. Remote filters
-     * will be unregistered by default if master node leaves topology.
-     */
-    public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
-
-    /**
-     * Sets local callback. This callback is called only
-     * in local node when new updates are received.
-     * <p>
-     * The callback predicate accepts ID of the node from where updates
-     * are received and collection of received entries. Note that
-     * for removed entries value will be {@code null}.
-     * <p>
-     * If the predicate returns {@code false}, query execution will
-     * be cancelled.
-     * <p>
-     * <b>WARNING:</b> all operations that involve any kind of JVM-local
-     * or distributed locking (e.g., synchronization or transactional
-     * cache operations), should be executed asynchronously without
-     * blocking the thread that called the callback. Otherwise, you
-     * can get deadlocks.
-     *
-     * @param locCb Local callback.
-     */
-    public void localCallback(IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<K, V>>> locCb);
-
-    /**
-     * Gets local callback. See {@link #localCallback(IgniteBiPredicate)} for 
more information.
-     *
-     * @return Local callback.
-     */
-    @Nullable public IgniteBiPredicate<UUID, 
Collection<CacheContinuousQueryEntry<K, V>>> localCallback();
-
-    /**
-     * Sets optional key-value filter. This filter is called before
-     * entry is sent to the master node.
-     * <p>
-     * <b>WARNING:</b> all operations that involve any kind of JVM-local
-     * or distributed locking (e.g., synchronization or transactional
-     * cache operations), should be executed asynchronously without
-     * blocking the thread that called the filter. Otherwise, you
-     * can get deadlocks.
-     *
-     * @param filter Key-value filter.
-     */
-    public void remoteFilter(@Nullable 
IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter);
-
-    /**
-     * Gets key-value filter. See {@link #remoteFilter(IgnitePredicate)} for 
more information.
-     *
-     * @return Key-value filter.
-     */
-    @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
remoteFilter();
-
-    /**
-     * Sets buffer size.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer.
-     * Entries from buffer will be sent to the master node only if
-     * the buffer is full or time provided via {@link #timeInterval(long)}
-     * method is exceeded.
-     * <p>
-     * Default buffer size is {@code 1} which means that entries will
-     * be sent immediately (buffering is disabled).
-     *
-     * @param bufSize Buffer size.
-     */
-    public void bufferSize(int bufSize);
-
-    /**
-     * Gets buffer size. See {@link #bufferSize(int)} for more information.
-     *
-     * @return Buffer size.
-     */
-    public int bufferSize();
-
-    /**
-     * Sets time interval.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer.
-     * Entries from buffer will be sent to the master node only if
-     * the buffer is full (its size can be provided via {@link 
#bufferSize(int)}
-     * method) or time provided via this method is exceeded.
-     * <p>
-     * Default time interval is {@code 0} which means that time check is
-     * disabled and entries will be sent only when buffer is full.
-     *
-     * @param timeInterval Time interval.
-     */
-    public void timeInterval(long timeInterval);
-
-    /**
-     * Gets time interval. See {@link #timeInterval(long)} for more 
information.
-     *
-     * @return Gets time interval.
-     */
-    public long timeInterval();
-
-    /**
-     * Sets automatic unsubscribe flag.
-     * <p>
-     * This flag indicates that query filters on remote nodes should be 
automatically
-     * unregistered if master node (node that initiated the query) leaves 
topology.
-     * If this flag is {@code false}, filters will be unregistered only when
-     * the query is cancelled from master node, and won't ever be unregistered 
if
-     * master node leaves grid.
-     * <p>
-     * Default value for this flag is {@code true}.
-     *
-     * @param autoUnsubscribe Automatic unsubscription flag.
-     */
-    public void autoUnsubscribe(boolean autoUnsubscribe);
-
-    /**
-     * Gets automatic unsubscribe flag. See {@link #autoUnsubscribe(boolean)}
-     * for more information.
-     *
-     * @return Automatic unsubscribe flag.
-     */
-    public boolean isAutoUnsubscribe();
-
-    /**
-     * Starts continuous query execution on the whole grid.
-     * <p>
-     * Note that if grid contains nodes without appropriate cache,
-     * these nodes will be filtered out.
-     * <p>
-     * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
-     * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} 
caches
-     * query will be always executed locally.
-     *
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void execute() throws IgniteCheckedException;
-
-    /**
-     * Starts continuous query execution on provided set of nodes.
-     * <p>
-     * Note that if provided projection contains nodes without
-     * appropriate cache, these nodes will be filtered out.
-     * <p>
-     * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
-     * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} 
caches
-     * query will be always executed locally.
-     *
-     * @param prj Grid projection.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void execute(@Nullable ClusterGroup prj) throws 
IgniteCheckedException;
-
-    /**
-     * Stops continuous query execution.
-     * <p>
-     * Note that one query instance can be executed only once.
-     * After it's cancelled, it's non-operational.
-     * If you need to repeat execution, use {@link 
CacheQueries#createContinuousQuery()}
-     * method to create new query.
-     *
-     * @throws IgniteCheckedException In case of error.
-     */
-    @Override public void close() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
deleted file mode 100644
index 90d3602..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.query;
-
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Entry used for continuous query notifications.
- */
-public interface CacheContinuousQueryEntry<K, V> extends Map.Entry<K, V>, 
Serializable {
-    /**
-     * Gets entry key.
-     *
-     * @return Entry key.
-     */
-    @Override public K getKey();
-
-    /**
-     * Gets entry new value. New value may be null, if entry is being removed.
-     *
-     * @return Entry new value.
-     */
-    @Override @Nullable public V getValue();
-
-    /**
-     * Gets entry old value. Old value may be null if entry is being inserted 
(not updated).
-     *
-     * @return Gets entry old value.
-     */
-    @Nullable public V getOldValue();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index b02c65f..35303ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -22,31 +22,24 @@ import org.apache.ignite.*;
 import javax.cache.event.*;
 
 /**
- * API for configuring and executing continuous cache queries.
+ * API for configuring continuous cache queries.
  * <p>
- * Continuous queries are executed as follows:
- * <ol>
- * <li>
- *  Query is sent to requested grid nodes. Note that for {@link 
org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
- *  and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches 
query will be always executed
- *  locally.
- * </li>
- * <li>
- *  Each node iterates through existing cache data and registers listeners 
that will
- *  notify about further updates.
- * <li>
- *  Each key-value pair is passed through optional filter and if this filter 
returns
- *  true, key-value pair is sent to the master node (the one that executed 
query).
- *  If filter is not provided, all pairs are sent.
- * </li>
- * <li>
- *  When master node receives key-value pairs, it notifies the local callback.
- * </li>
- * </ol>
- * <h2 class="header">NOTE</h2>
- * Under some concurrent circumstances callback may get several notifications
- * for one cache update. This should be taken into account when implementing 
callback.
- * <h1 class="header">Query usage</h1>
+ * Continuous queries allow to register a remote filter and a local listener
+ * for cache updates. If an update event passes the filter, it will be sent to
+ * the node that executed the query and local listener will be notified.
+ * <p>
+ * Additionally, you can execute initial query to get currently existing data.
+ * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link 
#setInitialPredicate(Query)}
+ * method.
+ * <p>
+ * Query can be executed either on all nodes in topology using {@link 
IgniteCache#query(Query)}
+ * method of only on the local node using {@link 
IgniteCache#localQuery(Query)} method.
+ * Note that in case query is distributed and a new node joins, it will get 
the remote
+ * filter for the query during discovery process before it actually joins 
topology,
+ * so no updates will be missed.
+ * <p>
+ * To create a new instance of continuous query use {@link Query#continuous()} 
factory method.
+ * <h1 class="header">Example</h1>
  * As an example, suppose we have cache with {@code 'Person'} objects and we 
need
  * to query all persons with salary above 1000.
  * <p>
@@ -66,13 +59,21 @@ import javax.cache.event.*;
  * You can create and execute continuous query like so:
  * <pre name="code" class="java">
  * // Create new continuous query.
- * qry = cache.createContinuousQuery();
+ * ContinuousQuery qry = Query.continuous();
+ *
+ * // Initial iteration query will return all persons with salary above 1000.
+ * qry.setInitialPredicate(Query.scan(new IgniteBiPredicate&lt;UUID, 
Person&gt;() {
+ *     &#64;Override public boolean apply(UUID id, Person p) {
+ *         return p.getSalary() &gt; 1000;
+ *     }
+ * }));
+ *
  *
  * // Callback that is called locally when update notifications are received.
  * // It simply prints out information about all created persons.
- * qry.callback(new GridPredicate2&lt;UUID, Collection&lt;Map.Entry&lt;UUID, 
Person&gt;&gt;&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, 
Collection&lt;Map.Entry&lt;UUID, Person&gt;&gt; entries) {
- *         for (Map.Entry&lt;UUID, Person&gt; e : entries) {
+ * qry.setLocalListener(new CacheEntryUpdatedListener&lt;UUID, Person&gt;() {
+ *     &#64;Override public void onUpdated(Iterable&lt;CacheEntryEvent&lt;? 
extends UUID, ? extends Person&gt;&gt; evts) {
+ *         for (CacheEntryEvent&lt;? extends UUID, ? extends Person&gt; e : 
evts) {
  *             Person p = e.getValue();
  *
  *             X.println("&gt;&gt;&gt;");
@@ -80,33 +81,31 @@ import javax.cache.event.*;
  *                 "'s salary is " + p.getSalary());
  *             X.println("&gt;&gt;&gt;");
  *         }
- *
- *         return true;
  *     }
  * });
  *
- * // This query will return persons with salary above 1000.
- * qry.filter(new GridPredicate2&lt;UUID, Person&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, Person person) {
- *         return person.getSalary() &gt; 1000;
+ * // Continuous listener will be notified for persons with salary above 1000.
+ * qry.setRemoteFilter(new CacheEntryEventFilter&lt;UUID, Person&gt;() {
+ *     &#64;Override public boolean evaluate(CacheEntryEvent&lt;? extends 
UUID, ? extends Person&gt; e) {
+ *         return e.getValue().getSalary() &gt; 1000;
  *     }
  * });
  *
- * // Execute query.
- * qry.execute();
+ * // Execute query and get cursor that iterates through initial data.
+ * QueryCursor&lt;Cache.Entry&lt;UUID, Person&gt;&gt; cur = cache.query(qry);
  * </pre>
- * This will execute query on all nodes that have cache you are working with 
and notify callback
- * with both data that already exists in cache and further updates.
+ * This will execute query on all nodes that have cache you are working with 
and
+ * listener will start to receive notifications for cache updates.
  * <p>
- * To stop receiving updates call {@link #close()} method:
+ * To stop receiving updates call {@link QueryCursor#close()} method:
  * <pre name="code" class="java">
- * qry.cancel();
+ * cur.close();
  * </pre>
- * Note that one query instance can be executed only once. After it's 
cancelled, it's non-operational.
- * If you need to repeat execution, use {@link 
org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()}
 method to create
- * new query.
+ * Note that this works even if you didn't provide initial query. Cursor will
+ * be empty in this case, but it will still unregister listeners when {@link 
QueryCursor#close()}
+ * is called.
  */
-public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> 
implements AutoCloseable {
+public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -125,13 +124,50 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> imp
      */
     public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
 
-    public void setInitialPredicate(Query filter) {
-        // TODO: implement.
+    /** Initial filter. */
+    private Query initFilter;
+
+    /** Local listener. */
+    private CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** Remote filter. */
+    private CacheEntryEventFilter<K, V> rmtFilter;
+
+    /** Buffer size. */
+    private int bufSize = DFLT_BUF_SIZE;
+
+    /** Time interval. */
+    private long timeInterval = DFLT_TIME_INTERVAL;
+
+    /** Automatic unsubscription flag. */
+    private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
+
+    /**
+     * Sets initial query.
+     * <p>
+     * This query will be executed before continuous listener is registered
+     * which allows to iterate through entries which already existed at the
+     * time continuous query is executed.
+     *
+     * @param initFilter Initial query.
+     */
+    public void setInitialPredicate(Query initFilter) {
+        this.initFilter = initFilter;
+    }
+
+    /**
+     * Gets initial query.
+     *
+     * @return Initial query.
+     */
+    public Query getInitialPredicate() {
+        return initFilter;
     }
 
     /**
      * Sets local callback. This callback is called only in local node when 
new updates are received.
-     * <p> The callback predicate accepts ID of the node from where updates 
are received and collection
+     * <p>
+     * The callback predicate accepts ID of the node from where updates are 
received and collection
      * of received entries. Note that for removed entries value will be {@code 
null}.
      * <p>
      * If the predicate returns {@code false}, query execution will be 
cancelled.
@@ -143,7 +179,16 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> imp
      * @param locLsnr Local callback.
      */
     public void setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
-        // TODO: implement.
+        this.locLsnr = locLsnr;
+    }
+
+    /**
+     * Gets local listener.
+     *
+     * @return Local listener.
+     */
+    public CacheEntryUpdatedListener<K, V> getLocalListener() {
+        return locLsnr;
     }
 
     /**
@@ -153,56 +198,99 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> imp
      * (e.g., synchronization or transactional cache operations), should be 
executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can 
get deadlocks.
      *
-     * @param filter Key-value filter.
+     * @param rmtFilter Key-value filter.
+     */
+    public void setRemoteFilter(CacheEntryEventFilter<K, V> rmtFilter) {
+        this.rmtFilter = rmtFilter;
+    }
+
+    /**
+     * Gets remote filter.
+     *
+     * @return Remote filter.
      */
-    public void setRemoteFilter(CacheEntryEventFilter<K, V> filter) {
-        // TODO: implement.
+    public CacheEntryEventFilter<K, V> getRemoteFilter() {
+        return rmtFilter;
     }
 
     /**
-     * Sets buffer size. <p> When a cache update happens, entry is first put 
into a buffer. Entries from buffer will be
-     * sent to the master node only if the buffer is full or time provided via 
{@link #timeInterval(long)} method is
-     * exceeded. <p> Default buffer size is {@code 1} which means that entries 
will be sent immediately (buffering is
+     * Sets buffer size.
+     * <p>
+     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will be
+     * sent to the master node only if the buffer is full or time provided via 
{@link #setTimeInterval(long)} method is
+     * exceeded.
+     * <p>
+     * Default buffer size is {@code 1} which means that entries will be sent 
immediately (buffering is
      * disabled).
      *
      * @param bufSize Buffer size.
      */
-    public void bufferSize(int bufSize) {
-        // TODO: implement.
+    public void setBufferSize(int bufSize) {
+        if (bufSize <= 0)
+            throw new IllegalArgumentException("Buffer size must be above 
zero.");
+
+        this.bufSize = bufSize;
     }
 
     /**
-     * Sets time interval. <p> When a cache update happens, entry is first put 
into a buffer. Entries from buffer will
-     * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #bufferSize(int)}
-     * method) or time provided via this method is exceeded. <p> Default time 
interval is {@code 0} which means that
+     * Gets buffer size.
+     *
+     * @return Buffer size.
+     */
+    public int getBufferSize() {
+        return bufSize;
+    }
+
+    /**
+     * Sets time interval.
+     * <p>
+     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will
+     * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #setBufferSize(int)}
+     * method) or time provided via this method is exceeded.
+     * <p>
+     * Default time interval is {@code 0} which means that
      * time check is disabled and entries will be sent only when buffer is 
full.
      *
      * @param timeInterval Time interval.
      */
-    public void timeInterval(long timeInterval) {
-        // TODO: implement.
+    public void setTimeInterval(long timeInterval) {
+        if (timeInterval < 0)
+            throw new IllegalArgumentException("Time interval can't be 
negative.");
+
+        this.timeInterval = timeInterval;
+    }
+
+    /**
+     * Gets time interval.
+     *
+     * @return Time interval.
+     */
+    public long getTimeInterval() {
+        return timeInterval;
     }
 
     /**
-     * Sets automatic unsubscribe flag. <p> This flag indicates that query 
filters on remote nodes should be
-     * automatically unregistered if master node (node that initiated the 
query) leaves topology. If this flag is {@code
-     * false}, filters will be unregistered only when the query is cancelled 
from master node, and won't ever be
-     * unregistered if master node leaves grid. <p> Default value for this 
flag is {@code true}.
+     * Sets automatic unsubscribe flag.
+     * <p>
+     * This flag indicates that query filters on remote nodes should be
+     * automatically unregistered if master node (node that initiated the 
query) leaves topology. If this flag is
+     * {@code false}, filters will be unregistered only when the query is 
cancelled from master node, and won't ever be
+     * unregistered if master node leaves grid.
+     * <p>
+     * Default value for this flag is {@code true}.
      *
      * @param autoUnsubscribe Automatic unsubscription flag.
      */
-    public void autoUnsubscribe(boolean autoUnsubscribe) {
-        // TODO: implement.
+    public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
     }
 
     /**
-     * Stops continuous query execution. <p> Note that one query instance can 
be executed only once. After it's
-     * cancelled, it's non-operational. If you need to repeat execution, use 
{@link
-     * 
org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()}
 method to create new query.
+     * Gets automatic unsubscription flag value.
      *
-     * @throws IgniteCheckedException In case of error.
+     * @return Automatic unsubscription flag.
      */
-    @Override public void close() throws IgniteCheckedException {
-        // TODO: implement.
+    public boolean isAutoUnsubscribe() {
+        return autoUnsubscribe;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
index 744d8d2..c24d704 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
@@ -106,6 +106,15 @@ public abstract class Query<T extends Query> implements 
Serializable {
     }
 
     /**
+     * Factory method for continuous queries.
+     *
+     * @return Continuous query.
+     */
+    public static <K, V> ContinuousQuery<K, V> continuous() {
+        return new ContinuousQuery<>();
+    }
+
+    /**
      * Gets optional page size, if {@code 0}, then {@link 
CacheQueryConfiguration#getPageSize()} is used.
      *
      * @return Optional page size.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
index 51810a2..a7563a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.events;
 
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -25,6 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.util.*;
 
 /**
@@ -84,7 +84,7 @@ public class CacheQueryExecutedEvent<K, V> extends 
EventAdapter {
 
     /** Continuous query filter. */
     @GridToStringInclude
-    private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
contQryFilter;
+    private final CacheEntryEventFilter<K, V> contQryFilter;
 
     /** Query arguments. */
     @GridToStringInclude
@@ -117,7 +117,7 @@ public class CacheQueryExecutedEvent<K, V> extends 
EventAdapter {
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<K, V> scanQryFilter,
-        @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
contQryFilter,
+        @Nullable CacheEntryEventFilter<K, V> contQryFilter,
         @Nullable Object[] args,
         @Nullable UUID subjId,
         @Nullable String taskName) {
@@ -194,7 +194,7 @@ public class CacheQueryExecutedEvent<K, V> extends 
EventAdapter {
      *
      * @return Continuous query filter.
      */
-    @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
continuousQueryFilter() {
+    @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() {
         return contQryFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java 
b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
index 79b5eca..1959976 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.events;
 
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -25,6 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.util.*;
 
 /**
@@ -84,7 +84,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter {
 
     /** Continuous query filter. */
     @GridToStringInclude
-    private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
contQryFilter;
+    private final CacheEntryEventFilter<K, V> contQryFilter;
 
     /** Query arguments. */
     @GridToStringInclude
@@ -135,7 +135,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter 
{
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<K, V> scanQryFilter,
-        @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
contQryFilter,
+        @Nullable CacheEntryEventFilter<K, V> contQryFilter,
         @Nullable Object[] args,
         @Nullable UUID subjId,
         @Nullable String taskName,
@@ -220,7 +220,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter 
{
      *
      * @return Continuous query filter.
      */
-    @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> 
continuousQueryFilter() {
+    @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() {
         return contQryFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
deleted file mode 100644
index ffbc85b..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-
-import javax.cache.event.*;
-
-/**
- * Implementation of {@link javax.cache.event.CacheEntryEvent}.
- */
-public class CacheEntryEvent<K, V> extends 
javax.cache.event.CacheEntryEvent<K, V> {
-    /** */
-    private final CacheContinuousQueryEntry<K, V> e;
-
-    /**
-     * @param src Cache.
-     * @param type Event type.
-     * @param e Ignite event.
-     */
-    public CacheEntryEvent(IgniteCache src, EventType type, 
CacheContinuousQueryEntry<K, V> e) {
-        super(src, type);
-
-        this.e = e;
-    }
-
-    /** {@inheritDoc} */
-    @Override public V getOldValue() {
-        return e.getOldValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isOldValueAvailable() {
-        return e.getOldValue() != null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public K getKey() {
-        return e.getKey();
-    }
-
-    /** {@inheritDoc} */
-    @Override public V getValue() {
-        return e.getValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T unwrap(Class<T> cls) {
-        if(cls.isAssignableFrom(getClass()))
-            return cls.cast(this);
-
-        throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + cls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "CacheEntryEvent [evtType=" + getEventType() +
-            ", key=" + getKey() +
-            ", val=" + getValue() +
-            ", oldVal=" + getOldValue() + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 39758a6..36d7f1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -111,7 +111,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     private GridCacheQueryManager<K, V> qryMgr;
 
     /** Continuous query manager. */
-    private GridCacheContinuousQueryManager<K, V> contQryMgr;
+    private CacheContinuousQueryManager<K, V> contQryMgr;
 
     /** Swap manager. */
     private GridCacheSwapManager<K, V> swapMgr;
@@ -240,7 +240,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
         GridCacheStoreManager<K, V> storeMgr,
         GridCacheEvictionManager<K, V> evictMgr,
         GridCacheQueryManager<K, V> qryMgr,
-        GridCacheContinuousQueryManager<K, V> contQryMgr,
+        CacheContinuousQueryManager<K, V> contQryMgr,
         GridCacheAffinityManager<K, V> affMgr,
         CacheDataStructuresManager<K, V> dataStructuresMgr,
         GridCacheTtlManager<K, V> ttlMgr,
@@ -867,7 +867,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     /**
      * @return Continuous query manager, {@code null} if disabled.
      */
-    public GridCacheContinuousQueryManager<K, V> continuousQueries() {
+    public CacheContinuousQueryManager<K, V> continuousQueries() {
         return contQryMgr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2f8bb62..c69ad4a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1165,7 +1165,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             }
 
             if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1324,7 +1324,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                 }
 
                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && 
tx.local() && !isNear()))
-                    cctx.continuousQueries().onEntryUpdate(this, key, null, 
null, old, oldBytes, false);
+                    cctx.continuousQueries().onEntryUpdated(this, key, null, 
null, old, oldBytes);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
             }
@@ -1633,7 +1633,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
+            cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -1645,7 +1645,8 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             }
         }
 
-        return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != 
null ? interceptorRes.get2() : old), invokeRes);
+        return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != 
null ? interceptorRes.get2() : old),
+            invokeRes);
     }
 
     /** {@inheritDoc} */
@@ -2204,8 +2205,8 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
             if (res)
                 updateMetrics(op, metrics);
 
-            if (primary)
-                cctx.continuousQueries().onEntryUpdate(this, key, val, 
valueBytesUnlocked(), old, oldBytes, false);
+            if (cctx.isReplicated() || primary)
+                cctx.continuousQueries().onEntryUpdated(this, key, val, 
valueBytesUnlocked(), old, oldBytes);
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
 
@@ -3228,15 +3229,10 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.affinity().primary(cctx.localNode(), key, 
topVer)) {
-                        cctx.continuousQueries().onEntryUpdate(this,
-                            key,
-                            val,
-                            valueBytesUnlocked(),
-                            null,
-                            null,
-                            preload);
-                    }
+                    if (!preload && (cctx.isLocal() || cctx.isReplicated() ||
+                        cctx.affinity().primary(cctx.localNode(), key, 
topVer)))
+                        cctx.continuousQueries().onEntryUpdated(this, key, 
val, valueBytesUnlocked(), null, null);
+
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a4ea863..cb8982b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -625,7 +625,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             GridCacheSwapManager swapMgr = new 
GridCacheSwapManager(cfg.getCacheMode() == LOCAL || 
!GridCacheUtils.isNearEnabled(cfg));
             GridCacheEvictionManager evictMgr = new GridCacheEvictionManager();
             GridCacheQueryManager qryMgr = queryManager(cfg);
-            GridCacheContinuousQueryManager contQryMgr = new 
GridCacheContinuousQueryManager();
+            CacheContinuousQueryManager contQryMgr = new 
CacheContinuousQueryManager();
             CacheDataStructuresManager dataStructuresMgr = new 
CacheDataStructuresManager();
             GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
             GridCacheDrManager drMgr = 
ctx.createComponent(GridCacheDrManager.class);
@@ -761,7 +761,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                  * 2. GridCacheIoManager
                  * 3. GridCacheDeploymentManager
                  * 4. GridCacheQueryManager (note, that we start it for DHT 
cache though).
-                 * 5. GridCacheContinuousQueryManager (note, that we start it 
for DHT cache though).
+                 * 5. CacheContinuousQueryManager (note, that we start it for 
DHT cache though).
                  * 6. GridCacheDgcManager
                  * 7. GridCacheTtlManager.
                  * ===============================================
@@ -1587,8 +1587,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      *
      * @return Utility cache.
      */
-    public <K, V> GridCache<K, V> utilityCache() {
-        return cache(CU.UTILITY_CACHE_NAME);
+    public <K, V> GridCacheAdapter<K, V> utilityCache() {
+        return internalCache(CU.UTILITY_CACHE_NAME);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
index ce7ec24..4ec2dc5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
@@ -454,4 +454,9 @@ public interface GridCacheProjectionEx<K, V> extends 
CacheProjection<K, V> {
     public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> 
invokeAllAsync(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args);
+
+    /**
+     * @return Context.
+     */
+    public GridCacheContext<K, V> context();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 433837d..53fc796 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -325,6 +325,64 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
+     * Executes continuous query.
+     *
+     * @param qry Query.
+     * @param loc Local flag.
+     * @return Initial iteration cursor.
+     */
+    private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, 
boolean loc) {
+        if (qry.getInitialPredicate() instanceof ContinuousQuery)
+            throw new IgniteException("Initial predicate for continuous query 
can't be an instance of another " +
+                "continuous query. Use SCAN or SQL query for initial 
iteration.");
+
+        if (qry.getLocalListener() == null)
+            throw new IgniteException("Mandatory local listener is not set for 
the query: " + qry);
+
+        try {
+            final UUID routineId = ctx.continuousQueries().executeQuery(
+                qry.getLocalListener(),
+                qry.getRemoteFilter(),
+                qry.getBufferSize(),
+                qry.getTimeInterval(),
+                qry.isAutoUnsubscribe(),
+                loc ? ctx.grid().forLocal() : null);
+
+            final QueryCursor<Cache.Entry<K, V>> cur;
+
+            if (qry.getInitialPredicate() != null)
+                cur = loc ? localQuery(qry.getInitialPredicate()) : 
query(qry.getInitialPredicate());
+            else
+                cur = null;
+
+            return new QueryCursor<Cache.Entry<K, V>>() {
+                @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                    return cur != null ? cur.iterator() : new 
GridEmptyIterator<Cache.Entry<K, V>>();
+                }
+
+                @Override public List<Cache.Entry<K, V>> getAll() {
+                    return cur != null ? cur.getAll() : 
Collections.<Cache.Entry<K, V>>emptyList();
+                }
+
+                @Override public void close() {
+                    if (cur != null)
+                        cur.close();
+
+                    try {
+                        
ctx.kernalContext().continuous().stopRoutine(routineId).get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertException(e);
+                    }
+                }
+            };
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
      * @param local Enforce local.
      * @return Local node cluster group.
      */
@@ -333,6 +391,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public QueryCursor<Entry<K,V>> query(Query qry) {
         A.notNull(qry, "qry");
 
@@ -344,6 +403,8 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
             if (qry instanceof SqlQuery) {
                 return null; // TODO
             }
+            else if (qry instanceof ContinuousQuery)
+                return queryContinuous((ContinuousQuery<K, V>)qry, false);
 
             return query(qry, projection(false));
         }
@@ -407,11 +468,12 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @throws CacheException If query indexing disabled for sql query.
      */
     private void validate(Query qry) {
-        if (!(qry instanceof ScanQuery) && !ctx.config().isQueryIndexEnabled())
+        if (!(qry instanceof ScanQuery) && !(qry instanceof ContinuousQuery) 
&& !ctx.config().isQueryIndexEnabled())
             throw new CacheException("Indexing is disabled for cache: " + 
ctx.cache().name());
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) {
         A.notNull(qry, "qry");
 
@@ -422,6 +484,8 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
 
             if (qry instanceof SqlQuery)
                 return doLocalQuery((SqlQuery)qry);
+            else if (qry instanceof ContinuousQuery)
+                return queryContinuous((ContinuousQuery<K, V>)qry, true);
 
             return query(qry, projection(true));
         }
@@ -1108,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            ctx.continuousQueries().registerCacheEntryListener(lsnrCfg, true);
+            ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);
@@ -1119,11 +1183,11 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) {
+    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            ctx.continuousQueries().deregisterCacheEntryListener(lsnrCfg);
+            ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 5f68a7a..407da34 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -19,11 +19,9 @@ package 
org.apache.ignite.internal.processors.cache.datastructures;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.*;
@@ -33,6 +31,7 @@ import org.apache.ignite.resources.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -59,7 +58,7 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
     private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> 
queueHdrView;
 
     /** Query notifying about queue update. */
-    private GridCacheContinuousQueryAdapter queueQry;
+    private UUID queueQryId;
 
     /** Queue query creation guard. */
     private final AtomicBoolean queueQryGuard = new AtomicBoolean();
@@ -98,14 +97,8 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
     @Override protected void onKernalStop0(boolean cancel) {
         busyLock.block();
 
-        if (queueQry != null) {
-            try {
-                queueQry.close();
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to cancel queue header query.", e);
-            }
-        }
+        if (queueQryId != null)
+            cctx.continuousQueries().cancelInternalQuery(queueQryId);
 
         for (GridCacheQueueProxy q : queuesMap.values())
             q.delegate().onKernalStop();
@@ -188,52 +181,43 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
                 return null;
 
             if (queueQryGuard.compareAndSet(false, true)) {
-                queueQry = 
(GridCacheContinuousQueryAdapter)cctx.cache().queries().createContinuousQuery();
-
-                queueQry.remoteFilter(new QueueHeaderPredicate());
-
-                queueQry.localCallback(new IgniteBiPredicate<UUID, 
Collection<GridCacheContinuousQueryEntry>>() {
-                    @Override public boolean apply(UUID id, 
Collection<GridCacheContinuousQueryEntry> entries) {
-                        if (!busyLock.enterBusy())
-                            return false;
+                queueQryId = cctx.continuousQueries().executeInternalQuery(
+                    new CacheEntryUpdatedListener<K, V>() {
+                        @Override public void 
onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+                            if (!busyLock.enterBusy())
+                                return;
 
-                        try {
-                            for (GridCacheContinuousQueryEntry e : entries) {
-                                GridCacheQueueHeaderKey key = 
(GridCacheQueueHeaderKey)e.getKey();
-                                GridCacheQueueHeader hdr = 
(GridCacheQueueHeader)e.getValue();
+                            try {
+                                for (CacheEntryEvent<?, ?> e : evts) {
+                                    GridCacheQueueHeaderKey key = 
(GridCacheQueueHeaderKey)e.getKey();
+                                    GridCacheQueueHeader hdr = 
(GridCacheQueueHeader)e.getValue();
 
-                                for (final GridCacheQueueProxy queue : 
queuesMap.values()) {
-                                    if (queue.name().equals(key.queueName())) {
-                                        if (hdr == null) {
-                                            GridCacheQueueHeader oldHdr = 
(GridCacheQueueHeader)e.getOldValue();
+                                    for (final GridCacheQueueProxy queue : 
queuesMap.values()) {
+                                        if 
(queue.name().equals(key.queueName())) {
+                                            if (hdr == null) {
+                                                GridCacheQueueHeader oldHdr = 
(GridCacheQueueHeader)e.getOldValue();
 
-                                            assert oldHdr != null;
+                                                assert oldHdr != null;
 
-                                            if 
(oldHdr.id().equals(queue.delegate().id())) {
-                                                
queue.delegate().onRemoved(false);
+                                                if 
(oldHdr.id().equals(queue.delegate().id())) {
+                                                    
queue.delegate().onRemoved(false);
 
-                                                
queuesMap.remove(queue.delegate().id());
+                                                    
queuesMap.remove(queue.delegate().id());
+                                                }
                                             }
+                                            else
+                                                
queue.delegate().onHeaderChanged(hdr);
                                         }
-                                        else
-                                            
queue.delegate().onHeaderChanged(hdr);
                                     }
                                 }
                             }
-
-                            return true;
-                        }
-                        finally {
-                            busyLock.leaveBusy();
+                            finally {
+                                busyLock.leaveBusy();
+                            }
                         }
-                    }
-                });
-
-                queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? 
cctx.grid().forLocal() : null,
-                    true,
-                    false,
-                    false,
-                    true);
+                    },
+                    new QueueHeaderPredicate(),
+                    cctx.isLocal() || cctx.isReplicated());
             }
 
             GridCacheQueueProxy queue = queuesMap.get(hdr.id());
@@ -544,7 +528,8 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
     /**
      * Predicate for queue continuous query.
      */
-    private static class QueueHeaderPredicate implements 
IgnitePredicate<CacheContinuousQueryEntry>, Externalizable {
+    private static class QueueHeaderPredicate<K, V> implements 
CacheEntryEventFilter<K, V>,
+        Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -556,7 +541,7 @@ public class CacheDataStructuresManager<K, V> extends 
GridCacheManagerAdapter<K,
         }
 
         /** {@inheritDoc} */
-        @Override public boolean apply(CacheContinuousQueryEntry e) {
+        @Override public boolean evaluate(CacheEntryEvent<? extends K, ? 
extends V> e) {
             return e.getKey() instanceof GridCacheQueueHeaderKey;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
index c1aede1..3dcb82a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java
@@ -93,16 +93,6 @@ public interface CacheQueries<K, V> {
     public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable 
IgniteBiPredicate<K, V> filter);
 
     /**
-     * Creates new continuous query.
-     * <p>
-     * For more information refer to {@link CacheContinuousQuery} 
documentation.
-     *
-     * @return Created continuous query.
-     * @see CacheContinuousQuery
-     */
-    public CacheContinuousQuery<K, V> createContinuousQuery();
-
-    /**
      * Forces this cache to rebuild all search indexes of given value type. 
Sometimes indexes
      * may hold references to objects that have already been removed from 
cache. Although
      * not affecting query results, these objects may consume extra memory. 
Rebuilding

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
index 368dae7..5dbc043 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
@@ -182,11 +182,6 @@ public class GridCacheQueriesImpl<K, V> implements 
GridCacheQueriesEx<K, V>, Ext
     }
 
     /** {@inheritDoc} */
-    @Override public CacheContinuousQuery<K, V> createContinuousQuery() {
-        return ctx.continuousQueries().createQuery(prj == null ? null : 
prj.predicate());
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> rebuildIndexes(Class<?> cls) {
         A.notNull(cls, "cls");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
index d7c3f4c..79eb978 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java
@@ -151,18 +151,6 @@ public class GridCacheQueriesProxy<K, V> implements 
GridCacheQueriesEx<K, V>, Ex
     }
 
     /** {@inheritDoc} */
-    @Override public CacheContinuousQuery<K, V> createContinuousQuery() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
-        try {
-            return delegate.createContinuousQuery();
-        }
-        finally {
-            gate.leave(prev);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public <R> CacheQuery<R> createSpiQuery() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
new file mode 100644
index 0000000..aa4ce54
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+import static 
org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*;
+
+/**
+ * Continuous query entry.
+ */
+class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, 
Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Key. */
+    @GridToStringInclude
+    private K key;
+
+    /** New value. */
+    @GridToStringInclude
+    private V newVal;
+
+    /** Old value. */
+    @GridToStringInclude
+    private V oldVal;
+
+    /** Serialized key. */
+    @GridToStringExclude
+    private byte[] keyBytes;
+
+    /** Serialized value. */
+    @GridToStringExclude
+    private GridCacheValueBytes newValBytes;
+
+    /** Serialized value. */
+    @GridToStringExclude
+    private GridCacheValueBytes oldValBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Deployment info. */
+    @GridToStringExclude
+    private GridDeploymentInfo depInfo;
+
+    public CacheContinuousQueryEntry() {
+        // No-op.
+    }
+
+    CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable 
GridCacheValueBytes newValBytes, @Nullable V oldVal,
+        @Nullable GridCacheValueBytes oldValBytes) {
+
+        this.key = key;
+        this.newVal = newVal;
+        this.newValBytes = newValBytes;
+        this.oldVal = oldVal;
+        this.oldValBytes = oldValBytes;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    void cacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * @return cache name.
+     */
+    String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @throws IgniteCheckedException In case of error.
+     */
+    void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
+        assert marsh != null;
+
+        assert key != null;
+
+        keyBytes = marsh.marshal(key);
+
+        if (newValBytes == null || newValBytes.isNull())
+            newValBytes = newVal != null ?
+                newVal instanceof byte[] ? plain(newVal) : 
marshaled(marsh.marshal(newVal)) : null;
+
+        if (oldValBytes == null || oldValBytes.isNull())
+            oldValBytes = oldVal != null ?
+                oldVal instanceof byte[] ? plain(oldVal) : 
marshaled(marsh.marshal(oldVal)) : null;
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException In case of error.
+     */
+    void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws 
IgniteCheckedException {
+        assert marsh != null;
+
+        assert key == null : "Key should be null: " + key;
+        assert newVal == null : "New value should be null: " + newVal;
+        assert oldVal == null : "Old value should be null: " + oldVal;
+        assert keyBytes != null;
+
+        key = marsh.unmarshal(keyBytes, ldr);
+
+        if (newValBytes != null && !newValBytes.isNull())
+            newVal = newValBytes.isPlain() ? (V)newValBytes.get() : 
marsh.<V>unmarshal(newValBytes.get(), ldr);
+
+        if (oldValBytes != null && !oldValBytes.isNull())
+            oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : 
marsh.<V>unmarshal(oldValBytes.get(), ldr);
+    }
+
+    /**
+     * @return Key.
+     */
+    K key() {
+        return key;
+    }
+
+    /**
+     * @return New value.
+     */
+    V value() {
+        return newVal;
+    }
+
+    /**
+     * @return Old value.
+     */
+    V oldValue() {
+        return oldVal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepare(GridDeploymentInfo depInfo) {
+        this.depInfo = depInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDeploymentInfo deployInfo() {
+        return depInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        boolean b = keyBytes != null;
+
+        out.writeBoolean(b);
+
+        if (b) {
+            U.writeByteArray(out, keyBytes);
+
+            if (newValBytes != null && !newValBytes.isNull()) {
+                out.writeBoolean(true);
+                out.writeBoolean(newValBytes.isPlain());
+                U.writeByteArray(out, newValBytes.get());
+            }
+            else
+                out.writeBoolean(false);
+
+            if (oldValBytes != null && !oldValBytes.isNull()) {
+                out.writeBoolean(true);
+                out.writeBoolean(oldValBytes.isPlain());
+                U.writeByteArray(out, oldValBytes.get());
+            }
+            else
+                out.writeBoolean(false);
+
+            U.writeString(out, cacheName);
+            out.writeObject(depInfo);
+        }
+        else {
+            out.writeObject(key);
+            out.writeObject(newVal);
+            out.writeObject(oldVal);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        boolean b = in.readBoolean();
+
+        if (b) {
+            keyBytes = U.readByteArray(in);
+
+            if (in.readBoolean())
+                newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : 
marshaled(U.readByteArray(in));
+
+            if (in.readBoolean())
+                oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : 
marshaled(U.readByteArray(in));
+
+            cacheName = U.readString(in);
+            depInfo = (GridDeploymentInfo)in.readObject();
+        }
+        else {
+            key = (K)in.readObject();
+            newVal = (V)in.readObject();
+            oldVal = (V)in.readObject();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryEntry.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
new file mode 100644
index 0000000..c90ae34
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.*;
+import javax.cache.event.*;
+
+/**
+ * Continuous query event.
+ */
+class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+    /** Entry. */
+    @GridToStringExclude
+    private final CacheContinuousQueryEntry<K, V> e;
+
+    /**
+     * @param source Source cache.
+     * @param eventType Event type.
+     * @param e Entry.
+     */
+    CacheContinuousQueryEvent(Cache source, EventType eventType, 
CacheContinuousQueryEntry<K, V> e) {
+        super(source, eventType);
+
+        assert e != null;
+
+        this.e = e;
+    }
+
+    /**
+     * @return Entry.
+     */
+    CacheContinuousQueryEntry<K, V> entry() {
+        return e;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K getKey() {
+        return e.key();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getValue() {
+        return e.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getOldValue() {
+        return e.oldValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isOldValueAvailable() {
+        return e.oldValue() != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unwrap(Class<T> cls) {
+        if(cls.isAssignableFrom(getClass()))
+            return cls.cast(this);
+
+        throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryEvent.class, this, "key", 
e.key(), "newVal", e.value(), "oldVal",
+            e.oldValue(), "cacheName", e.cacheName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
new file mode 100644
index 0000000..897f481
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+
+import javax.cache.event.*;
+
+/**
+ * Extended continuous query filter.
+ */
+public interface CacheContinuousQueryFilterEx<K, V> extends 
CacheEntryEventFilter<K, V> {
+    /**
+     * Callback for query unregister event.
+     */
+    public void onQueryUnregister();
+}

Reply via email to