IGNITE-543 - Query API changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/151a2e52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/151a2e52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/151a2e52 Branch: refs/heads/ignite-543 Commit: 151a2e52285c1370c4202095eaf7f70b97ff1cad Parents: dcbab7d Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri Mar 20 19:26:52 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri Mar 20 19:26:52 2015 -0700 ---------------------------------------------------------------------- .../ignite/cache/query/ContinuousQuery.java | 47 +++++--------------- .../org/apache/ignite/cache/query/Query.java | 3 ++ .../processors/cache/IgniteCacheProxy.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 6 +-- ...ridCacheContinuousQueryAbstractSelfTest.java | 13 +++--- .../GridContinuousOperationsLoadTest.java | 2 +- 6 files changed, 24 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 9814995..cadcb9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -109,10 +109,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { private static final long serialVersionUID = 0L; /** - * Default buffer size. Size of {@code 1} means that all entries + * Default page size. Size of {@code 1} means that all entries * will be sent to master node immediately (buffering is disabled). */ - public static final int DFLT_BUF_SIZE = 1; + public static final int DFLT_PAGE_SIZE = 1; /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ public static final long DFLT_TIME_INTERVAL = 0; @@ -132,9 +132,6 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { /** Remote filter. */ private CacheEntryEventFilter<K, V> rmtFilter; - /** Buffer size. */ - private int bufSize = DFLT_BUF_SIZE; - /** Time interval. */ private long timeInterval = DFLT_TIME_INTERVAL; @@ -142,6 +139,13 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; /** + * Creates new continuous query. + */ + public ContinuousQuery() { + setPageSize(DFLT_PAGE_SIZE); + } + + /** * Sets initial query. * <p> * This query will be executed before continuous listener is registered @@ -222,41 +226,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { } /** - * Sets buffer size. - * <p> - * When a cache update happens, entry is first put into a buffer. Entries from buffer will be - * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is - * exceeded. - * <p> - * Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is - * disabled). - * - * @param bufSize Buffer size. - * @return {@code this} for chaining. - */ - public ContinuousQuery<K, V> setBufferSize(int bufSize) { - if (bufSize <= 0) - throw new IllegalArgumentException("Buffer size must be above zero."); - - this.bufSize = bufSize; - - return this; - } - - /** - * Gets buffer size. - * - * @return Buffer size. - */ - public int getBufferSize() { - return bufSize; - } - - /** * Sets time interval. * <p> * When a cache update happens, entry is first put into a buffer. Entries from buffer will - * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)} + * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)} * method) or time provided via this method is exceeded. * <p> * Default time interval is {@code 0} which means that http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java index c120fc5..bcace6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java @@ -65,6 +65,9 @@ public abstract class Query<R> implements Serializable { * @return {@code this} for chaining. */ public Query<R> setPageSize(int pageSize) { + if (pageSize <= 0) + throw new IllegalArgumentException("Page size must be above zero."); + this.pageSize = pageSize; return this; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f55de0e..22bb330 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -365,7 +365,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V final UUID routineId = ctx.continuousQueries().executeQuery( qry.getLocalListener(), qry.getRemoteFilter(), - qry.getBufferSize(), + qry.getPageSize(), qry.getTimeInterval(), qry.isAutoUnsubscribe(), loc ? ctx.grid().cluster().forLocal() : null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index eb0fc1d..39a8959 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -284,7 +284,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { return executeQuery0( locLsnr, rmtFilter, - ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, true, @@ -401,7 +401,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? cctx.kernalContext().job().currentTaskNameHash() : 0; - GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( + GridContinuousHandler hnd = new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), locLsnr, @@ -582,7 +582,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { routineId = executeQuery0( locLsnr, rmtFilter, - ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, false, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index cbf9eb1..096ea97 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -170,7 +170,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo for (int i = 0; i < gridCount(); i++) { - GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous(); + GridContinuousProcessor proc = grid(i).context().continuous(); assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size()); @@ -180,8 +180,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size()); - CacheContinuousQueryManager mgr = - ((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries(); + CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries(); assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size()); } @@ -215,7 +214,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo log, new Callable<Object>() { @Override public Object call() throws Exception { - q.setBufferSize(-1); + q.setPageSize(-1); return null; } @@ -226,7 +225,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - q.setBufferSize(0); + q.setPageSize(0); return null; } @@ -514,7 +513,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } }); - qry.setBufferSize(5); + qry.setPageSize(5); try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { ClusterNode node = F.first(grid(0).cluster().forRemotes().nodes()); @@ -599,7 +598,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } }); - qry.setBufferSize(10); + qry.setPageSize(10); qry.setTimeInterval(3000); try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java index fb0c2d8..2ab72f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java @@ -128,7 +128,7 @@ public class GridContinuousOperationsLoadTest { } }); - qry.setBufferSize(bufSize); + qry.setPageSize(bufSize); qry.setTimeInterval(timeInterval); cache.query(qry);