Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-143 a94f9bad9 -> 91e4f1cb8


# IGNITE-143 - Example and JavaDoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/91e4f1cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/91e4f1cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/91e4f1cb

Branch: refs/heads/ignite-143
Commit: 91e4f1cb81630bc15dc0f0697a748534f209833a
Parents: a94f9ba
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Thu Feb 12 20:54:37 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Thu Feb 12 20:54:37 2015 -0800

----------------------------------------------------------------------
 .../datagrid/CacheContinuousQueryExample.java   |  20 ++-
 .../ignite/cache/query/ContinuousQuery.java     | 125 +++++++++++--------
 2 files changed, 87 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91e4f1cb/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index 26fd2d2..ec7a040 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -19,6 +19,7 @@ package org.apache.ignite.examples.datagrid;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.lang.*;
 
 import javax.cache.*;
 import javax.cache.event.*;
@@ -54,17 +55,24 @@ public class CacheContinuousQueryExample {
 
             int keyCnt = 20;
 
+            // These entries will be queried by initial predicate.
             for (int i = 0; i < keyCnt; i++)
                 cache.put(i, Integer.toString(i));
 
             // Create new continuous query.
             ContinuousQuery<Integer, String> qry = Query.continuous();
 
+            qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<Integer, 
String>() {
+                @Override public boolean apply(Integer key, String val) {
+                    return key > 10;
+                }
+            }));
+
             // Callback that is called locally when update notifications are 
received.
             qry.setLocalListener(new CacheEntryUpdatedListener<Integer, 
String>() {
                 @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends Integer, ? extends String>> evts) {
                     for (CacheEntryEvent<? extends Integer, ? extends String> 
e : evts)
-                        System.out.println("Queried entry [key=" + e.getKey() 
+ ", val=" + e.getValue() + ']');
+                        System.out.println("Updated entry [key=" + e.getKey() 
+ ", val=" + e.getValue() + ']');
                 }
             });
 
@@ -72,14 +80,18 @@ public class CacheContinuousQueryExample {
             // Entry that pass this filter will be sent to the caller.
             qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
                 @Override public boolean evaluate(CacheEntryEvent<? extends 
Integer, ? extends String> e) {
-                    return e.getKey() > 15;
+                    return e.getKey() > 25;
                 }
             });
 
             // Execute query.
-            try (QueryCursor<Cache.Entry<Integer, String>> ignored = 
cache.query(qry)) {
+            try (QueryCursor<Cache.Entry<Integer, String>> cur = 
cache.query(qry)) {
+                // Iterate through existing data.
+                for (Cache.Entry<Integer, String> e : cur)
+                    System.out.println("Queried existing entry [key=" + 
e.getKey() + ", val=" + e.getValue() + ']');
+
                 // Add a few more keys and watch more query notifications.
-                for (int i = keyCnt; i < keyCnt + 5; i++)
+                for (int i = keyCnt; i < keyCnt + 10; i++)
                     cache.put(i, Integer.toString(i));
 
                 // Wait for a while while callback is notified about remaining 
puts.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91e4f1cb/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 8ed0365..35303ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -17,36 +17,29 @@
 
 package org.apache.ignite.cache.query;
 
-import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.*;
 
 import javax.cache.event.*;
 
 /**
- * API for configuring and executing continuous cache queries.
+ * API for configuring continuous cache queries.
  * <p>
- * Continuous queries are executed as follows:
- * <ol>
- * <li>
- *  Query is sent to requested grid nodes. Note that for {@link 
org.apache.ignite.cache.CacheMode#LOCAL LOCAL}
- *  and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches 
query will be always executed
- *  locally.
- * </li>
- * <li>
- *  Each node iterates through existing cache data and registers listeners 
that will
- *  notify about further updates.
- * <li>
- *  Each key-value pair is passed through optional filter and if this filter 
returns
- *  true, key-value pair is sent to the master node (the one that executed 
query).
- *  If filter is not provided, all pairs are sent.
- * </li>
- * <li>
- *  When master node receives key-value pairs, it notifies the local callback.
- * </li>
- * </ol>
- * <h2 class="header">NOTE</h2>
- * Under some concurrent circumstances callback may get several notifications
- * for one cache update. This should be taken into account when implementing 
callback.
- * <h1 class="header">Query usage</h1>
+ * Continuous queries allow to register a remote filter and a local listener
+ * for cache updates. If an update event passes the filter, it will be sent to
+ * the node that executed the query and local listener will be notified.
+ * <p>
+ * Additionally, you can execute initial query to get currently existing data.
+ * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link 
#setInitialPredicate(Query)}
+ * method.
+ * <p>
+ * Query can be executed either on all nodes in topology using {@link 
IgniteCache#query(Query)}
+ * method of only on the local node using {@link 
IgniteCache#localQuery(Query)} method.
+ * Note that in case query is distributed and a new node joins, it will get 
the remote
+ * filter for the query during discovery process before it actually joins 
topology,
+ * so no updates will be missed.
+ * <p>
+ * To create a new instance of continuous query use {@link Query#continuous()} 
factory method.
+ * <h1 class="header">Example</h1>
  * As an example, suppose we have cache with {@code 'Person'} objects and we 
need
  * to query all persons with salary above 1000.
  * <p>
@@ -66,13 +59,21 @@ import javax.cache.event.*;
  * You can create and execute continuous query like so:
  * <pre name="code" class="java">
  * // Create new continuous query.
- * qry = cache.createContinuousQuery();
+ * ContinuousQuery qry = Query.continuous();
+ *
+ * // Initial iteration query will return all persons with salary above 1000.
+ * qry.setInitialPredicate(Query.scan(new IgniteBiPredicate&lt;UUID, 
Person&gt;() {
+ *     &#64;Override public boolean apply(UUID id, Person p) {
+ *         return p.getSalary() &gt; 1000;
+ *     }
+ * }));
+ *
  *
  * // Callback that is called locally when update notifications are received.
  * // It simply prints out information about all created persons.
- * qry.callback(new GridPredicate2&lt;UUID, Collection&lt;Map.Entry&lt;UUID, 
Person&gt;&gt;&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, 
Collection&lt;Map.Entry&lt;UUID, Person&gt;&gt; entries) {
- *         for (Map.Entry&lt;UUID, Person&gt; e : entries) {
+ * qry.setLocalListener(new CacheEntryUpdatedListener&lt;UUID, Person&gt;() {
+ *     &#64;Override public void onUpdated(Iterable&lt;CacheEntryEvent&lt;? 
extends UUID, ? extends Person&gt;&gt; evts) {
+ *         for (CacheEntryEvent&lt;? extends UUID, ? extends Person&gt; e : 
evts) {
  *             Person p = e.getValue();
  *
  *             X.println("&gt;&gt;&gt;");
@@ -80,31 +81,29 @@ import javax.cache.event.*;
  *                 "'s salary is " + p.getSalary());
  *             X.println("&gt;&gt;&gt;");
  *         }
- *
- *         return true;
  *     }
  * });
  *
- * // This query will return persons with salary above 1000.
- * qry.filter(new GridPredicate2&lt;UUID, Person&gt;() {
- *     &#64;Override public boolean apply(UUID uuid, Person person) {
- *         return person.getSalary() &gt; 1000;
+ * // Continuous listener will be notified for persons with salary above 1000.
+ * qry.setRemoteFilter(new CacheEntryEventFilter&lt;UUID, Person&gt;() {
+ *     &#64;Override public boolean evaluate(CacheEntryEvent&lt;? extends 
UUID, ? extends Person&gt; e) {
+ *         return e.getValue().getSalary() &gt; 1000;
  *     }
  * });
  *
- * // Execute query.
- * qry.execute();
+ * // Execute query and get cursor that iterates through initial data.
+ * QueryCursor&lt;Cache.Entry&lt;UUID, Person&gt;&gt; cur = cache.query(qry);
  * </pre>
- * This will execute query on all nodes that have cache you are working with 
and notify callback
- * with both data that already exists in cache and further updates.
+ * This will execute query on all nodes that have cache you are working with 
and
+ * listener will start to receive notifications for cache updates.
  * <p>
- * To stop receiving updates call {@link #close()} method:
+ * To stop receiving updates call {@link QueryCursor#close()} method:
  * <pre name="code" class="java">
- * qry.cancel();
+ * cur.close();
  * </pre>
- * Note that one query instance can be executed only once. After it's 
cancelled, it's non-operational.
- * If you need to repeat execution, use {@link 
org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()}
 method to create
- * new query.
+ * Note that this works even if you didn't provide initial query. Cursor will
+ * be empty in this case, but it will still unregister listeners when {@link 
QueryCursor#close()}
+ * is called.
  */
 public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> {
     /** */
@@ -145,6 +144,10 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> {
 
     /**
      * Sets initial query.
+     * <p>
+     * This query will be executed before continuous listener is registered
+     * which allows to iterate through entries which already existed at the
+     * time continuous query is executed.
      *
      * @param initFilter Initial query.
      */
@@ -163,7 +166,8 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> {
 
     /**
      * Sets local callback. This callback is called only in local node when 
new updates are received.
-     * <p> The callback predicate accepts ID of the node from where updates 
are received and collection
+     * <p>
+     * The callback predicate accepts ID of the node from where updates are 
received and collection
      * of received entries. Note that for removed entries value will be {@code 
null}.
      * <p>
      * If the predicate returns {@code false}, query execution will be 
cancelled.
@@ -210,15 +214,20 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> {
     }
 
     /**
-     * Sets buffer size. <p> When a cache update happens, entry is first put 
into a buffer. Entries from buffer will be
+     * Sets buffer size.
+     * <p>
+     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will be
      * sent to the master node only if the buffer is full or time provided via 
{@link #setTimeInterval(long)} method is
-     * exceeded. <p> Default buffer size is {@code 1} which means that entries 
will be sent immediately (buffering is
+     * exceeded.
+     * <p>
+     * Default buffer size is {@code 1} which means that entries will be sent 
immediately (buffering is
      * disabled).
      *
      * @param bufSize Buffer size.
      */
     public void setBufferSize(int bufSize) {
-        A.ensure(bufSize > 0, "bufSize > 0");
+        if (bufSize <= 0)
+            throw new IllegalArgumentException("Buffer size must be above 
zero.");
 
         this.bufSize = bufSize;
     }
@@ -233,15 +242,20 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> {
     }
 
     /**
-     * Sets time interval. <p> When a cache update happens, entry is first put 
into a buffer. Entries from buffer will
+     * Sets time interval.
+     * <p>
+     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will
      * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #setBufferSize(int)}
-     * method) or time provided via this method is exceeded. <p> Default time 
interval is {@code 0} which means that
+     * method) or time provided via this method is exceeded.
+     * <p>
+     * Default time interval is {@code 0} which means that
      * time check is disabled and entries will be sent only when buffer is 
full.
      *
      * @param timeInterval Time interval.
      */
     public void setTimeInterval(long timeInterval) {
-        A.ensure(timeInterval >= 0, "timeInterval >= 0");
+        if (timeInterval < 0)
+            throw new IllegalArgumentException("Time interval can't be 
negative.");
 
         this.timeInterval = timeInterval;
     }
@@ -256,10 +270,14 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> {
     }
 
     /**
-     * Sets automatic unsubscribe flag. <p> This flag indicates that query 
filters on remote nodes should be
+     * Sets automatic unsubscribe flag.
+     * <p>
+     * This flag indicates that query filters on remote nodes should be
      * automatically unregistered if master node (node that initiated the 
query) leaves topology. If this flag is
      * {@code false}, filters will be unregistered only when the query is 
cancelled from master node, and won't ever be
-     * unregistered if master node leaves grid. <p> Default value for this 
flag is {@code true}.
+     * unregistered if master node leaves grid.
+     * <p>
+     * Default value for this flag is {@code true}.
      *
      * @param autoUnsubscribe Automatic unsubscription flag.
      */
@@ -275,5 +293,4 @@ public final class ContinuousQuery<K, V> extends 
Query<ContinuousQuery<K,V>> {
     public boolean isAutoUnsubscribe() {
         return autoUnsubscribe;
     }
-
 }

Reply via email to