#ignite-738: revert some methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/edbfcf70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/edbfcf70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/edbfcf70 Branch: refs/heads/ignite-gg-9613 Commit: edbfcf700c78eb9b9e6a36ef680d5a435ff838ea Parents: 0920bcf Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue Apr 14 10:45:53 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue Apr 14 10:45:53 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/query/CacheQuery.java | 19 +++ .../cache/query/GridCacheQueryAdapter.java | 48 ++++-- .../datastructures/GridCacheSetImpl.java | 7 +- .../cache/IgniteTxMultiNodeAbstractTest.java | 3 +- ...achePartitionedPreloadLifecycleSelfTest.java | 18 ++- ...CacheReplicatedPreloadLifecycleSelfTest.java | 25 +-- ...idCacheReduceQueryMultithreadedSelfTest.java | 155 +++++++++++++++++++ ...GridCachePartitionedFieldsQuerySelfTest.java | 49 ++++++ ...dCacheAbstractReduceFieldsQuerySelfTest.java | 30 ++-- ...cheReduceFieldsQueryPartitionedSelfTest.java | 17 ++ .../IgniteCacheQuerySelfTestSuite.java | 1 + 11 files changed, 321 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 93576ab..bab6db6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -21,6 +21,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; /** @@ -259,6 +260,24 @@ public interface CacheQuery<T> { public CacheQueryFuture<T> execute(@Nullable Object... args); /** + * Executes the query the same way as {@link #execute(Object...)} method but reduces result remotely. + * + * @param rmtReducer Remote reducer. + * @param args Optional arguments. + * @return Future for the query result. + */ + public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args); + + /** + * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely. + * + * @param rmtTransform Remote transformer. + * @param args Optional arguments. + * @return Future for the query result. + */ + public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args); + + /** * Gets metrics for this query. * * @return Query metrics. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index fa54bbc..4b1fc87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -363,6 +363,36 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** {@inheritDoc} */ @Override public CacheQueryFuture<T> execute(@Nullable Object... args) { + return execute(null, null, args); + } + + /** {@inheritDoc} */ + @Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args) { + return execute(rmtReducer, null, args); + } + + /** {@inheritDoc} */ + @Override public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { + return execute(null, rmtTransform, args); + } + + @Override public QueryMetrics metrics() { + return metrics.copy(); + } + + @Override public void resetMetrics() { + metrics = new GridCacheQueryMetricsAdapter(); + } + + /** + * @param rmtReducer Optional reducer. + * @param rmtTransform Optional transformer. + * @param args Arguments. + * @return Future. + */ + @SuppressWarnings("IfMayBeConditional") + private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer, + @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { Collection<ClusterNode> nodes = nodes(); cctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -375,7 +405,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { if (cctx.deploymentEnabled()) { try { - cctx.deploy().registerClasses(filter, null, null); + cctx.deploy().registerClasses(filter, rmtReducer, rmtTransform); cctx.deploy().registerClasses(args); } catch (IgniteCheckedException e) { @@ -388,26 +418,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { taskHash = cctx.kernalContext().job().currentTaskNameHash(); - GridCacheQueryBean bean = new GridCacheQueryBean(this, null, null, args); + GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer, + (IgniteClosure<Object, Object>)rmtTransform, args); GridCacheQueryManager qryMgr = cctx.queries(); boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); if (type == SQL_FIELDS || type == SPI) - return (CacheQueryFuture<T>)(loc ? qryMgr.queryFieldsLocal(bean) : + return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); else - return (CacheQueryFuture<T>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); - } - - - @Override public QueryMetrics metrics() { - return metrics.copy(); - } - - @Override public void resetMetrics() { - metrics = new GridCacheQueryMetricsAdapter(); + return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 94fd25a..7d7b028 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -120,7 +120,12 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite qry.projection(ctx.grid().cluster().forNodes(nodes)); - int sum = F.reduce((Iterable<Object>)qry.execute().get(), new SumReducer()); + Iterable<Integer> col = (Iterable<Integer>)qry.execute(new SumReducer()).get(); + + int sum = 0; + + for (Integer val : col) + sum += val; return sum; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java index 58c07a0..765bf9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java @@ -723,8 +723,7 @@ public abstract class IgniteTxMultiNodeAbstractTest extends GridCommonAbstractTe SqlQuery<String, Integer> qry = new SqlQuery<>(Integer.class, "_val >= 0"); - List<Cache.Entry<String, Integer>> entries = - new ArrayList<>(cache.query(qry).getAll()); + List<Cache.Entry<String, Integer>> entries = new ArrayList<>(cache.query(qry).getAll()); Collections.sort(entries, new Comparator<Cache.Entry<String, Integer>>() { @Override public int compare(Cache.Entry<String, Integer> o1, Cache.Entry<String, Integer> o2) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java index a2443b2..a794e78 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; @@ -28,6 +30,7 @@ import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import javax.cache.*; +import java.util.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; @@ -171,16 +174,17 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo info("Checking '" + (i + 1) + "' nodes..."); for (int j = 0; j < G.allGrids().size(); j++) { - IgniteCache<Object, MyValue> c2 = grid(j).cache("two"); + GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two"); - ScanQuery<Object, MyValue> qry = new ScanQuery<>(null); + CacheQuery<Map.Entry<Object, MyValue>> qry = c2.queries().createScanQuery(null); - final Ignite grid = grid(j); + int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() { + @IgniteInstanceResource + private Ignite grid; - int totalCnt = F.reduce(c2.query(qry).getAll(), new IgniteReducer<Cache.Entry<Object, MyValue>, Integer>() { private int cnt; - @Override public boolean collect(Cache.Entry<Object, MyValue> e) { + @Override public boolean collect(Map.Entry<Object, MyValue> e) { Object key = e.getKey(); assertNotNull(e.getValue()); @@ -206,7 +210,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo @Override public Integer reduce() { return cnt; } - }); + }).get()); info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java index 525a8f7..4e6e08c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java @@ -177,7 +177,6 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa info("Checking '" + (i + 1) + "' nodes..."); for (int j = 0; j < G.allGrids().size(); j++) { - final Ignite grid = grid(j); GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two"); CacheQuery<Map.Entry<Object, MyValue>> qry = c2.queries().createScanQuery(null); @@ -187,13 +186,18 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa qry = qry.projection(grid(j).cluster()); - int totalCnt = F.reduce(qry.execute().get(), new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() { + int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() { + @IgniteInstanceResource + private Ignite grid; + + @LoggerResource + private IgniteLogger log0; + private int cnt; - @Override - public boolean collect(Map.Entry<Object, MyValue> e) { - if (!quiet && grid.log().isInfoEnabled()) - grid.log().info("Collecting entry: " + e); + @Override public boolean collect(Map.Entry<Object, MyValue> e) { + if (!quiet && log0.isInfoEnabled()) + log0.info("Collecting entry: " + e); Object key = e.getKey(); @@ -201,10 +205,10 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa try { Object v1 = e.getValue(); - Object v2 = ((IgniteKernal) grid).getCache("one").get(key); + Object v2 = ((IgniteKernal)grid).getCache("one").get(key); assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + - ", missedKey=" + key + ", cache=" + ((IgniteKernal) grid).getCache("one").values() + ']', v2); + ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2); assertEquals(v1, v2); } catch (IgniteCheckedException e1) { @@ -218,11 +222,10 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa return true; } - @Override - public Integer reduce() { + @Override public Integer reduce() { return cnt; } - }); + }).get()); info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java new file mode 100644 index 0000000..d2a626a --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.marshaller.optimized.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Multithreaded reduce query tests with lots of data. + */ +public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 5; + + /** */ + private static final int TEST_TIMEOUT = 2 * 60 * 1000; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshaller(new OptimizedMarshaller(false)); + + return c; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setIndexedTypes( + String.class, Integer.class + ); + + return cfg; + } + + /** + * @throws Exception In case of error. + */ + public void testReduceQuery() throws Exception { + final int keyCnt = 5000; + final int logFreq = 500; + + final GridCacheAdapter<String, Integer> c = internalCache(jcache()); + + final CountDownLatch startLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 1; i < keyCnt; i++) { + c.getAndPut(String.valueOf(i), i); + + startLatch.countDown(); + + if (i % logFreq == 0) + info("Stored entries: " + i); + } + + return null; + } + }, 1); + + // Create query. + final CacheQuery<Map.Entry<String, Integer>> sumQry = + c.queries().createSqlQuery(Integer.class, "_val > 0").timeout(TEST_TIMEOUT); + + final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new R1<Map.Entry<String, Integer>, Integer>() { + /** */ + private AtomicInteger sum = new AtomicInteger(); + + @Override public boolean collect(Map.Entry<String, Integer> e) { + sum.addAndGet(e.getValue()); + + return true; + } + + @Override public Integer reduce() { + return sum.get(); + } + }; + + final AtomicBoolean stop = new AtomicBoolean(); + + startLatch.await(); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + int cnt = 0; + + while (!stop.get()) { + Collection<Integer> res = sumQry.execute(rmtRdc).get(); + + int sum = F.sumInt(res); + + cnt++; + + assertTrue(sum > 0); + + if (cnt % logFreq == 0) { + info("Reduced value: " + sum); + info("Executed queries: " + cnt); + } + } + + return null; + } + }, 1); + + fut1.get(); + + stop.set(true); + + fut2.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java index f48abb4..19b2d5a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedFieldsQuerySelfTest.java @@ -63,4 +63,53 @@ public class GridCachePartitionedFieldsQuerySelfTest extends GridCacheAbstractFi return cc; } + + /** + * @throws Exception If failed. + */ + public void testIncludeBackups() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).internalCache(null).queries().createSqlFieldsQuery( + "select _KEY, name, age from Person"); + + qry.includeBackups(true); + + CacheQueryFuture<List<?>> fut = qry.execute(); + + List<List<?>> res = new ArrayList<>(fut.get()); + + assertNotNull("Result", res); + assertEquals("Result", res.size(), 6); + + Collections.sort(res, new Comparator<List<?>>() { + @Override public int compare(List<?> row1, List<?> row2) { + return ((Integer)row1.get(2)).compareTo((Integer)row2.get(2)); + } + }); + + int cnt = 0; + + for (List<?> row : res) { + assertEquals("Row size", 3, row.size()); + + if (cnt == 0 || cnt == 1) { + assertEquals("Key", new AffinityKey<>("p1", "o1"), row.get(0)); + assertEquals("Name", "John White", row.get(1)); + assertEquals("Age", 25, row.get(2)); + } + else if (cnt == 2 || cnt == 3) { + assertEquals("Key", new AffinityKey<>("p2", "o1"), row.get(0)); + assertEquals("Name", "Joe Black", row.get(1)); + assertEquals("Age", 35, row.get(2)); + } + else if (cnt == 4 || cnt == 5) { + assertEquals("Key", new AffinityKey<>("p3", "o2"), row.get(0)); + assertEquals("Name", "Mike Green", row.get(1)); + assertEquals("Age", 40, row.get(2)); + } + + cnt++; + } + + assertEquals("Result count", 6, cnt); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java index b09f92d..14b265c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java @@ -20,9 +20,10 @@ package org.apache.ignite.internal.processors.cache.reducefields; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; @@ -160,40 +161,35 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom * @throws Exception If failed. */ public void testNoDataInCache() throws Exception { - SqlFieldsQuery qry = new SqlFieldsQuery("select age from Person where orgId = 999"); + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)) + .getCache(null).queries().createSqlFieldsQuery("select age from Person where orgId = 999"); - Collection<List<?>> res = grid(0).cache(null).query(qry).getAll(); + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); - IgniteBiTuple<Integer, Integer> redRes = F.reduce(res, new AverageRemoteReducer()); - - assertEquals("Result", 0, F.reduce(Collections.singleton(redRes), new AverageLocalReducer()).intValue()); + assertEquals("Result", 0, F.reduce(res, new AverageLocalReducer()).intValue()); } /** * @throws Exception If failed. */ public void testAverageQuery() throws Exception { - SqlFieldsQuery qry = new SqlFieldsQuery("select age from Person"); - - Collection<List<?>> res = grid(0).cache(null).query(qry).getAll(); + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).queries().createSqlFieldsQuery("select age from Person"); - IgniteBiTuple<Integer, Integer> redRes = F.reduce(res, new AverageRemoteReducer()); + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); - assertEquals("Average", 33, F.reduce(Collections.singleton(redRes), new AverageLocalReducer()).intValue()); + assertEquals("Average", 33, F.reduce(res, new AverageLocalReducer()).intValue()); } /** * @throws Exception If failed. */ public void testAverageQueryWithArguments() throws Exception { - SqlFieldsQuery qry = new SqlFieldsQuery("select age from Person where orgId = ?"); - qry.setArgs(1); - - Collection<List<?>> res = grid(0).cache(null).query(qry).getAll(); + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).queries().createSqlFieldsQuery( + "select age from Person where orgId = ?"); - IgniteBiTuple<Integer, Integer> redRes = F.reduce(res, new AverageRemoteReducer()); + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer(), 1).get(); - assertEquals("Average", 30, F.reduce(Collections.singleton(redRes), new AverageLocalReducer()).intValue()); + assertEquals("Average", 30, F.reduce(res, new AverageLocalReducer()).intValue()); } // /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java index de5093f..a5aee10 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java @@ -39,4 +39,21 @@ public class GridCacheReduceFieldsQueryPartitionedSelfTest extends GridCacheAbst @Override protected int gridCount() { return 3; } + + /** + * @throws Exception If failed. + */ + public void testIncludeBackups() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).queries().createSqlFieldsQuery("select age from Person"); + + qry.includeBackups(true); + + int sum = 0; + + for (IgniteBiTuple<Integer, Integer> tuple : qry.execute(new AverageRemoteReducer()).get()) + sum += tuple.get1(); + + // One backup, so sum is two times greater + assertEquals("Sum", 200, sum); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/edbfcf70/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 6cfa35d..fe70c12 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -65,6 +65,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); // suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO IGNITE-484 + suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); // Fields queries.