Repository: incubator-ignite Updated Branches: refs/heads/ignite-143 a94f9bad9 -> 91e4f1cb8
# IGNITE-143 - Example and JavaDoc Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/91e4f1cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/91e4f1cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/91e4f1cb Branch: refs/heads/ignite-143 Commit: 91e4f1cb81630bc15dc0f0697a748534f209833a Parents: a94f9ba Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Feb 12 20:54:37 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Feb 12 20:54:37 2015 -0800 ---------------------------------------------------------------------- .../datagrid/CacheContinuousQueryExample.java | 20 ++- .../ignite/cache/query/ContinuousQuery.java | 125 +++++++++++-------- 2 files changed, 87 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91e4f1cb/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java index 26fd2d2..ec7a040 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java @@ -19,6 +19,7 @@ package org.apache.ignite.examples.datagrid; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.lang.*; import javax.cache.*; import javax.cache.event.*; @@ -54,17 +55,24 @@ public class CacheContinuousQueryExample { int keyCnt = 20; + // These entries will be queried by initial predicate. for (int i = 0; i < keyCnt; i++) cache.put(i, Integer.toString(i)); // Create new continuous query. ContinuousQuery<Integer, String> qry = Query.continuous(); + qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<Integer, String>() { + @Override public boolean apply(Integer key, String val) { + return key > 10; + } + })); + // Callback that is called locally when update notifications are received. qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) - System.out.println("Queried entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); } }); @@ -72,14 +80,18 @@ public class CacheContinuousQueryExample { // Entry that pass this filter will be sent to the caller. qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() { @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { - return e.getKey() > 15; + return e.getKey() > 25; } }); // Execute query. - try (QueryCursor<Cache.Entry<Integer, String>> ignored = cache.query(qry)) { + try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) { + // Iterate through existing data. + for (Cache.Entry<Integer, String> e : cur) + System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + // Add a few more keys and watch more query notifications. - for (int i = keyCnt; i < keyCnt + 5; i++) + for (int i = keyCnt; i < keyCnt + 10; i++) cache.put(i, Integer.toString(i)); // Wait for a while while callback is notified about remaining puts. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91e4f1cb/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 8ed0365..35303ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -17,36 +17,29 @@ package org.apache.ignite.cache.query; -import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.*; import javax.cache.event.*; /** - * API for configuring and executing continuous cache queries. + * API for configuring continuous cache queries. * <p> - * Continuous queries are executed as follows: - * <ol> - * <li> - * Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed - * locally. - * </li> - * <li> - * Each node iterates through existing cache data and registers listeners that will - * notify about further updates. - * <li> - * Each key-value pair is passed through optional filter and if this filter returns - * true, key-value pair is sent to the master node (the one that executed query). - * If filter is not provided, all pairs are sent. - * </li> - * <li> - * When master node receives key-value pairs, it notifies the local callback. - * </li> - * </ol> - * <h2 class="header">NOTE</h2> - * Under some concurrent circumstances callback may get several notifications - * for one cache update. This should be taken into account when implementing callback. - * <h1 class="header">Query usage</h1> + * Continuous queries allow to register a remote filter and a local listener + * for cache updates. If an update event passes the filter, it will be sent to + * the node that executed the query and local listener will be notified. + * <p> + * Additionally, you can execute initial query to get currently existing data. + * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialPredicate(Query)} + * method. + * <p> + * Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)} + * method of only on the local node using {@link IgniteCache#localQuery(Query)} method. + * Note that in case query is distributed and a new node joins, it will get the remote + * filter for the query during discovery process before it actually joins topology, + * so no updates will be missed. + * <p> + * To create a new instance of continuous query use {@link Query#continuous()} factory method. + * <h1 class="header">Example</h1> * As an example, suppose we have cache with {@code 'Person'} objects and we need * to query all persons with salary above 1000. * <p> @@ -66,13 +59,21 @@ import javax.cache.event.*; * You can create and execute continuous query like so: * <pre name="code" class="java"> * // Create new continuous query. - * qry = cache.createContinuousQuery(); + * ContinuousQuery qry = Query.continuous(); + * + * // Initial iteration query will return all persons with salary above 1000. + * qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<UUID, Person>() { + * @Override public boolean apply(UUID id, Person p) { + * return p.getSalary() > 1000; + * } + * })); + * * * // Callback that is called locally when update notifications are received. * // It simply prints out information about all created persons. - * qry.callback(new GridPredicate2<UUID, Collection<Map.Entry<UUID, Person>>>() { - * @Override public boolean apply(UUID uuid, Collection<Map.Entry<UUID, Person>> entries) { - * for (Map.Entry<UUID, Person> e : entries) { + * qry.setLocalListener(new CacheEntryUpdatedListener<UUID, Person>() { + * @Override public void onUpdated(Iterable<CacheEntryEvent<? extends UUID, ? extends Person>> evts) { + * for (CacheEntryEvent<? extends UUID, ? extends Person> e : evts) { * Person p = e.getValue(); * * X.println(">>>"); @@ -80,31 +81,29 @@ import javax.cache.event.*; * "'s salary is " + p.getSalary()); * X.println(">>>"); * } - * - * return true; * } * }); * - * // This query will return persons with salary above 1000. - * qry.filter(new GridPredicate2<UUID, Person>() { - * @Override public boolean apply(UUID uuid, Person person) { - * return person.getSalary() > 1000; + * // Continuous listener will be notified for persons with salary above 1000. + * qry.setRemoteFilter(new CacheEntryEventFilter<UUID, Person>() { + * @Override public boolean evaluate(CacheEntryEvent<? extends UUID, ? extends Person> e) { + * return e.getValue().getSalary() > 1000; * } * }); * - * // Execute query. - * qry.execute(); + * // Execute query and get cursor that iterates through initial data. + * QueryCursor<Cache.Entry<UUID, Person>> cur = cache.query(qry); * </pre> - * This will execute query on all nodes that have cache you are working with and notify callback - * with both data that already exists in cache and further updates. + * This will execute query on all nodes that have cache you are working with and + * listener will start to receive notifications for cache updates. * <p> - * To stop receiving updates call {@link #close()} method: + * To stop receiving updates call {@link QueryCursor#close()} method: * <pre name="code" class="java"> - * qry.cancel(); + * cur.close(); * </pre> - * Note that one query instance can be executed only once. After it's cancelled, it's non-operational. - * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create - * new query. + * Note that this works even if you didn't provide initial query. Cursor will + * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()} + * is called. */ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { /** */ @@ -145,6 +144,10 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { /** * Sets initial query. + * <p> + * This query will be executed before continuous listener is registered + * which allows to iterate through entries which already existed at the + * time continuous query is executed. * * @param initFilter Initial query. */ @@ -163,7 +166,8 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { /** * Sets local callback. This callback is called only in local node when new updates are received. - * <p> The callback predicate accepts ID of the node from where updates are received and collection + * <p> + * The callback predicate accepts ID of the node from where updates are received and collection * of received entries. Note that for removed entries value will be {@code null}. * <p> * If the predicate returns {@code false}, query execution will be cancelled. @@ -210,15 +214,20 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { } /** - * Sets buffer size. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will be + * Sets buffer size. + * <p> + * When a cache update happens, entry is first put into a buffer. Entries from buffer will be * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is - * exceeded. <p> Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is + * exceeded. + * <p> + * Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is * disabled). * * @param bufSize Buffer size. */ public void setBufferSize(int bufSize) { - A.ensure(bufSize > 0, "bufSize > 0"); + if (bufSize <= 0) + throw new IllegalArgumentException("Buffer size must be above zero."); this.bufSize = bufSize; } @@ -233,15 +242,20 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { } /** - * Sets time interval. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will + * Sets time interval. + * <p> + * When a cache update happens, entry is first put into a buffer. Entries from buffer will * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)} - * method) or time provided via this method is exceeded. <p> Default time interval is {@code 0} which means that + * method) or time provided via this method is exceeded. + * <p> + * Default time interval is {@code 0} which means that * time check is disabled and entries will be sent only when buffer is full. * * @param timeInterval Time interval. */ public void setTimeInterval(long timeInterval) { - A.ensure(timeInterval >= 0, "timeInterval >= 0"); + if (timeInterval < 0) + throw new IllegalArgumentException("Time interval can't be negative."); this.timeInterval = timeInterval; } @@ -256,10 +270,14 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { } /** - * Sets automatic unsubscribe flag. <p> This flag indicates that query filters on remote nodes should be + * Sets automatic unsubscribe flag. + * <p> + * This flag indicates that query filters on remote nodes should be * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be - * unregistered if master node leaves grid. <p> Default value for this flag is {@code true}. + * unregistered if master node leaves grid. + * <p> + * Default value for this flag is {@code true}. * * @param autoUnsubscribe Automatic unsubscription flag. */ @@ -275,5 +293,4 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { public boolean isAutoUnsubscribe() { return autoUnsubscribe; } - }