#ignite-738: revert some methods.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/edbfcf70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/edbfcf70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/edbfcf70

Branch: refs/heads/ignite-485
Commit: edbfcf700c78eb9b9e6a36ef680d5a435ff838ea
Parents: 0920bcf
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Tue Apr 14 10:45:53 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Tue Apr 14 10:45:53 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/query/CacheQuery.java      |  19 +++
 .../cache/query/GridCacheQueryAdapter.java      |  48 ++++--
 .../datastructures/GridCacheSetImpl.java        |   7 +-
 .../cache/IgniteTxMultiNodeAbstractTest.java    |   3 +-
 ...achePartitionedPreloadLifecycleSelfTest.java |  18 ++-
 ...CacheReplicatedPreloadLifecycleSelfTest.java |  25 +--
 ...idCacheReduceQueryMultithreadedSelfTest.java | 155 +++++++++++++++++++
 ...GridCachePartitionedFieldsQuerySelfTest.java |  49 ++++++
 ...dCacheAbstractReduceFieldsQuerySelfTest.java |  30 ++--
 ...cheReduceFieldsQueryPartitionedSelfTest.java |  17 ++
 .../IgniteCacheQuerySelfTestSuite.java          |   1 +
 11 files changed, 321 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 93576ab..bab6db6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -259,6 +260,24 @@ public interface CacheQuery<T> {
     public CacheQueryFuture<T> execute(@Nullable Object... args);
 
     /**
+     * Executes the query the same way as {@link #execute(Object...)} method but reduces result remotely.
+     *
+     * @param rmtReducer Remote reducer.
+     * @param args Optional arguments.
+     * @return Future for the query result.
+     */
+    public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, 
@Nullable Object... args);
+
+    /**
+     * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely.
+     *
+     * @param rmtTransform Remote transformer.
+     * @param args Optional arguments.
+     * @return Future for the query result.
+     */
+    public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, 
@Nullable Object... args);
+
+    /**
      * Gets metrics for this query.
      *
      * @return Query metrics.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index fa54bbc..4b1fc87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -363,6 +363,36 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
     /** {@inheritDoc} */
     @Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
+        return execute(null, null, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> 
rmtReducer, @Nullable Object... args) {
+        return execute(rmtReducer, null, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> 
rmtTransform, @Nullable Object... args) {
+        return execute(null, rmtTransform, args);
+    }
+
+    @Override public QueryMetrics metrics() {
+        return metrics.copy();
+    }
+
+    @Override public void resetMetrics() {
+        metrics = new GridCacheQueryMetricsAdapter();
+    }
+
+    /**
+     * @param rmtReducer Optional reducer.
+     * @param rmtTransform Optional transformer.
+     * @param args Arguments.
+     * @return Future.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> 
rmtReducer,
+        @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
         Collection<ClusterNode> nodes = nodes();
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -375,7 +405,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
         if (cctx.deploymentEnabled()) {
             try {
-                cctx.deploy().registerClasses(filter, null, null);
+                cctx.deploy().registerClasses(filter, rmtReducer, 
rmtTransform);
                 cctx.deploy().registerClasses(args);
             }
             catch (IgniteCheckedException e) {
@@ -388,26 +418,18 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
         taskHash = cctx.kernalContext().job().currentTaskNameHash();
 
-        GridCacheQueryBean bean = new GridCacheQueryBean(this, null, null, 
args);
+        GridCacheQueryBean bean = new GridCacheQueryBean(this, 
(IgniteReducer<Object, Object>)rmtReducer,
+            (IgniteClosure<Object, Object>)rmtTransform, args);
 
         GridCacheQueryManager qryMgr = cctx.queries();
 
         boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(cctx.localNodeId());
 
         if (type == SQL_FIELDS || type == SPI)
-            return (CacheQueryFuture<T>)(loc ? qryMgr.queryFieldsLocal(bean) :
+            return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
         else
-            return (CacheQueryFuture<T>)(loc ? qryMgr.queryLocal(bean) : 
qryMgr.queryDistributed(bean, nodes));
-    }
-
-
-    @Override public QueryMetrics metrics() {
-        return metrics.copy();
-    }
-
-    @Override public void resetMetrics() {
-        metrics = new GridCacheQueryMetricsAdapter();
+            return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : 
qryMgr.queryDistributed(bean, nodes));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 94fd25a..7d7b028 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -120,7 +120,12 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
 
             qry.projection(ctx.grid().cluster().forNodes(nodes));
 
-            int sum = F.reduce((Iterable<Object>)qry.execute().get(), new 
SumReducer());
+            Iterable<Integer> col = (Iterable<Integer>)qry.execute(new 
SumReducer()).get();
+
+            int sum = 0;
+
+            for (Integer val : col)
+                sum += val;
 
             return sum;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
index 58c07a0..765bf9e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
@@ -723,8 +723,7 @@ public abstract class IgniteTxMultiNodeAbstractTest extends 
GridCommonAbstractTe
 
             SqlQuery<String, Integer> qry = new SqlQuery<>(Integer.class, 
"_val >= 0");
 
-            List<Cache.Entry<String, Integer>> entries =
-                new ArrayList<>(cache.query(qry).getAll());
+            List<Cache.Entry<String, Integer>> entries = new 
ArrayList<>(cache.query(qry).getAll());
 
             Collections.sort(entries, new Comparator<Cache.Entry<String, 
Integer>>() {
                 @Override public int compare(Cache.Entry<String, Integer> o1, 
Cache.Entry<String, Integer> o2) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index a2443b2..a794e78 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.lifecycle.*;
@@ -28,6 +30,7 @@ import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 
 import javax.cache.*;
+import java.util.*;
 
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheRebalanceMode.*;
@@ -171,16 +174,17 @@ public class GridCachePartitionedPreloadLifecycleSelfTest 
extends GridCachePrelo
             info("Checking '" + (i + 1) + "' nodes...");
 
             for (int j = 0; j < G.allGrids().size(); j++) {
-                IgniteCache<Object, MyValue> c2 = grid(j).cache("two");
+                GridCacheAdapter<Object, MyValue> c2 = 
((IgniteKernal)grid(j)).internalCache("two");
 
-                ScanQuery<Object, MyValue> qry = new ScanQuery<>(null);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = 
c2.queries().createScanQuery(null);
 
-                final Ignite grid = grid(j);
+                int totalCnt = F.sumInt(qry.execute(new 
IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
+                    @IgniteInstanceResource
+                    private Ignite grid;
 
-                int totalCnt = F.reduce(c2.query(qry).getAll(), new 
IgniteReducer<Cache.Entry<Object, MyValue>, Integer>() {
                     private int cnt;
 
-                    @Override public boolean collect(Cache.Entry<Object, 
MyValue> e) {
+                    @Override public boolean collect(Map.Entry<Object, 
MyValue> e) {
                         Object key = e.getKey();
 
                         assertNotNull(e.getValue());
@@ -206,7 +210,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest 
extends GridCachePrelo
                     @Override public Integer reduce() {
                         return cnt;
                     }
-                });
+                }).get());
 
                 info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt 
+ ']');
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 525a8f7..4e6e08c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -177,7 +177,6 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest 
extends GridCachePreloa
             info("Checking '" + (i + 1) + "' nodes...");
 
             for (int j = 0; j < G.allGrids().size(); j++) {
-                final Ignite grid = grid(j);
                 GridCacheAdapter<Object, MyValue> c2 = 
((IgniteKernal)grid(j)).internalCache("two");
 
                 CacheQuery<Map.Entry<Object, MyValue>> qry = 
c2.queries().createScanQuery(null);
@@ -187,13 +186,18 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest 
extends GridCachePreloa
 
                 qry = qry.projection(grid(j).cluster());
 
-                int totalCnt = F.reduce(qry.execute().get(), new 
IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
+                int totalCnt = F.sumInt(qry.execute(new 
IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
+                    @IgniteInstanceResource
+                    private Ignite grid;
+
+                    @LoggerResource
+                    private IgniteLogger log0;
+
                     private int cnt;
 
-                    @Override
-                    public boolean collect(Map.Entry<Object, MyValue> e) {
-                        if (!quiet && grid.log().isInfoEnabled())
-                            grid.log().info("Collecting entry: " + e);
+                    @Override public boolean collect(Map.Entry<Object, 
MyValue> e) {
+                        if (!quiet && log0.isInfoEnabled())
+                            log0.info("Collecting entry: " + e);
 
                         Object key = e.getKey();
 
@@ -201,10 +205,10 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest 
extends GridCachePreloa
 
                         try {
                             Object v1 = e.getValue();
-                            Object v2 = ((IgniteKernal) 
grid).getCache("one").get(key);
+                            Object v2 = 
((IgniteKernal)grid).getCache("one").get(key);
 
                             assertNotNull("Cache c1 misses value for key [i=" 
+ j0 + ", j=" + i0 +
-                                ", missedKey=" + key + ", cache=" + 
((IgniteKernal) grid).getCache("one").values() + ']', v2);
+                                ", missedKey=" + key + ", cache=" + 
((IgniteKernal)grid).getCache("one").values() + ']', v2);
                             assertEquals(v1, v2);
                         }
                         catch (IgniteCheckedException e1) {
@@ -218,11 +222,10 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest 
extends GridCachePreloa
                         return true;
                     }
 
-                    @Override
-                    public Integer reduce() {
+                    @Override public Integer reduce() {
                         return cnt;
                     }
-                });
+                }).get());
 
                 info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt 
+ ']');
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
new file mode 100644
index 0000000..d2a626a
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.marshaller.optimized.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Multithreaded reduce query tests with lots of data.
+ */
+public class GridCacheReduceQueryMultithreadedSelfTest extends 
GridCacheAbstractSelfTest {
+    /** */
+    private static final int GRID_CNT = 5;
+
+    /** */
+    private static final int TEST_TIMEOUT = 2 * 60 * 1000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setMarshaller(new OptimizedMarshaller(false));
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setBackups(1);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setIndexedTypes(
+            String.class, Integer.class
+        );
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testReduceQuery() throws Exception {
+        final int keyCnt = 5000;
+        final int logFreq = 500;
+
+        final GridCacheAdapter<String, Integer> c = internalCache(jcache());
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                for (int i = 1; i < keyCnt; i++) {
+                    c.getAndPut(String.valueOf(i), i);
+
+                    startLatch.countDown();
+
+                    if (i % logFreq == 0)
+                        info("Stored entries: " + i);
+                }
+
+                return null;
+            }
+        }, 1);
+
+        // Create query.
+        final CacheQuery<Map.Entry<String, Integer>> sumQry =
+            c.queries().createSqlQuery(Integer.class, "_val > 
0").timeout(TEST_TIMEOUT);
+
+        final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new 
R1<Map.Entry<String, Integer>, Integer>() {
+            /** */
+            private AtomicInteger sum = new AtomicInteger();
+
+            @Override public boolean collect(Map.Entry<String, Integer> e) {
+                sum.addAndGet(e.getValue());
+
+                return true;
+            }
+
+            @Override public Integer reduce() {
+                return sum.get();
+            }
+        };
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        startLatch.await();
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                int cnt = 0;
+
+                while (!stop.get()) {
+                    Collection<Integer> res = sumQry.execute(rmtRdc).get();
+
+                    int sum = F.sumInt(res);
+
+                    cnt++;
+
+                    assertTrue(sum > 0);
+
+                    if (cnt % logFreq == 0) {
+                        info("Reduced value: " + sum);
+                        info("Executed queries: " + cnt);
+                    }
+                }
+
+                return null;
+            }
+        }, 1);
+
+        fut1.get();
+
+        stop.set(true);
+
+        fut2.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java
index f48abb4..19b2d5a 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java
@@ -63,4 +63,53 @@ public class GridCachePartitionedFieldsQuerySelfTest extends 
GridCacheAbstractFi
 
         return cc;
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncludeBackups() throws Exception {
+        CacheQuery<List<?>> qry = 
((IgniteKernal)grid(0)).internalCache(null).queries().createSqlFieldsQuery(
+            "select _KEY, name, age from Person");
+
+        qry.includeBackups(true);
+
+        CacheQueryFuture<List<?>> fut = qry.execute();
+
+        List<List<?>> res = new ArrayList<>(fut.get());
+
+        assertNotNull("Result", res);
+        assertEquals("Result", res.size(), 6);
+
+        Collections.sort(res, new Comparator<List<?>>() {
+            @Override public int compare(List<?> row1, List<?> row2) {
+                return ((Integer)row1.get(2)).compareTo((Integer)row2.get(2));
+            }
+        });
+
+        int cnt = 0;
+
+        for (List<?> row : res) {
+            assertEquals("Row size", 3, row.size());
+
+            if (cnt == 0 || cnt == 1) {
+                assertEquals("Key", new AffinityKey<>("p1", "o1"), row.get(0));
+                assertEquals("Name", "John White", row.get(1));
+                assertEquals("Age", 25, row.get(2));
+            }
+            else if (cnt == 2 || cnt == 3) {
+                assertEquals("Key", new AffinityKey<>("p2", "o1"), row.get(0));
+                assertEquals("Name", "Joe Black", row.get(1));
+                assertEquals("Age", 35, row.get(2));
+            }
+            else if (cnt == 4 || cnt == 5) {
+                assertEquals("Key", new AffinityKey<>("p3", "o2"), row.get(0));
+                assertEquals("Name", "Mike Green", row.get(1));
+                assertEquals("Age", 40, row.get(2));
+            }
+
+            cnt++;
+        }
+
+        assertEquals("Result count", 6, cnt);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
index b09f92d..14b265c 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
@@ -20,9 +20,10 @@ package 
org.apache.ignite.internal.processors.cache.reducefields;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.optimized.*;
@@ -160,40 +161,35 @@ public abstract class 
GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
      * @throws Exception If failed.
      */
     public void testNoDataInCache() throws Exception {
-        SqlFieldsQuery qry = new SqlFieldsQuery("select age from Person where 
orgId = 999");
+        CacheQuery<List<?>> qry = ((IgniteKernal)grid(0))
+            .getCache(null).queries().createSqlFieldsQuery("select age from 
Person where orgId = 999");
 
-        Collection<List<?>> res = grid(0).cache(null).query(qry).getAll();
+        Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new 
AverageRemoteReducer()).get();
 
-        IgniteBiTuple<Integer, Integer> redRes = F.reduce(res, new 
AverageRemoteReducer());
-
-        assertEquals("Result", 0, F.reduce(Collections.singleton(redRes), new 
AverageLocalReducer()).intValue());
+        assertEquals("Result", 0, F.reduce(res, new 
AverageLocalReducer()).intValue());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAverageQuery() throws Exception {
-        SqlFieldsQuery qry = new SqlFieldsQuery("select age from Person");
-
-        Collection<List<?>> res = grid(0).cache(null).query(qry).getAll();
+        CacheQuery<List<?>> qry = 
((IgniteKernal)grid(0)).getCache(null).queries().createSqlFieldsQuery("select 
age from Person");
 
-        IgniteBiTuple<Integer, Integer> redRes = F.reduce(res, new 
AverageRemoteReducer());
+        Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new 
AverageRemoteReducer()).get();
 
-        assertEquals("Average", 33, F.reduce(Collections.singleton(redRes), 
new AverageLocalReducer()).intValue());
+        assertEquals("Average", 33, F.reduce(res, new 
AverageLocalReducer()).intValue());
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAverageQueryWithArguments() throws Exception {
-        SqlFieldsQuery qry = new SqlFieldsQuery("select age from Person where 
orgId = ?");
-        qry.setArgs(1);
-
-        Collection<List<?>> res = grid(0).cache(null).query(qry).getAll();
+        CacheQuery<List<?>> qry = 
((IgniteKernal)grid(0)).getCache(null).queries().createSqlFieldsQuery(
+            "select age from Person where orgId = ?");
 
-        IgniteBiTuple<Integer, Integer> redRes = F.reduce(res, new 
AverageRemoteReducer());
+        Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new 
AverageRemoteReducer(), 1).get();
 
-        assertEquals("Average", 30, F.reduce(Collections.singleton(redRes), 
new AverageLocalReducer()).intValue());
+        assertEquals("Average", 30, F.reduce(res, new 
AverageLocalReducer()).intValue());
     }
 
 //    /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
index de5093f..a5aee10 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
@@ -39,4 +39,21 @@ public class GridCacheReduceFieldsQueryPartitionedSelfTest 
extends GridCacheAbst
     @Override protected int gridCount() {
         return 3;
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncludeBackups() throws Exception {
+        CacheQuery<List<?>> qry = 
((IgniteKernal)grid(0)).getCache(null).queries().createSqlFieldsQuery("select 
age from Person");
+
+        qry.includeBackups(true);
+
+        int sum = 0;
+
+        for (IgniteBiTuple<Integer, Integer> tuple : qry.execute(new 
AverageRemoteReducer()).get())
+            sum += tuple.get1();
+
+        // One backup, so sum is two times greater
+        assertEquals("Sum", 200, sum);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6cfa35d..fe70c12 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -65,6 +65,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
         
suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
 //        suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO 
IGNITE-484
+        suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class);
         suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
 
         // Fields queries.

Reply via email to