IGNITE-143 - Continuous queries refactoring (manual merge)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f649be2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f649be2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f649be2 Branch: refs/heads/ignite-nio Commit: 4f649be29a298c3950cd09b641fba450d7fd0241 Parents: 3b8f9a6 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri Feb 13 16:47:03 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri Feb 13 16:47:03 2015 -0800 ---------------------------------------------------------------------- .../datagrid/CacheContinuousQueryExample.java | 69 +- .../cache/query/CacheContinuousQuery.java | 285 ------ .../cache/query/CacheContinuousQueryEntry.java | 49 - .../ignite/cache/query/ContinuousQuery.java | 234 +++-- .../org/apache/ignite/cache/query/Query.java | 9 + .../ignite/events/CacheQueryExecutedEvent.java | 8 +- .../ignite/events/CacheQueryReadEvent.java | 8 +- .../processors/cache/CacheEntryEvent.java | 78 -- .../processors/cache/GridCacheContext.java | 6 +- .../processors/cache/GridCacheMapEntry.java | 26 +- .../processors/cache/GridCacheProcessor.java | 8 +- .../processors/cache/GridCacheProjectionEx.java | 5 + .../processors/cache/IgniteCacheProxy.java | 72 +- .../CacheDataStructuresManager.java | 81 +- .../processors/cache/query/CacheQueries.java | 10 - .../cache/query/GridCacheQueriesImpl.java | 5 - .../cache/query/GridCacheQueriesProxy.java | 12 - .../continuous/CacheContinuousQueryEntry.java | 234 +++++ .../continuous/CacheContinuousQueryEvent.java | 87 ++ .../CacheContinuousQueryFilterEx.java | 31 + .../continuous/CacheContinuousQueryHandler.java | 490 ++++++++++ .../CacheContinuousQueryListener.java | 47 + .../continuous/CacheContinuousQueryManager.java | 664 ++++++++++++++ .../GridCacheContinuousQueryAdapter.java | 319 ------- .../GridCacheContinuousQueryEntry.java | 344 ------- 
.../GridCacheContinuousQueryFilterEx.java | 33 - .../GridCacheContinuousQueryHandler.java | 571 ------------ .../GridCacheContinuousQueryListener.java | 41 - .../GridCacheContinuousQueryManager.java | 784 ---------------- .../processors/hadoop/GridHadoopJobId.java | 3 +- .../service/GridServiceProcessor.java | 75 +- .../optimized/optimized-classnames.properties | 4 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 888 ++++--------------- ...dCacheContinuousQueryReplicatedSelfTest.java | 95 +- .../GridContinuousOperationsLoadTest.java | 54 +- .../loadtests/hashmap/GridCacheTestContext.java | 2 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 31 +- 37 files changed, 2181 insertions(+), 3581 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java index ce05988..ec7a040 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java @@ -18,11 +18,11 @@ package org.apache.ignite.examples.datagrid; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.lang.*; -import java.util.*; +import javax.cache.*; +import javax.cache.event.*; /** * This examples demonstrates continuous query API. 
@@ -48,46 +48,51 @@ public class CacheContinuousQueryExample { System.out.println(); System.out.println(">>> Cache continuous query example started."); - GridCache<Integer, String> cache = ignite.cache(CACHE_NAME); + IgniteCache<Integer, String> cache = ignite.jcache(CACHE_NAME); // Clean up caches on all nodes before run. - cache.clear(0); + cache.clear(); int keyCnt = 20; + // These entries will be queried by initial predicate. for (int i = 0; i < keyCnt; i++) - cache.putx(i, Integer.toString(i)); + cache.put(i, Integer.toString(i)); // Create new continuous query. - try (CacheContinuousQuery<Integer, String> qry = cache.queries().createContinuousQuery()) { - // Callback that is called locally when update notifications are received. - qry.localCallback( - new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, String>>>() { - @Override public boolean apply( - UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, String>> entries - ) { - for (CacheContinuousQueryEntry<Integer, String> e : entries) - System.out.println("Queried entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); - - return true; // Return true to continue listening. - } - }); - - // This filter will be evaluated remotely on all nodes - // Entry that pass this filter will be sent to the caller. - qry.remoteFilter(new IgnitePredicate<CacheContinuousQueryEntry<Integer, String>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, String> e) { - return e.getKey() > 15; - } - }); - - // Execute query. - qry.execute(); + ContinuousQuery<Integer, String> qry = Query.continuous(); + + qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<Integer, String>() { + @Override public boolean apply(Integer key, String val) { + return key > 10; + } + })); + + // Callback that is called locally when update notifications are received. 
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) + System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + } + }); + + // This filter will be evaluated remotely on all nodes. + // Entry that pass this filter will be sent to the caller. + qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { + return e.getKey() > 25; + } + }); + + // Execute query. + try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) { + // Iterate through existing data. + for (Cache.Entry<Integer, String> e : cur) + System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); // Add a few more keys and watch more query notifications. - for (int i = keyCnt; i < keyCnt + 5; i++) - cache.putx(i, Integer.toString(i)); + for (int i = keyCnt; i < keyCnt + 10; i++) + cache.put(i, Integer.toString(i)); // Wait for a while while callback is notified about remaining puts. Thread.sleep(2000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java deleted file mode 100644 index eaac9b8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.query; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * API for configuring and executing continuous cache queries. - * <p> - * Continuous queries are executed as follows: - * <ol> - * <li> - * Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed - * locally. - * </li> - * <li> - * Each node iterates through existing cache data and registers listeners that will - * notify about further updates. - * <li> - * Each key-value pair is passed through optional filter and if this filter returns - * true, key-value pair is sent to the master node (the one that executed query). - * If filter is not provided, all pairs are sent. - * </li> - * <li> - * When master node receives key-value pairs, it notifies the local callback. - * </li> - * </ol> - * <h2 class="header">NOTE</h2> - * Under some concurrent circumstances callback may get several notifications - * for one cache update. 
This should be taken into account when implementing callback. - * <h1 class="header">Query usage</h1> - * As an example, suppose we have cache with {@code 'Person'} objects and we need - * to query all persons with salary above 1000. - * <p> - * Here is the {@code Person} class: - * <pre name="code" class="java"> - * public class Person { - * // Name. - * private String name; - * - * // Salary. - * private double salary; - * - * ... - * } - * </pre> - * <p> - * You can create and execute continuous query like so: - * <pre name="code" class="java"> - * // Create new continuous query. - * qry = cache.createContinuousQuery(); - * - * // Callback that is called locally when update notifications are received. - * // It simply prints out information about all created persons. - * qry.callback(new GridPredicate2<UUID, Collection<Map.Entry<UUID, Person>>>() { - * @Override public boolean apply(UUID uuid, Collection<Map.Entry<UUID, Person>> entries) { - * for (Map.Entry<UUID, Person> e : entries) { - * Person p = e.getValue(); - * - * X.println(">>>"); - * X.println(">>> " + p.getFirstName() + " " + p.getLastName() + - * "'s salary is " + p.getSalary()); - * X.println(">>>"); - * } - * - * return true; - * } - * }); - * - * // This query will return persons with salary above 1000. - * qry.filter(new GridPredicate2<UUID, Person>() { - * @Override public boolean apply(UUID uuid, Person person) { - * return person.getSalary() > 1000; - * } - * }); - * - * // Execute query. - * qry.execute(); - * </pre> - * This will execute query on all nodes that have cache you are working with and notify callback - * with both data that already exists in cache and further updates. - * <p> - * To stop receiving updates call {@link #close()} method: - * <pre name="code" class="java"> - * qry.cancel(); - * </pre> - * Note that one query instance can be executed only once. After it's cancelled, it's non-operational. 
- * If you need to repeat execution, use {@link CacheQueries#createContinuousQuery()} method to create - * new query. - */ -public interface CacheContinuousQuery<K, V> extends AutoCloseable { - /** - * Default buffer size. Size of {@code 1} means that all entries - * will be sent to master node immediately (buffering is disabled). - */ - public static final int DFLT_BUF_SIZE = 1; - - /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ - public static final long DFLT_TIME_INTERVAL = 0; - - /** - * Default value for automatic unsubscription flag. Remote filters - * will be unregistered by default if master node leaves topology. - */ - public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; - - /** - * Sets local callback. This callback is called only - * in local node when new updates are received. - * <p> - * The callback predicate accepts ID of the node from where updates - * are received and collection of received entries. Note that - * for removed entries value will be {@code null}. - * <p> - * If the predicate returns {@code false}, query execution will - * be cancelled. - * <p> - * <b>WARNING:</b> all operations that involve any kind of JVM-local - * or distributed locking (e.g., synchronization or transactional - * cache operations), should be executed asynchronously without - * blocking the thread that called the callback. Otherwise, you - * can get deadlocks. - * - * @param locCb Local callback. - */ - public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb); - - /** - * Gets local callback. See {@link #localCallback(IgniteBiPredicate)} for more information. - * - * @return Local callback. - */ - @Nullable public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback(); - - /** - * Sets optional key-value filter. This filter is called before - * entry is sent to the master node. 
- * <p> - * <b>WARNING:</b> all operations that involve any kind of JVM-local - * or distributed locking (e.g., synchronization or transactional - * cache operations), should be executed asynchronously without - * blocking the thread that called the filter. Otherwise, you - * can get deadlocks. - * - * @param filter Key-value filter. - */ - public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter); - - /** - * Gets key-value filter. See {@link #remoteFilter(IgnitePredicate)} for more information. - * - * @return Key-value filter. - */ - @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter(); - - /** - * Sets buffer size. - * <p> - * When a cache update happens, entry is first put into a buffer. - * Entries from buffer will be sent to the master node only if - * the buffer is full or time provided via {@link #timeInterval(long)} - * method is exceeded. - * <p> - * Default buffer size is {@code 1} which means that entries will - * be sent immediately (buffering is disabled). - * - * @param bufSize Buffer size. - */ - public void bufferSize(int bufSize); - - /** - * Gets buffer size. See {@link #bufferSize(int)} for more information. - * - * @return Buffer size. - */ - public int bufferSize(); - - /** - * Sets time interval. - * <p> - * When a cache update happens, entry is first put into a buffer. - * Entries from buffer will be sent to the master node only if - * the buffer is full (its size can be provided via {@link #bufferSize(int)} - * method) or time provided via this method is exceeded. - * <p> - * Default time interval is {@code 0} which means that time check is - * disabled and entries will be sent only when buffer is full. - * - * @param timeInterval Time interval. - */ - public void timeInterval(long timeInterval); - - /** - * Gets time interval. See {@link #timeInterval(long)} for more information. - * - * @return Gets time interval. 
- */ - public long timeInterval(); - - /** - * Sets automatic unsubscribe flag. - * <p> - * This flag indicates that query filters on remote nodes should be automatically - * unregistered if master node (node that initiated the query) leaves topology. - * If this flag is {@code false}, filters will be unregistered only when - * the query is cancelled from master node, and won't ever be unregistered if - * master node leaves grid. - * <p> - * Default value for this flag is {@code true}. - * - * @param autoUnsubscribe Automatic unsubscription flag. - */ - public void autoUnsubscribe(boolean autoUnsubscribe); - - /** - * Gets automatic unsubscribe flag. See {@link #autoUnsubscribe(boolean)} - * for more information. - * - * @return Automatic unsubscribe flag. - */ - public boolean isAutoUnsubscribe(); - - /** - * Starts continuous query execution on the whole grid. - * <p> - * Note that if grid contains nodes without appropriate cache, - * these nodes will be filtered out. - * <p> - * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches - * query will be always executed locally. - * - * @throws IgniteCheckedException In case of error. - */ - public void execute() throws IgniteCheckedException; - - /** - * Starts continuous query execution on provided set of nodes. - * <p> - * Note that if provided projection contains nodes without - * appropriate cache, these nodes will be filtered out. - * <p> - * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches - * query will be always executed locally. - * - * @param prj Grid projection. - * @throws IgniteCheckedException In case of error. - */ - public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException; - - /** - * Stops continuous query execution. 
- * <p> - * Note that one query instance can be executed only once. - * After it's cancelled, it's non-operational. - * If you need to repeat execution, use {@link CacheQueries#createContinuousQuery()} - * method to create new query. - * - * @throws IgniteCheckedException In case of error. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java deleted file mode 100644 index 90d3602..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.query; - -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Entry used for continuous query notifications. 
- */ -public interface CacheContinuousQueryEntry<K, V> extends Map.Entry<K, V>, Serializable { - /** - * Gets entry key. - * - * @return Entry key. - */ - @Override public K getKey(); - - /** - * Gets entry new value. New value may be null, if entry is being removed. - * - * @return Entry new value. - */ - @Override @Nullable public V getValue(); - - /** - * Gets entry old value. Old value may be null if entry is being inserted (not updated). - * - * @return Gets entry old value. - */ - @Nullable public V getOldValue(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index b02c65f..35303ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -22,31 +22,24 @@ import org.apache.ignite.*; import javax.cache.event.*; /** - * API for configuring and executing continuous cache queries. + * API for configuring continuous cache queries. * <p> - * Continuous queries are executed as follows: - * <ol> - * <li> - * Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed - * locally. - * </li> - * <li> - * Each node iterates through existing cache data and registers listeners that will - * notify about further updates. - * <li> - * Each key-value pair is passed through optional filter and if this filter returns - * true, key-value pair is sent to the master node (the one that executed query). - * If filter is not provided, all pairs are sent. 
- * </li> - * <li> - * When master node receives key-value pairs, it notifies the local callback. - * </li> - * </ol> - * <h2 class="header">NOTE</h2> - * Under some concurrent circumstances callback may get several notifications - * for one cache update. This should be taken into account when implementing callback. - * <h1 class="header">Query usage</h1> + * Continuous queries allow to register a remote filter and a local listener + * for cache updates. If an update event passes the filter, it will be sent to + * the node that executed the query and local listener will be notified. + * <p> + * Additionally, you can execute initial query to get currently existing data. + * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialPredicate(Query)} + * method. + * <p> + * Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)} + * method or only on the local node using {@link IgniteCache#localQuery(Query)} method. + * Note that in case query is distributed and a new node joins, it will get the remote + * filter for the query during discovery process before it actually joins topology, + * so no updates will be missed. + * <p> + * To create a new instance of continuous query use {@link Query#continuous()} factory method. + * <h1 class="header">Example</h1> * As an example, suppose we have cache with {@code 'Person'} objects and we need * to query all persons with salary above 1000. * <p> @@ -66,13 +59,21 @@ import javax.cache.event.*; * You can create and execute continuous query like so: * <pre name="code" class="java"> * // Create new continuous query. - * qry = cache.createContinuousQuery(); + * ContinuousQuery qry = Query.continuous(); + * + * // Initial iteration query will return all persons with salary above 1000.
+ * qry.setInitialPredicate(Query.scan(new IgniteBiPredicate<UUID, Person>() { + * @Override public boolean apply(UUID id, Person p) { + * return p.getSalary() > 1000; + * } + * })); + * * * // Callback that is called locally when update notifications are received. * // It simply prints out information about all created persons. - * qry.callback(new GridPredicate2<UUID, Collection<Map.Entry<UUID, Person>>>() { - * @Override public boolean apply(UUID uuid, Collection<Map.Entry<UUID, Person>> entries) { - * for (Map.Entry<UUID, Person> e : entries) { + * qry.setLocalListener(new CacheEntryUpdatedListener<UUID, Person>() { + * @Override public void onUpdated(Iterable<CacheEntryEvent<? extends UUID, ? extends Person>> evts) { + * for (CacheEntryEvent<? extends UUID, ? extends Person> e : evts) { * Person p = e.getValue(); * * X.println(">>>"); @@ -80,33 +81,31 @@ import javax.cache.event.*; * "'s salary is " + p.getSalary()); * X.println(">>>"); * } - * - * return true; * } * }); * - * // This query will return persons with salary above 1000. - * qry.filter(new GridPredicate2<UUID, Person>() { - * @Override public boolean apply(UUID uuid, Person person) { - * return person.getSalary() > 1000; + * // Continuous listener will be notified for persons with salary above 1000. + * qry.setRemoteFilter(new CacheEntryEventFilter<UUID, Person>() { + * @Override public boolean evaluate(CacheEntryEvent<? extends UUID, ? extends Person> e) { + * return e.getValue().getSalary() > 1000; * } * }); * - * // Execute query. - * qry.execute(); + * // Execute query and get cursor that iterates through initial data. + * QueryCursor<Cache.Entry<UUID, Person>> cur = cache.query(qry); * </pre> - * This will execute query on all nodes that have cache you are working with and notify callback - * with both data that already exists in cache and further updates. 
+ * This will execute query on all nodes that have cache you are working with and + * listener will start to receive notifications for cache updates. * <p> - * To stop receiving updates call {@link #close()} method: + * To stop receiving updates call {@link QueryCursor#close()} method: * <pre name="code" class="java"> - * qry.cancel(); + * cur.close(); * </pre> - * Note that one query instance can be executed only once. After it's cancelled, it's non-operational. - * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create - * new query. + * Note that this works even if you didn't provide initial query. Cursor will + * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()} + * is called. */ -public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> implements AutoCloseable { +public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { /** */ private static final long serialVersionUID = 0L; @@ -125,13 +124,50 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp */ public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; - public void setInitialPredicate(Query filter) { - // TODO: implement. + /** Initial filter. */ + private Query initFilter; + + /** Local listener. */ + private CacheEntryUpdatedListener<K, V> locLsnr; + + /** Remote filter. */ + private CacheEntryEventFilter<K, V> rmtFilter; + + /** Buffer size. */ + private int bufSize = DFLT_BUF_SIZE; + + /** Time interval. */ + private long timeInterval = DFLT_TIME_INTERVAL; + + /** Automatic unsubscription flag. */ + private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; + + /** + * Sets initial query. + * <p> + * This query will be executed before continuous listener is registered + * which allows to iterate through entries which already existed at the + * time continuous query is executed. 
+ * + * @param initFilter Initial query. + */ + public void setInitialPredicate(Query initFilter) { + this.initFilter = initFilter; + } + + /** + * Gets initial query. + * + * @return Initial query. + */ + public Query getInitialPredicate() { + return initFilter; } /** * Sets local callback. This callback is called only in local node when new updates are received. - * <p> The callback predicate accepts ID of the node from where updates are received and collection + * <p> + * The callback predicate accepts ID of the node from where updates are received and collection * of received entries. Note that for removed entries value will be {@code null}. * <p> * If the predicate returns {@code false}, query execution will be cancelled. @@ -143,7 +179,16 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp * @param locLsnr Local callback. */ public void setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { - // TODO: implement. + this.locLsnr = locLsnr; + } + + /** + * Gets local listener. + * + * @return Local listener. + */ + public CacheEntryUpdatedListener<K, V> getLocalListener() { + return locLsnr; } /** @@ -153,56 +198,99 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. * - * @param filter Key-value filter. + * @param rmtFilter Key-value filter. + */ + public void setRemoteFilter(CacheEntryEventFilter<K, V> rmtFilter) { + this.rmtFilter = rmtFilter; + } + + /** + * Gets remote filter. + * + * @return Remote filter. */ - public void setRemoteFilter(CacheEntryEventFilter<K, V> filter) { - // TODO: implement. + public CacheEntryEventFilter<K, V> getRemoteFilter() { + return rmtFilter; } /** - * Sets buffer size. <p> When a cache update happens, entry is first put into a buffer. 
Entries from buffer will be - * sent to the master node only if the buffer is full or time provided via {@link #timeInterval(long)} method is - * exceeded. <p> Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is + * Sets buffer size. + * <p> + * When a cache update happens, entry is first put into a buffer. Entries from buffer will be + * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is + * exceeded. + * <p> + * Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is * disabled). * * @param bufSize Buffer size. */ - public void bufferSize(int bufSize) { - // TODO: implement. + public void setBufferSize(int bufSize) { + if (bufSize <= 0) + throw new IllegalArgumentException("Buffer size must be above zero."); + + this.bufSize = bufSize; } /** - * Sets time interval. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will - * be sent to the master node only if the buffer is full (its size can be provided via {@link #bufferSize(int)} - * method) or time provided via this method is exceeded. <p> Default time interval is {@code 0} which means that + * Gets buffer size. + * + * @return Buffer size. + */ + public int getBufferSize() { + return bufSize; + } + + /** + * Sets time interval. + * <p> + * When a cache update happens, entry is first put into a buffer. Entries from buffer will + * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)} + * method) or time provided via this method is exceeded. + * <p> + * Default time interval is {@code 0} which means that * time check is disabled and entries will be sent only when buffer is full. * * @param timeInterval Time interval. */ - public void timeInterval(long timeInterval) { - // TODO: implement. 
+ public void setTimeInterval(long timeInterval) { + if (timeInterval < 0) + throw new IllegalArgumentException("Time interval can't be negative."); + + this.timeInterval = timeInterval; + } + + /** + * Gets time interval. + * + * @return Time interval. + */ + public long getTimeInterval() { + return timeInterval; } /** - * Sets automatic unsubscribe flag. <p> This flag indicates that query filters on remote nodes should be - * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is {@code - * false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be - * unregistered if master node leaves grid. <p> Default value for this flag is {@code true}. + * Sets automatic unsubscribe flag. + * <p> + * This flag indicates that query filters on remote nodes should be + * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is + * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be + * unregistered if master node leaves grid. + * <p> + * Default value for this flag is {@code true}. * * @param autoUnsubscribe Automatic unsubscription flag. */ - public void autoUnsubscribe(boolean autoUnsubscribe) { - // TODO: implement. + public void setAutoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; } /** - * Stops continuous query execution. <p> Note that one query instance can be executed only once. After it's - * cancelled, it's non-operational. If you need to repeat execution, use {@link - * org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create new query. + * Gets automatic unsubscription flag value. * - * @throws IgniteCheckedException In case of error. + * @return Automatic unsubscription flag. */ - @Override public void close() throws IgniteCheckedException { - // TODO: implement. 
+ public boolean isAutoUnsubscribe() { + return autoUnsubscribe; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java index 744d8d2..c24d704 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java @@ -106,6 +106,15 @@ public abstract class Query<T extends Query> implements Serializable { } /** + * Factory method for continuous queries. + * + * @return Continuous query. + */ + public static <K, V> ContinuousQuery<K, V> continuous() { + return new ContinuousQuery<>(); + } + + /** * Gets optional page size, if {@code 0}, then {@link CacheQueryConfiguration#getPageSize()} is used. * * @return Optional page size. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java index 51810a2..a7563a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java @@ -17,7 +17,6 @@ package org.apache.ignite.events; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.tostring.*; @@ -25,6 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.util.*; /** @@ -84,7 +84,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter { /** Continuous query filter. */ @GridToStringInclude - private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter; + private final CacheEntryEventFilter<K, V> contQryFilter; /** Query arguments. */ @GridToStringInclude @@ -117,7 +117,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<K, V> scanQryFilter, - @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter, + @Nullable CacheEntryEventFilter<K, V> contQryFilter, @Nullable Object[] args, @Nullable UUID subjId, @Nullable String taskName) { @@ -194,7 +194,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter { * * @return Continuous query filter. 
*/ - @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> continuousQueryFilter() { + @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() { return contQryFilter; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java index 79b5eca..1959976 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java @@ -17,7 +17,6 @@ package org.apache.ignite.events; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.tostring.*; @@ -25,6 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.util.*; /** @@ -84,7 +84,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter { /** Continuous query filter. */ @GridToStringInclude - private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter; + private final CacheEntryEventFilter<K, V> contQryFilter; /** Query arguments. 
*/ @GridToStringInclude @@ -135,7 +135,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<K, V> scanQryFilter, - @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter, + @Nullable CacheEntryEventFilter<K, V> contQryFilter, @Nullable Object[] args, @Nullable UUID subjId, @Nullable String taskName, @@ -220,7 +220,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter { * * @return Continuous query filter. */ - @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> continuousQueryFilter() { + @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() { return contQryFilter; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java deleted file mode 100644 index ffbc85b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; - -import javax.cache.event.*; - -/** - * Implementation of {@link javax.cache.event.CacheEntryEvent}. - */ -public class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> { - /** */ - private final CacheContinuousQueryEntry<K, V> e; - - /** - * @param src Cache. - * @param type Event type. - * @param e Ignite event. - */ - public CacheEntryEvent(IgniteCache src, EventType type, CacheContinuousQueryEntry<K, V> e) { - super(src, type); - - this.e = e; - } - - /** {@inheritDoc} */ - @Override public V getOldValue() { - return e.getOldValue(); - } - - /** {@inheritDoc} */ - @Override public boolean isOldValueAvailable() { - return e.getOldValue() != null; - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return e.getKey(); - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return e.getValue(); - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> cls) { - if(cls.isAssignableFrom(getClass())) - return cls.cast(this); - - throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CacheEntryEvent [evtType=" + getEventType() + - ", key=" + getKey() + - ", val=" + getValue() + - ", oldVal=" + getOldValue() + ']'; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 39758a6..36d7f1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -111,7 +111,7 @@ public class GridCacheContext<K, V> implements Externalizable { private GridCacheQueryManager<K, V> qryMgr; /** Continuous query manager. */ - private GridCacheContinuousQueryManager<K, V> contQryMgr; + private CacheContinuousQueryManager<K, V> contQryMgr; /** Swap manager. */ private GridCacheSwapManager<K, V> swapMgr; @@ -240,7 +240,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheStoreManager<K, V> storeMgr, GridCacheEvictionManager<K, V> evictMgr, GridCacheQueryManager<K, V> qryMgr, - GridCacheContinuousQueryManager<K, V> contQryMgr, + CacheContinuousQueryManager<K, V> contQryMgr, GridCacheAffinityManager<K, V> affMgr, CacheDataStructuresManager<K, V> dataStructuresMgr, GridCacheTtlManager<K, V> ttlMgr, @@ -867,7 +867,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return Continuous query manager, {@code null} if disabled. 
*/ - public GridCacheContinuousQueryManager<K, V> continuousQueries() { + public CacheContinuousQueryManager<K, V> continuousQueries() { return contQryMgr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2f8bb62..c69ad4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1165,7 +1165,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1324,7 +1324,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1633,7 +1633,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), 
old, oldBytes); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -1645,7 +1645,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), invokeRes); + return new GridTuple3<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), + invokeRes); } /** {@inheritDoc} */ @@ -2204,8 +2205,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (res) updateMetrics(op, metrics); - if (primary) - cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + if (cctx.isReplicated() || primary) + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -3228,15 +3229,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> drReplicate(drType, val, valBytes, ver); if (!skipQryNtf) { - if (cctx.affinity().primary(cctx.localNode(), key, topVer)) { - cctx.continuousQueries().onEntryUpdate(this, - key, - val, - valueBytesUnlocked(), - null, - null, - preload); - } + if (!preload && (cctx.isLocal() || cctx.isReplicated() || + cctx.affinity().primary(cctx.localNode(), key, topVer))) + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null); + cctx.dataStructures().onEntryUpdated(key, false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 
a4ea863..cb8982b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -625,7 +625,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg)); GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); GridCacheQueryManager qryMgr = queryManager(cfg); - GridCacheContinuousQueryManager contQryMgr = new GridCacheContinuousQueryManager(); + CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); @@ -761,7 +761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * 2. GridCacheIoManager * 3. GridCacheDeploymentManager * 4. GridCacheQueryManager (note, that we start it for DHT cache though). - * 5. GridCacheContinuousQueryManager (note, that we start it for DHT cache though). + * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). * 6. GridCacheDgcManager * 7. GridCacheTtlManager. * =============================================== @@ -1587,8 +1587,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @return Utility cache. 
*/ - public <K, V> GridCache<K, V> utilityCache() { - return cache(CU.UTILITY_CACHE_NAME); + public <K, V> GridCacheAdapter<K, V> utilityCache() { + return internalCache(CU.UTILITY_CACHE_NAME); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java index ce7ec24..4ec2dc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java @@ -454,4 +454,9 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); + + /** + * @return Context. 
+ */ + public GridCacheContext<K, V> context(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 433837d..53fc796 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -325,6 +325,64 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * Executes continuous query. + * + * @param qry Query. + * @param loc Local flag. + * @return Initial iteration cursor. + */ + private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) { + if (qry.getInitialPredicate() instanceof ContinuousQuery) + throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + + "continuous query. Use SCAN or SQL query for initial iteration."); + + if (qry.getLocalListener() == null) + throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + + try { + final UUID routineId = ctx.continuousQueries().executeQuery( + qry.getLocalListener(), + qry.getRemoteFilter(), + qry.getBufferSize(), + qry.getTimeInterval(), + qry.isAutoUnsubscribe(), + loc ? ctx.grid().forLocal() : null); + + final QueryCursor<Cache.Entry<K, V>> cur; + + if (qry.getInitialPredicate() != null) + cur = loc ? localQuery(qry.getInitialPredicate()) : query(qry.getInitialPredicate()); + else + cur = null; + + return new QueryCursor<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return cur != null ? 
cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>(); + } + + @Override public List<Cache.Entry<K, V>> getAll() { + return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList(); + } + + @Override public void close() { + if (cur != null) + cur.close(); + + try { + ctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + }; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** * @param local Enforce local. * @return Local node cluster group. */ @@ -333,6 +391,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public QueryCursor<Entry<K,V>> query(Query qry) { A.notNull(qry, "qry"); @@ -344,6 +403,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry instanceof SqlQuery) { return null; // TODO } + else if (qry instanceof ContinuousQuery) + return queryContinuous((ContinuousQuery<K, V>)qry, false); return query(qry, projection(false)); } @@ -407,11 +468,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @throws CacheException If query indexing disabled for sql query. 
*/ private void validate(Query qry) { - if (!(qry instanceof ScanQuery) && !ctx.config().isQueryIndexEnabled()) + if (!(qry instanceof ScanQuery) && !(qry instanceof ContinuousQuery) && !ctx.config().isQueryIndexEnabled()) throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name()); } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) { A.notNull(qry, "qry"); @@ -422,6 +484,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry instanceof SqlQuery) return doLocalQuery((SqlQuery)qry); + else if (qry instanceof ContinuousQuery) + return queryContinuous((ContinuousQuery<K, V>)qry, true); return query(qry, projection(true)); } @@ -1108,7 +1172,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ctx.continuousQueries().registerCacheEntryListener(lsnrCfg, true); + ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); } catch (IgniteCheckedException e) { throw cacheException(e); @@ -1119,11 +1183,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ctx.continuousQueries().deregisterCacheEntryListener(lsnrCfg); + ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); } catch (IgniteCheckedException e) { throw cacheException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 5f68a7a..407da34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -19,11 +19,9 @@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.*; @@ -33,6 +31,7 @@ import org.apache.ignite.resources.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -59,7 +58,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; /** Query notifying about queue update. */ - private GridCacheContinuousQueryAdapter queueQry; + private UUID queueQryId; /** Queue query creation guard. 
*/ private final AtomicBoolean queueQryGuard = new AtomicBoolean(); @@ -98,14 +97,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, @Override protected void onKernalStop0(boolean cancel) { busyLock.block(); - if (queueQry != null) { - try { - queueQry.close(); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to cancel queue header query.", e); - } - } + if (queueQryId != null) + cctx.continuousQueries().cancelInternalQuery(queueQryId); for (GridCacheQueueProxy q : queuesMap.values()) q.delegate().onKernalStop(); @@ -188,52 +181,43 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, return null; if (queueQryGuard.compareAndSet(false, true)) { - queueQry = (GridCacheContinuousQueryAdapter)cctx.cache().queries().createContinuousQuery(); - - queueQry.remoteFilter(new QueueHeaderPredicate()); - - queueQry.localCallback(new IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry>>() { - @Override public boolean apply(UUID id, Collection<GridCacheContinuousQueryEntry> entries) { - if (!busyLock.enterBusy()) - return false; + queueQryId = cctx.continuousQueries().executeInternalQuery( + new CacheEntryUpdatedListener<K, V>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? 
extends V>> evts) { + if (!busyLock.enterBusy()) + return; - try { - for (GridCacheContinuousQueryEntry e : entries) { - GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); - GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); + try { + for (CacheEntryEvent<?, ?> e : evts) { + GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); + GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); - for (final GridCacheQueueProxy queue : queuesMap.values()) { - if (queue.name().equals(key.queueName())) { - if (hdr == null) { - GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); + for (final GridCacheQueueProxy queue : queuesMap.values()) { + if (queue.name().equals(key.queueName())) { + if (hdr == null) { + GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); - assert oldHdr != null; + assert oldHdr != null; - if (oldHdr.id().equals(queue.delegate().id())) { - queue.delegate().onRemoved(false); + if (oldHdr.id().equals(queue.delegate().id())) { + queue.delegate().onRemoved(false); - queuesMap.remove(queue.delegate().id()); + queuesMap.remove(queue.delegate().id()); + } } + else + queue.delegate().onHeaderChanged(hdr); } - else - queue.delegate().onHeaderChanged(hdr); } } } - - return true; - } - finally { - busyLock.leaveBusy(); + finally { + busyLock.leaveBusy(); + } } - } - }); - - queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, - true, - false, - false, - true); + }, + new QueueHeaderPredicate(), + cctx.isLocal() || cctx.isReplicated()); } GridCacheQueueProxy queue = queuesMap.get(hdr.id()); @@ -544,7 +528,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, /** * Predicate for queue continuous query. 
*/ - private static class QueueHeaderPredicate implements IgnitePredicate<CacheContinuousQueryEntry>, Externalizable { + private static class QueueHeaderPredicate<K, V> implements CacheEntryEventFilter<K, V>, + Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -556,7 +541,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } /** {@inheritDoc} */ - @Override public boolean apply(CacheContinuousQueryEntry e) { + @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> e) { return e.getKey() instanceof GridCacheQueueHeaderKey; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java index c1aede1..3dcb82a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java @@ -93,16 +93,6 @@ public interface CacheQueries<K, V> { public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter); /** - * Creates new continuous query. - * <p> - * For more information refer to {@link CacheContinuousQuery} documentation. - * - * @return Created continuous query. - * @see CacheContinuousQuery - */ - public CacheContinuousQuery<K, V> createContinuousQuery(); - - /** * Forces this cache to rebuild all search indexes of given value type. Sometimes indexes * may hold references to objects that have already been removed from cache. Although * not affecting query results, these objects may consume extra memory. 
Rebuilding http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java index 368dae7..5dbc043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java @@ -182,11 +182,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext } /** {@inheritDoc} */ - @Override public CacheContinuousQuery<K, V> createContinuousQuery() { - return ctx.continuousQueries().createQuery(prj == null ? null : prj.predicate()); - } - - /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> rebuildIndexes(Class<?> cls) { A.notNull(cls, "cls"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java index d7c3f4c..79eb978 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java @@ -151,18 +151,6 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex } /** {@inheritDoc} */ - @Override public 
CacheContinuousQuery<K, V> createContinuousQuery() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.createContinuousQuery(); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Override public <R> CacheQuery<R> createSpiQuery() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java new file mode 100644 index 0000000..aa4ce54 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*; + +/** + * Continuous query entry. + */ +class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Key. */ + @GridToStringInclude + private K key; + + /** New value. */ + @GridToStringInclude + private V newVal; + + /** Old value. */ + @GridToStringInclude + private V oldVal; + + /** Serialized key. */ + @GridToStringExclude + private byte[] keyBytes; + + /** Serialized value. */ + @GridToStringExclude + private GridCacheValueBytes newValBytes; + + /** Serialized value. */ + @GridToStringExclude + private GridCacheValueBytes oldValBytes; + + /** Cache name. */ + private String cacheName; + + /** Deployment info. */ + @GridToStringExclude + private GridDeploymentInfo depInfo; + + public CacheContinuousQueryEntry() { + // No-op. + } + + CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal, + @Nullable GridCacheValueBytes oldValBytes) { + + this.key = key; + this.newVal = newVal; + this.newValBytes = newValBytes; + this.oldVal = oldVal; + this.oldValBytes = oldValBytes; + } + + /** + * @param cacheName Cache name. + */ + void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** + * @return cache name. + */ + String cacheName() { + return cacheName; + } + + /** + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. 
+ */ + void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { + assert marsh != null; + + assert key != null; + + keyBytes = marsh.marshal(key); + + if (newValBytes == null || newValBytes.isNull()) + newValBytes = newVal != null ? + newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null; + + if (oldValBytes == null || oldValBytes.isNull()) + oldValBytes = oldVal != null ? + oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null; + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws IgniteCheckedException In case of error. + */ + void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + assert marsh != null; + + assert key == null : "Key should be null: " + key; + assert newVal == null : "New value should be null: " + newVal; + assert oldVal == null : "Old value should be null: " + oldVal; + assert keyBytes != null; + + key = marsh.unmarshal(keyBytes, ldr); + + if (newValBytes != null && !newValBytes.isNull()) + newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); + + if (oldValBytes != null && !oldValBytes.isNull()) + oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr); + } + + /** + * @return Key. + */ + K key() { + return key; + } + + /** + * @return New value. + */ + V value() { + return newVal; + } + + /** + * @return Old value. 
+ */ + V oldValue() { + return oldVal; + } + + /** {@inheritDoc} */ + @Override public void prepare(GridDeploymentInfo depInfo) { + this.depInfo = depInfo; + } + + /** {@inheritDoc} */ + @Override public GridDeploymentInfo deployInfo() { + return depInfo; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + boolean b = keyBytes != null; + + out.writeBoolean(b); + + if (b) { + U.writeByteArray(out, keyBytes); + + if (newValBytes != null && !newValBytes.isNull()) { + out.writeBoolean(true); + out.writeBoolean(newValBytes.isPlain()); + U.writeByteArray(out, newValBytes.get()); + } + else + out.writeBoolean(false); + + if (oldValBytes != null && !oldValBytes.isNull()) { + out.writeBoolean(true); + out.writeBoolean(oldValBytes.isPlain()); + U.writeByteArray(out, oldValBytes.get()); + } + else + out.writeBoolean(false); + + U.writeString(out, cacheName); + out.writeObject(depInfo); + } + else { + out.writeObject(key); + out.writeObject(newVal); + out.writeObject(oldVal); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + boolean b = in.readBoolean(); + + if (b) { + keyBytes = U.readByteArray(in); + + if (in.readBoolean()) + newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); + + if (in.readBoolean()) + oldValBytes = in.readBoolean() ? 
plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); + + cacheName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + else { + key = (K)in.readObject(); + newVal = (V)in.readObject(); + oldVal = (V)in.readObject(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryEntry.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java new file mode 100644 index 0000000..c90ae34 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.*; +import javax.cache.event.*; + +/** + * Continuous query event. + */ +class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { + /** Entry. */ + @GridToStringExclude + private final CacheContinuousQueryEntry<K, V> e; + + /** + * @param source Source cache. + * @param eventType Event type. + * @param e Entry. + */ + CacheContinuousQueryEvent(Cache source, EventType eventType, CacheContinuousQueryEntry<K, V> e) { + super(source, eventType); + + assert e != null; + + this.e = e; + } + + /** + * @return Entry. + */ + CacheContinuousQueryEntry<K, V> entry() { + return e; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return e.key(); + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return e.value(); + } + + /** {@inheritDoc} */ + @Override public V getOldValue() { + return e.oldValue(); + } + + /** {@inheritDoc} */ + @Override public boolean isOldValueAvailable() { + return e.oldValue() != null; + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> cls) { + if(cls.isAssignableFrom(getClass())) + return cls.cast(this); + + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryEvent.class, this, "key", e.key(), "newVal", e.value(), "oldVal", + e.oldValue(), "cacheName", e.cacheName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java new file mode 100644 index 0000000..897f481 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + + +import javax.cache.event.*; + +/** + * Continuous query filter extended with a callback for the query unregister event. + */ +public interface CacheContinuousQueryFilterEx<K, V> extends CacheEntryEventFilter<K, V> { + /** + * Callback for the query unregister event. + */ + public void onQueryUnregister(); +}