http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java index 0000000,88183c7..74d54af mode 000000,100644..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java @@@ -1,0 -1,197 +1,197 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache; + + import org.apache.ignite.*; + import org.apache.ignite.cache.query.*; + import org.apache.ignite.cache.query.annotations.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; ++import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.common.*; + + import javax.cache.*; + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + + /** + * + */ + public class IgniteCacheSqlQueryMultiThreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration<?,?> ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); - ccfg.setDistributionMode(PARTITIONED_ONLY); ++ ccfg.setNearConfiguration(null); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setIndexedTypes( + Integer.class, Person.class + ); + + c.setCacheConfiguration(ccfg); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testQuery() throws Exception { + final IgniteCache<Integer, Person> cache = grid(0).jcache(null); + + cache.clear(); + + for (int i = 0; i < 2000; i++) + cache.put(i, new Person(i)); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < 100; i++) { + QueryCursor<Cache.Entry<Integer, Person>> qry = + cache.query(new SqlQuery("Person", "age >= 0")); + + int cnt = 0; + + for (Cache.Entry<Integer, Person> e : qry) + cnt++; + + assertEquals(2000, cnt); + } + + return null; + } + }, 16, "test"); + } + + /** + * Test put and parallel query. + * @throws Exception If failed. + */ + public void testQueryPut() throws Exception { + final IgniteCache<Integer, Person> cache = grid(0).jcache(null); + + cache.clear(); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Random rnd = new GridRandom(); + + while (!stop.get()) { + List<List<?>> res = cache.queryFields( + new SqlFieldsQuery("select avg(age) from Person where age > 0")).getAll(); + + assertEquals(1, res.size()); + + if (res.get(0).get(0) == null) + continue; + + int avgAge = ((Number)res.get(0).get(0)).intValue(); + + if (rnd.nextInt(300) == 0) + X.println("__ " + avgAge); + } + + return null; + } + }, 20); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Random rnd = new GridRandom(); + Random age = new GridRandom(); + + while (!stop.get()) + cache.put(rnd.nextInt(2000), new Person(age.nextInt(3000) - 1000)); + + return null; + } + }, 20); + + Thread.sleep(30 * 1000); + + stop.set(true); + + fut2.get(10 * 1000); + fut1.get(10 * 1000); + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + @QuerySqlField(index = true) + private int age; + + /** + * @param age Age. + */ + Person(int age) { + this.age = age; + } + + /** + * @return Age/ + */ + public int age() { + return age; + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java index 0000000,406814e..2311139 mode 000000,100644..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java @@@ -1,0 -1,59 +1,59 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.near; + + import org.apache.ignite.cache.*; ++import org.apache.ignite.configuration.*; + import org.apache.ignite.cache.query.*; + import org.apache.ignite.internal.util.typedef.*; + + import java.util.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; + + /** + * Tests for fields queries. + */ + public class IgniteCacheAtomicFieldsQuerySelfTest extends IgniteCachePartitionedFieldsQuerySelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ - @Override protected CacheDistributionMode distributionMode() { - return PARTITIONED_ONLY; ++ @Override protected NearCacheConfiguration nearConfiguration() { ++ return null; + } + + /** + * + */ + public void testUnsupportedOperations() { + try { + QueryCursor<List<?>> qry = grid(0).jcache(null).queryFields(new SqlFieldsQuery( + "update Person set name = ?").setArgs("Mary Poppins")); + + qry.getAll(); + + fail("We don't support updates."); + } + catch (Exception e) { + X.println("___ " + e.getMessage()); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java index 0000000,b75c0a7..d5e02c2 mode 000000,100644..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java @@@ -1,0 -1,32 +1,30 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.near; + -import org.apache.ignite.cache.*; - -import static org.apache.ignite.cache.CacheDistributionMode.*; ++import org.apache.ignite.configuration.*; + + /** + * Tests for atomic cache with near cache enabled. + */ + public class IgniteCacheAtomicNearEnabledFieldsQuerySelfTest extends IgniteCacheAtomicFieldsQuerySelfTest { + /** {@inheritDoc} */ - @Override protected CacheDistributionMode distributionMode() { - return NEAR_PARTITIONED; ++ @Override protected NearCacheConfiguration nearConfiguration() { ++ return new NearCacheConfiguration(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index 0000000,a804d39..1518e09 mode 000000,100644..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@@ -1,0 -1,217 +1,216 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.near; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.query.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + + import javax.cache.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; + import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + + /** + * Test for distributed queries with node restarts. + */ + public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int KEY_CNT = 1000; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 90 * 1000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(NEAR_PARTITIONED); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testRestarts() throws Exception { + int duration = 60 * 1000; + int qryThreadNum = 10; + final long nodeLifeTime = 2 * 1000; + final int logFreq = 20; + + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + assert cache != null; + + for (int i = 0; i < KEY_CNT; i++) + cache.put(i, i); + + assertEquals(KEY_CNT, cache.localSize()); + + final AtomicInteger qryCnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!done.get()) { + Collection<Cache.Entry<Integer, Integer>> res = + cache.query(new SqlQuery(Integer.class, "_val >= 0")).getAll(); + + assertFalse(res.isEmpty()); + + int c = qryCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Executed queries: " + c); + } + } + }, qryThreadNum); + + final AtomicInteger restartCnt = new AtomicInteger(); + + CollectingEventListener lsnr = new CollectingEventListener(); + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @SuppressWarnings({"BusyWait"}) + @Override public Object call() throws Exception { + while (!done.get()) { + int idx = GRID_CNT; + + startGrid(idx); + + Thread.sleep(nodeLifeTime); + + stopGrid(idx); + + int c = restartCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Node restarts: " + c); + } + + return true; + } + }, 1); + + Thread.sleep(duration); + + done.set(true); + + fut1.get(); + fut2.get(); + + info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']'); + + boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000); + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().stopLocalListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED); + + assert success; + } + + /** Listener that will wait for specified number of events received. */ + private class CollectingEventListener implements IgnitePredicate<Event> { + /** Registered events count. */ + private int evtCnt; + + /** {@inheritDoc} */ + @Override public synchronized boolean apply(Event evt) { + evtCnt++; + + info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']'); + + notifyAll(); + + return true; + } + + /** + * Waits until total number of events processed is equal or greater then argument passed. + * + * @param cnt Number of events to wait. + * @param timeout Timeout to wait. + * @return {@code True} if successfully waited, {@code false} if timeout happened. + * @throws InterruptedException If thread is interrupted. + */ + public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException { + long start = U.currentTimeMillis(); + + long now = start; + + while (start + timeout > now) { + if (evtCnt >= cnt) + return true; + + wait(start + timeout - now); + + now = U.currentTimeMillis(); + } + + return false; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java index 0000000,e7fdb3c..b79b9df mode 000000,100644..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java @@@ -1,0 -1,414 +1,413 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.reducefields; + + import org.apache.ignite.cache.*; + import org.apache.ignite.cache.affinity.*; + import org.apache.ignite.cache.query.annotations.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.optimized.*; + import org.apache.ignite.spi.discovery.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.util.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + + /** + * Tests for reduce fields queries. + */ + public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Flag indicating if starting node should have cache. */ + protected boolean hasCache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (hasCache) + cfg.setCacheConfiguration(cache(null)); + else + cfg.setCacheConfiguration(); + + cfg.setDiscoverySpi(discovery()); + cfg.setMarshaller(new OptimizedMarshaller(false)); + + return cfg; + } + + /** + * @return Distribution. + */ - protected CacheDistributionMode distributionMode() { - return NEAR_PARTITIONED; ++ protected NearCacheConfiguration nearConfiguration() { ++ return new NearCacheConfiguration(); + } + + /** + * @param name Cache name. + * @return Cache. + */ + private CacheConfiguration cache(@Nullable String name) { + CacheConfiguration<?,?> cache = defaultCacheConfiguration(); + + cache.setName(name); + cache.setCacheMode(cacheMode()); + cache.setAtomicityMode(atomicityMode()); - cache.setDistributionMode(distributionMode()); ++ cache.setNearConfiguration(nearConfiguration()); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setRebalanceMode(SYNC); + cache.setIndexedTypes( + String.class, Organization.class, + CacheAffinityKey.class, Person.class + ); + + if (cacheMode() == PARTITIONED) + cache.setBackups(1); + + return cache; + } + + /** + * @return Discovery SPI. + */ + private static DiscoverySpi discovery() { + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + return spi; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + hasCache = true; + + startGridsMultiThreaded(gridCount()); + + hasCache = false; + + startGrid(gridCount()); + + GridCache<String, Organization> orgCache = ((IgniteKernal)grid(0)).cache(null); + + assert orgCache != null; + + assert orgCache.putx("o1", new Organization(1, "A")); + assert orgCache.putx("o2", new Organization(2, "B")); + + GridCache<CacheAffinityKey<String>, Person> personCache = ((IgniteKernal)grid(0)).cache(null); + + assert personCache != null; + + assert personCache.putx(new CacheAffinityKey<>("p1", "o1"), new Person("John White", 25, 1)); + assert personCache.putx(new CacheAffinityKey<>("p2", "o1"), new Person("Joe Black", 35, 1)); + assert personCache.putx(new CacheAffinityKey<>("p3", "o2"), new Person("Mike Green", 40, 2)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @return cache mode. + */ + protected abstract CacheMode cacheMode(); + + /** + * @return Number of grids to start. + */ + protected abstract int gridCount(); + + /** + * @return Cache atomicity mode. + */ + protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * @throws Exception If failed. + */ + public void testNoDataInCache() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)) + .cache(null).queries().createSqlFieldsQuery("select age from Person where orgId = 999"); + + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); + + assertEquals("Result", 0, F.reduce(res, new AverageLocalReducer()).intValue()); + } + + /** + * @throws Exception If failed. + */ + public void testAverageQuery() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).cache(null).queries().createSqlFieldsQuery("select age from Person"); + + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); + + assertEquals("Average", 33, F.reduce(res, new AverageLocalReducer()).intValue()); + } + + /** + * @throws Exception If failed. + */ + public void testAverageQueryWithArguments() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).cache(null).queries().createSqlFieldsQuery( + "select age from Person where orgId = ?"); + + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer(), 1).get(); + + assertEquals("Average", 30, F.reduce(res, new AverageLocalReducer()).intValue()); + } + + // /** + // * @throws Exception If failed. + // */ + // public void testFilters() throws Exception { + // GridCacheReduceFieldsQuery<Object, Object, GridBiTuple<Integer, Integer>, Integer> qry = ((IgniteKernal)grid(0)).cache(null) + // .queries().createReduceFieldsQuery("select age from Person"); + // + // qry = qry.remoteKeyFilter( + // new GridPredicate<Object>() { + // @Override public boolean apply(Object e) { + // return !"p2".equals(((CacheAffinityKey)e).key()); + // } + // } + // ).remoteValueFilter( + // new P1<Object>() { + // @Override public boolean apply(Object e) { + // return !"Mike Green".equals(((Person)e).name); + // } + // } + // ); + // + // qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer()); + // + // Integer avg = qry.reduce().get(); + // + // assertNotNull("Average", avg); + // assertEquals("Average", 25, avg.intValue()); + // } + + // /** + // * @throws Exception If failed. + // */ + // public void testOnProjectionWithFilter() throws Exception { + // P2<CacheAffinityKey<String>, Person> p = new P2<CacheAffinityKey<String>, Person>() { + // @Override public boolean apply(CacheAffinityKey<String> key, Person val) { + // return val.orgId == 1; + // } + // }; + // + // CacheProjection<CacheAffinityKey<String>, Person> cachePrj = + // grid(0).<CacheAffinityKey<String>, Person>cache(null).projection(p); + // + // GridCacheReduceFieldsQuery<CacheAffinityKey<String>, Person, GridBiTuple<Integer, Integer>, Integer> qry = + // cachePrj.queries().createReduceFieldsQuery("select age from Person"); + // + // qry = qry.remoteValueFilter( + // new P1<Person>() { + // @Override public boolean apply(Person e) { + // return !"Joe Black".equals(e.name); + // } + // }); + // + // qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer()); + // + // Integer avg = qry.reduce().get(); + // + // assertNotNull("Average", avg); + // assertEquals("Average", 25, avg.intValue()); + // } + + /** + * @return true if cache mode is replicated, false otherwise. + */ + private boolean isReplicatedMode() { + return cacheMode() == REPLICATED; + } + + /** + * Person. + */ + @SuppressWarnings("UnusedDeclaration") + private static class Person implements Serializable { + /** Name. */ + @QuerySqlField(index = false) + private final String name; + + /** Age. */ + @QuerySqlField(index = true) + private final int age; + + /** Organization ID. */ + @QuerySqlField(index = true) + private final int orgId; + + /** + * @param name Name. + * @param age Age. + * @param orgId Organization ID. + */ + private Person(String name, int age, int orgId) { + assert !F.isEmpty(name); + assert age > 0; + assert orgId > 0; + + this.name = name; + this.age = age; + this.orgId = orgId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person)o; + + return age == person.age && orgId == person.orgId && name.equals(person.name); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = name.hashCode(); + + res = 31 * res + age; + res = 31 * res + orgId; + + return res; + } + } + + /** + * Organization. + */ + @SuppressWarnings("UnusedDeclaration") + private static class Organization implements Serializable { + /** ID. */ + @QuerySqlField + private final int id; + + /** Name. */ + @QuerySqlField(index = false) + private final String name; + + /** + * @param id ID. + * @param name Name. + */ + private Organization(int id, String name) { + assert id > 0; + assert !F.isEmpty(name); + + this.id = id; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Organization that = (Organization)o; + + return id == that.id && name.equals(that.name); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = id; + + res = 31 * res + name.hashCode(); + + return res; + } + } + + /** + * Average remote reducer factory. + */ + protected static class AverageRemoteReducer implements IgniteReducer<List<?>, IgniteBiTuple<Integer, Integer>> { + /** */ + private int sum; + + /** */ + private int cnt; + + @Override public boolean collect(List<?> e) { + sum += (Integer)e.get(0); + + cnt++; + + return true; + } + + @Override public IgniteBiTuple<Integer, Integer> reduce() { + return F.t(sum, cnt); + } + } + + /** + * Average local reducer factory. + */ + protected static class AverageLocalReducer implements IgniteReducer<IgniteBiTuple<Integer, Integer>, Integer> { + /** */ + private int sum; + + /** */ + private int cnt; + + @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) { + sum += t.get1(); + cnt += t.get2(); + + return true; + } + + @Override public Integer reduce() { + return cnt == 0 ? 0 : sum / cnt; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java index 0000000,fd93a3e..ee463b6 mode 000000,100644..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java @@@ -1,0 -1,38 +1,38 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.reducefields; + + import org.apache.ignite.cache.*; ++import org.apache.ignite.configuration.*; + + import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; + + /** + * Reduce fields queries tests for partitioned cache. + */ + public class GridCacheReduceFieldsQueryAtomicSelfTest extends GridCacheReduceFieldsQueryPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ - @Override protected CacheDistributionMode distributionMode() { - return PARTITIONED_ONLY; ++ @Override protected NearCacheConfiguration nearConfiguration() { ++ return null; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java index e3fc936,f7e50ad..ed729ad --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java @@@ -73,17 -70,15 +70,15 @@@ public class GridQueryParsingTest exten cc.setCacheMode(CacheMode.PARTITIONED); cc.setAtomicityMode(CacheAtomicityMode.ATOMIC); - cc.setDistributionMode(PARTITIONED_ONLY); + cc.setNearConfiguration(null); cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setPreloadMode(SYNC); + cc.setRebalanceMode(SYNC); cc.setSwapEnabled(false); - - CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); - - qcfg.setIndexPrimitiveKey(true); - qcfg.setIndexFixedTyping(true); - - cc.setQueryConfiguration(qcfg); + cc.setSqlFunctionClasses(GridQueryParsingTest.class); + cc.setIndexedTypes( + String.class, Address.class, + String.class, Person.class + ); c.setCacheConfiguration(cc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ----------------------------------------------------------------------