ignite-699
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b6633893 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b6633893 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b6633893 Branch: refs/heads/ignite-gg-9991 Commit: b6633893f50152507d42c796d1378329b092ff9e Parents: 9c8160b Author: S.Vladykin <svlady...@gridgain.com> Authored: Sun Apr 12 11:10:04 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Sun Apr 12 11:10:04 2015 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/SqlFieldsQuery.java | 24 +++ .../processors/query/h2/IgniteH2Indexing.java | 2 +- .../query/h2/sql/GridSqlQuerySplitter.java | 18 +- .../IgniteCacheColocatedQuerySelfTest.java | 209 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 1 + 5 files changed, 246 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 632942c..63b9b6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -39,6 +39,9 @@ public final class SqlFieldsQuery extends Query<List<?>> { @GridToStringInclude private Object[] args; + /** */ + private boolean colocated; + /** * Constructs sql fields query. * @@ -92,6 +95,27 @@ public final class SqlFieldsQuery extends Query<List<?>> { return this; } + /** + * Checks if this query colocated. + * + * @return {@code true} If the query is colocated. + */ + public boolean isColocated() { + return colocated; + } + + /** + * Sets flag defining if this query colocated. + * + * @param colocated Flag value. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setColocated(boolean colocated) { + this.colocated = colocated; + + return this; + } + /** {@inheritDoc} */ @Override public SqlFieldsQuery setPageSize(int pageSize) { return (SqlFieldsQuery)super.setPageSize(pageSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 6804f31..f1c10bc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -786,7 +786,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { Collection<GridQueryFieldMetadata> meta; try { - twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs()); + twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isColocated()); meta = meta(stmt.getMetaData()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 79c73f0..20344c0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -71,9 +71,10 @@ public class GridSqlQuerySplitter { /** * @param stmt Prepared statement. * @param params Parameters. + * @param colocated Colocated query. * @return Two step query. */ - public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params) { + public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params, boolean colocated) { if (params == null) params = GridCacheSqlQuery.EMPTY_PARAMS; @@ -140,7 +141,7 @@ public class GridSqlQuerySplitter { boolean aggregateFound = false; for (int i = 0, len = mapExps.size(); i < len; i++) // Remember len because mapExps list can grow. - aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i); + aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, colocated); // Fill select expressions. mapQry.clearSelect(); @@ -161,12 +162,14 @@ public class GridSqlQuerySplitter { for (int col : srcQry.groupColumns()) mapQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias())); - for (int col : srcQry.groupColumns()) - rdcQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias())); + if (!colocated) { + for (int col : srcQry.groupColumns()) + rdcQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias())); + } } // -- HAVING - if (srcQry.having() != null) { + if (srcQry.having() != null && !colocated) { // TODO Find aggregate functions in HAVING clause. rdcQry.whereAnd(column(columnName(srcQry.havingColumn()))); @@ -297,10 +300,11 @@ public class GridSqlQuerySplitter { * @param rdcSelect Selects for reduce query. * @param colNames Set of unique top level column names. * @param idx Index. + * @param colocated If it is a colocated query. * @return {@code true} If aggregate was found. */ private static boolean splitSelectExpression(List<GridSqlElement> mapSelect, GridSqlElement[] rdcSelect, - Set<String> colNames, int idx) { + Set<String> colNames, int idx, boolean colocated) { GridSqlElement el = mapSelect.get(idx); GridSqlAlias alias = null; @@ -312,7 +316,7 @@ public class GridSqlQuerySplitter { el = alias.child(); } - if (el instanceof GridSqlAggregateFunction) { + if (!colocated && el instanceof GridSqlAggregateFunction) { aggregateFound = true; GridSqlAggregateFunction agg = (GridSqlAggregateFunction)el; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java new file mode 100644 index 0000000..b6051b7 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + */ +public class IgniteCacheColocatedQuerySelfTest extends GridCommonAbstractTest { + /** */ + private static final String QRY = + "select productId, sum(price) s, count(1) c " + + "from Purchase " + + "group by productId " + + "having c > ? " + + "order by s desc, productId limit ? "; + + /** */ + private static final int PURCHASES = 1000; + + /** */ + private static final int PRODUCTS = 10; + + /** */ + private static final int MAX_PRICE = 5; + + /** */ + private static final long SEED = ThreadLocalRandom.current().nextLong(); + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setBackups(1); + cacheCfg.setIndexedTypes( + AffinityUuid.class, Purchase.class + ); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + X.println("--> seed: " + SEED); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + ignite(0).cache(null).removeAll(); + } + + /** + * @param c Cache. + * @param colocated Colocated. + * @return Result. + */ + private static List<List<?>> query(IgniteCache<AffinityUuid,Purchase> c, boolean colocated) { + return c.query(new SqlFieldsQuery(QRY).setArgs(30, 5).setColocated(colocated)).getAll(); + } + + /** + * Correct affinity. + */ + public void testColocatedQueryRight() { + IgniteCache<AffinityUuid,Purchase> c = ignite(0).cache(null); + + Random rnd = new GridRandom(SEED); + + for (int i = 0; i < PURCHASES; i++) { + Purchase p = new Purchase(); + + p.productId = rnd.nextInt(PRODUCTS); + p.price = rnd.nextInt(MAX_PRICE); + + c.put(new AffinityUuid(p.productId), p); // Correct affinity. + } + + List<List<?>> res1 = query(c, false); + List<List<?>> res2 = query(c, true); + + X.println("res1: " + res1); + X.println("res2: " + res2); + + assertFalse(res1.isEmpty()); + assertEquals(res1.toString(), res2.toString()); // TODO fix type conversion issue + } + + /** + * Correct affinity. + */ + public void testColocatedQueryWrong() { + IgniteCache<AffinityUuid,Purchase> c = ignite(0).cache(null); + + Random rnd = new GridRandom(SEED); + + for (int i = 0; i < PURCHASES; i++) { + Purchase p = new Purchase(); + + p.productId = rnd.nextInt(PRODUCTS); + p.price = rnd.nextInt(MAX_PRICE); + + c.put(new AffinityUuid(rnd.nextInt(PRODUCTS)), p); // Random affinity. + } + + List<List<?>> res1 = query(c, false); + List<List<?>> res2 = query(c, true); + + X.println("res1: " + res1); + X.println("res2: " + res2); + + assertFalse(res1.isEmpty()); + assertFalse(res1.equals(res2)); + } + + /** + * + */ + private static class Purchase implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @QuerySqlField + int productId; + + /** */ + @QuerySqlField + int price; + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + + if (o == null || getClass() != o.getClass()) return false; + + Purchase purchase = (Purchase)o; + + return productId == purchase.productId && price == purchase.price; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = productId; + + result = 31 * result + price; + + return result; + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index d81d766..08f6ed2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -55,6 +55,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCachePartitionedQueryP2PDisabledSelfTest.class); suite.addTestSuite(IgniteCachePartitionedQueryMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryIndexSelfTest.class); + suite.addTestSuite(IgniteCacheColocatedQuerySelfTest.class); suite.addTestSuite(IgniteCacheLargeResultSelfTest.class); suite.addTestSuite(GridCacheQueryInternalKeysSelfTest.class); suite.addTestSuite(IgniteCacheQueryMultiThreadedSelfTest.class);