IGNITE-543 - Query API changes

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/151a2e52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/151a2e52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/151a2e52

Branch: refs/heads/ignite-45
Commit: 151a2e52285c1370c4202095eaf7f70b97ff1cad
Parents: dcbab7d
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Fri Mar 20 19:26:52 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Fri Mar 20 19:26:52 2015 -0700

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     | 47 +++++---------------
 .../org/apache/ignite/cache/query/Query.java    |  3 ++
 .../processors/cache/IgniteCacheProxy.java      |  2 +-
 .../continuous/CacheContinuousQueryManager.java |  6 +--
 ...ridCacheContinuousQueryAbstractSelfTest.java | 13 +++---
 .../GridContinuousOperationsLoadTest.java       |  2 +-
 6 files changed, 24 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 9814995..cadcb9c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -109,10 +109,10 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     private static final long serialVersionUID = 0L;
 
     /**
-     * Default buffer size. Size of {@code 1} means that all entries
+     * Default page size. Size of {@code 1} means that all entries
      * will be sent to master node immediately (buffering is disabled).
      */
-    public static final int DFLT_BUF_SIZE = 1;
+    public static final int DFLT_PAGE_SIZE = 1;
 
     /** Maximum default time interval after which buffer will be flushed (if 
buffering is enabled). */
     public static final long DFLT_TIME_INTERVAL = 0;
@@ -132,9 +132,6 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     /** Remote filter. */
     private CacheEntryEventFilter<K, V> rmtFilter;
 
-    /** Buffer size. */
-    private int bufSize = DFLT_BUF_SIZE;
-
     /** Time interval. */
     private long timeInterval = DFLT_TIME_INTERVAL;
 
@@ -142,6 +139,13 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
 
     /**
+     * Creates new continuous query.
+     */
+    public ContinuousQuery() {
+        setPageSize(DFLT_PAGE_SIZE);
+    }
+
+    /**
      * Sets initial query.
      * <p>
      * This query will be executed before continuous listener is registered
@@ -222,41 +226,10 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     }
 
     /**
-     * Sets buffer size.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will be
-     * sent to the master node only if the buffer is full or time provided via 
{@link #setTimeInterval(long)} method is
-     * exceeded.
-     * <p>
-     * Default buffer size is {@code 1} which means that entries will be sent 
immediately (buffering is
-     * disabled).
-     *
-     * @param bufSize Buffer size.
-     * @return {@code this} for chaining.
-     */
-    public ContinuousQuery<K, V> setBufferSize(int bufSize) {
-        if (bufSize <= 0)
-            throw new IllegalArgumentException("Buffer size must be above 
zero.");
-
-        this.bufSize = bufSize;
-
-        return this;
-    }
-
-    /**
-     * Gets buffer size.
-     *
-     * @return Buffer size.
-     */
-    public int getBufferSize() {
-        return bufSize;
-    }
-
-    /**
      * Sets time interval.
      * <p>
      * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will
-     * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #setBufferSize(int)}
+     * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #setPageSize(int)}
      * method) or time provided via this method is exceeded.
      * <p>
      * Default time interval is {@code 0} which means that

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
index c120fc5..bcace6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
@@ -65,6 +65,9 @@ public abstract class Query<R> implements Serializable {
      * @return {@code this} for chaining.
      */
     public Query<R> setPageSize(int pageSize) {
+        if (pageSize <= 0)
+            throw new IllegalArgumentException("Page size must be above 
zero.");
+
         this.pageSize = pageSize;
 
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f55de0e..22bb330 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -365,7 +365,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
             final UUID routineId = ctx.continuousQueries().executeQuery(
                 qry.getLocalListener(),
                 qry.getRemoteFilter(),
-                qry.getBufferSize(),
+                qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
                 loc ? ctx.grid().cluster().forLocal() : null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index eb0fc1d..39a8959 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -284,7 +284,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         return executeQuery0(
             locLsnr,
             rmtFilter,
-            ContinuousQuery.DFLT_BUF_SIZE,
+            ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             true,
@@ -401,7 +401,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         int taskNameHash = !internal && 
cctx.kernalContext().security().enabled() ?
             cctx.kernalContext().job().currentTaskNameHash() : 0;
 
-        GridContinuousHandler hnd = new CacheContinuousQueryHandler<>(
+        GridContinuousHandler hnd = new CacheContinuousQueryHandler(
             cctx.name(),
             TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
             locLsnr,
@@ -582,7 +582,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             routineId = executeQuery0(
                 locLsnr,
                 rmtFilter,
-                ContinuousQuery.DFLT_BUF_SIZE,
+                ContinuousQuery.DFLT_PAGE_SIZE,
                 ContinuousQuery.DFLT_TIME_INTERVAL,
                 ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
                 false,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index cbf9eb1..096ea97 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -170,7 +170,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
 
         for (int i = 0; i < gridCount(); i++) {
-            GridContinuousProcessor proc = 
((IgniteKernal)grid(i)).context().continuous();
+            GridContinuousProcessor proc = grid(i).context().continuous();
 
             assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, 
"locInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, 
"rmtInfos")).size());
@@ -180,8 +180,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, 
"waitForStopAck")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, 
"pending")).size());
 
-            CacheContinuousQueryManager mgr =
-                
((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries();
+            CacheContinuousQueryManager mgr = 
grid(i).context().cache().internalCache().context().continuousQueries();
 
             assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size());
         }
@@ -215,7 +214,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.setBufferSize(-1);
+                    q.setPageSize(-1);
 
                     return null;
                 }
@@ -226,7 +225,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    q.setBufferSize(0);
+                    q.setPageSize(0);
 
                     return null;
                 }
@@ -514,7 +513,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
         });
 
-        qry.setBufferSize(5);
+        qry.setPageSize(5);
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {
             ClusterNode node = F.first(grid(0).cluster().forRemotes().nodes());
@@ -599,7 +598,7 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
         });
 
-        qry.setBufferSize(10);
+        qry.setPageSize(10);
         qry.setTimeInterval(3000);
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = 
cache.query(qry)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/151a2e52/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
index fb0c2d8..2ab72f7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
@@ -128,7 +128,7 @@ public class GridContinuousOperationsLoadTest {
                         }
                     });
 
-                    qry.setBufferSize(bufSize);
+                    qry.setPageSize(bufSize);
                     qry.setTimeInterval(timeInterval);
 
                     cache.query(qry);

Reply via email to