ignite-699

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b6633893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b6633893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b6633893

Branch: refs/heads/ignite-gg-9991
Commit: b6633893f50152507d42c796d1378329b092ff9e
Parents: 9c8160b
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Sun Apr 12 11:10:04 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Sun Apr 12 11:10:04 2015 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/SqlFieldsQuery.java      |  24 +++
 .../processors/query/h2/IgniteH2Indexing.java   |   2 +-
 .../query/h2/sql/GridSqlQuerySplitter.java      |  18 +-
 .../IgniteCacheColocatedQuerySelfTest.java      | 209 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   1 +
 5 files changed, 246 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 632942c..63b9b6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -39,6 +39,9 @@ public final class SqlFieldsQuery extends Query<List<?>> {
     @GridToStringInclude
     private Object[] args;
 
+    /** Flag marking this query as colocated; {@code false} unless set via {@link #setColocated(boolean)}. */
+    private boolean colocated;
+
     /**
      * Constructs sql fields query.
      *
@@ -92,6 +95,27 @@ public final class SqlFieldsQuery extends Query<List<?>> {
         return this;
     }
 
+    /**
+     * Checks if this query is flagged as colocated.
+     *
+     * @return {@code true} if the colocated flag is set, {@code false} otherwise.
+     */
+    public boolean isColocated() {
+        return colocated;
+    }
+
+    /**
+     * Sets the flag defining whether this query is colocated.
+     *
+     * @param colocated {@code true} to mark the query as colocated, {@code false} to clear the flag.
+     * @return {@code this} for chaining.
+     */
+    public SqlFieldsQuery setColocated(boolean colocated) {
+        this.colocated = colocated;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public SqlFieldsQuery setPageSize(int pageSize) {
         return (SqlFieldsQuery)super.setPageSize(pageSize);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6804f31..f1c10bc 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -786,7 +786,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         Collection<GridQueryFieldMetadata> meta;
 
         try {
-            twoStepQry = 
GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs());
+            twoStepQry = 
GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), 
qry.isColocated());
 
             meta = meta(stmt.getMetaData());
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 79c73f0..20344c0 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -71,9 +71,10 @@ public class GridSqlQuerySplitter {
     /**
      * @param stmt Prepared statement.
      * @param params Parameters.
+     * @param colocated Colocated query.
      * @return Two step query.
      */
-    public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, 
Object[] params) {
+    public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, 
Object[] params, boolean colocated) {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
 
@@ -140,7 +141,7 @@ public class GridSqlQuerySplitter {
         boolean aggregateFound = false;
 
         for (int i = 0, len = mapExps.size(); i < len; i++) // Remember len 
because mapExps list can grow.
-            aggregateFound |= splitSelectExpression(mapExps, rdcExps, 
colNames, i);
+            aggregateFound |= splitSelectExpression(mapExps, rdcExps, 
colNames, i, colocated);
 
         // Fill select expressions.
         mapQry.clearSelect();
@@ -161,12 +162,14 @@ public class GridSqlQuerySplitter {
             for (int col : srcQry.groupColumns())
                 
mapQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias()));
 
-            for (int col : srcQry.groupColumns())
-                
rdcQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias()));
+            if (!colocated) {
+                for (int col : srcQry.groupColumns())
+                    
rdcQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias()));
+            }
         }
 
         // -- HAVING
-        if (srcQry.having() != null) {
+        if (srcQry.having() != null && !colocated) {
             // TODO Find aggregate functions in HAVING clause.
             rdcQry.whereAnd(column(columnName(srcQry.havingColumn())));
 
@@ -297,10 +300,11 @@ public class GridSqlQuerySplitter {
      * @param rdcSelect Selects for reduce query.
      * @param colNames Set of unique top level column names.
      * @param idx Index.
+     * @param colocated If it is a colocated query.
      * @return {@code true} If aggregate was found.
      */
     private static boolean splitSelectExpression(List<GridSqlElement> 
mapSelect, GridSqlElement[] rdcSelect,
-        Set<String> colNames, int idx) {
+        Set<String> colNames, int idx, boolean colocated) {
         GridSqlElement el = mapSelect.get(idx);
 
         GridSqlAlias alias = null;
@@ -312,7 +316,7 @@ public class GridSqlQuerySplitter {
             el = alias.child();
         }
 
-        if (el instanceof GridSqlAggregateFunction) {
+        if (!colocated && el instanceof GridSqlAggregateFunction) {
             aggregateFound = true;
 
             GridSqlAggregateFunction agg = (GridSqlAggregateFunction)el;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java
new file mode 100644
index 0000000..b6051b7
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheColocatedQuerySelfTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/** Compares results of a grouped SQL fields query run with and without the colocated flag.
+ */
+public class IgniteCacheColocatedQuerySelfTest extends GridCommonAbstractTest {
+    /** Query under test: per-product sum and count with HAVING, ORDER BY and LIMIT; takes two parameters. */
+    private static final String QRY =
+        "select productId, sum(price) s, count(1) c " +
+        "from Purchase " +
+        "group by productId " +
+        "having c > ? " +
+        "order by s desc, productId limit ? ";
+
+    /** Number of purchases put into the cache by each test. */
+    private static final int PURCHASES = 1000;
+
+    /** Exclusive upper bound for generated product ids. */
+    private static final int PRODUCTS = 10;
+
+    /** Exclusive upper bound for generated prices. */
+    private static final int MAX_PRICE = 5;
+
+    /** Random seed, drawn once per class load and printed before the tests start. */
+    private static final long SEED = ThreadLocalRandom.current().nextLong();
+
+    /** IP finder shared by the discovery SPIs of all grids configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(false);
+        cacheCfg.setBackups(1);
+        cacheCfg.setIndexedTypes(
+            AffinityUuid.class, Purchase.class
+        );
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        X.println("--> seed: " + SEED);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        ignite(0).cache(null).removeAll();
+    }
+
+    /**
+     * @param c Cache to run {@link #QRY} on (with arguments 30 and 5).
+     * @param colocated Value for the query's colocated flag.
+     * @return All result rows.
+     */
+    private static List<List<?>> query(IgniteCache<AffinityUuid,Purchase> c, 
boolean colocated) {
+        return c.query(new SqlFieldsQuery(QRY).setArgs(30, 
5).setColocated(colocated)).getAll();
+    }
+
+    /**
+     * Keys are built from each purchase's product id: both flag values must give equal results.
+     */
+    public void testColocatedQueryRight() {
+        IgniteCache<AffinityUuid,Purchase> c = ignite(0).cache(null);
+
+        Random rnd = new GridRandom(SEED);
+
+        for (int i = 0; i < PURCHASES; i++) {
+            Purchase p = new Purchase();
+
+            p.productId = rnd.nextInt(PRODUCTS);
+            p.price = rnd.nextInt(MAX_PRICE);
+
+            c.put(new AffinityUuid(p.productId), p); // Correct affinity.
+        }
+
+        List<List<?>> res1 = query(c, false);
+        List<List<?>> res2 = query(c, true);
+
+        X.println("res1: " + res1);
+        X.println("res2: " + res2);
+
+        assertFalse(res1.isEmpty());
+        assertEquals(res1.toString(), res2.toString()); // TODO fix type 
conversion issue
+    }
+
+    /**
+     * Random affinity: keys are built from an unrelated random id, so results are expected to differ.
+     */
+    public void testColocatedQueryWrong() {
+        IgniteCache<AffinityUuid,Purchase> c = ignite(0).cache(null);
+
+        Random rnd = new GridRandom(SEED);
+
+        for (int i = 0; i < PURCHASES; i++) {
+            Purchase p = new Purchase();
+
+            p.productId = rnd.nextInt(PRODUCTS);
+            p.price = rnd.nextInt(MAX_PRICE);
+
+            c.put(new AffinityUuid(rnd.nextInt(PRODUCTS)), p); // Random 
affinity.
+        }
+
+        List<List<?>> res1 = query(c, false);
+        List<List<?>> res2 = query(c, true);
+
+        X.println("res1: " + res1);
+        X.println("res2: " + res2);
+
+        assertFalse(res1.isEmpty());
+        assertFalse(res1.equals(res2));
+    }
+
+    /**
+     * Cache value type: a purchase with a product id and a price, both annotated as SQL fields.
+     */
+    private static class Purchase implements Serializable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Product id; the column the query groups by. */
+        @QuerySqlField
+        int productId;
+
+        /** Price; the column the query sums. */
+        @QuerySqlField
+        int price;
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Purchase purchase = (Purchase)o;
+
+            return productId == purchase.productId && price == purchase.price;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = productId;
+
+            result = 31 * result + price;
+
+            return result;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b6633893/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index d81d766..08f6ed2 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -55,6 +55,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         
suite.addTestSuite(IgniteCachePartitionedQueryP2PDisabledSelfTest.class);
         
suite.addTestSuite(IgniteCachePartitionedQueryMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryIndexSelfTest.class);
+        suite.addTestSuite(IgniteCacheColocatedQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheLargeResultSelfTest.class);
         suite.addTestSuite(GridCacheQueryInternalKeysSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryMultiThreadedSelfTest.class);

Reply via email to