http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java index 09bbf22,0000000..fc88c74 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java @@@ -1,328 -1,0 +1,338 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + - import javax.cache.*; +import javax.cache.configuration.*; ++import javax.cache.integration.*; ++import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Test that entries are indexed on load/reload methods. + */ +public class IgniteCacheQueryLoadSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Puts count. */ + private static final int PUT_CNT = 10; + + /** Store map. */ + private static final Map<Integer, ValueObject> STORE_MAP = new HashMap<>(); + + /** */ + public IgniteCacheQueryLoadSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(REPLICATED); + ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setLoadPreviousValue(true); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setIndexedTypes( + Integer.class, ValueObject.class + ); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + cache().removeAll(); + + assert cache().isEmpty(); + assert size(ValueObject.class) == 0; + + STORE_MAP.clear(); + } + + /** + * Number of objects of given type in index. + * + * @param cls Value type. + * @return Objects number. + * @throws IgniteCheckedException If failed. + */ + private long size(Class<?> cls) throws IgniteCheckedException { + GridCacheQueryManager<Object, Object> qryMgr = ((IgniteKernal)grid()).internalCache().context().queries(); + + assert qryMgr != null; + + return qryMgr.size(cls); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCache() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + + cache.loadCache(null); + + assertEquals(PUT_CNT, cache.size()); + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assertNotNull(res); + assertEquals(PUT_CNT, res.size()); + assertEquals(PUT_CNT, size(ValueObject.class)); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheAsync() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + + IgniteCache<Integer, ValueObject> asyncCache = cache.withAsync(); + + asyncCache.loadCache(null, 0); + + asyncCache.future().get(); + + assert cache.size() == PUT_CNT; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assert res != null; + assert res.size() == PUT_CNT; + assert size(ValueObject.class) == PUT_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheFiltered() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + - cache.loadCache(new P2<Integer, ValueObject>() { - @Override public boolean apply(Integer key, ValueObject val) { ++ cache.loadCache(new P2<Integer,ValueObject>() { ++ @Override ++ public boolean apply(Integer key, ValueObject val) { + return key >= 5; + } + }); + + assert cache.size() == PUT_CNT - 5; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheAsyncFiltered() throws Exception { + IgniteCache<Integer, ValueObject> cache = grid().jcache(null); + + IgniteCache<Integer, ValueObject> asyncCache = cache.withAsync(); + + asyncCache.loadCache(new P2<Integer, ValueObject>() { + @Override + public boolean apply(Integer key, ValueObject val) { + return key >= 5; + } + }, 0); + + asyncCache.future().get(); + + assert cache.localSize() == PUT_CNT - 5; + + Collection<Cache.Entry<Integer, ValueObject>> res = + cache.query(new SqlQuery(ValueObject.class, "val >= 0")).getAll(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + } + + /** + * @throws Exception If failed. + */ + public void testReloadAsync() throws Exception { + STORE_MAP.put(1, new ValueObject(1)); + + GridCache<Integer, ValueObject> cache = cache(); + + assert cache.reloadAsync(1).get().value() == 1; + + assert cache.size() == 1; + + Collection<Map.Entry<Integer, ValueObject>> res = + cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); + + assert res != null; + assert res.size() == 1; + assert size(ValueObject.class) == 1; + } + + /** + * @throws Exception If failed. + */ + public void testReloadAll() throws Exception { + for (int i = 0; i < PUT_CNT; i++) + STORE_MAP.put(i, new ValueObject(i)); + + GridCache<Integer, ValueObject> cache = cache(); + + Integer[] keys = new Integer[PUT_CNT - 5]; + + for (int i = 0; i < PUT_CNT - 5; i++) + keys[i] = i + 5; + - cache.reloadAll(F.asList(keys)); ++ CompletionListenerFuture fut = new CompletionListenerFuture(); ++ ++ grid().<Integer, Integer>jcache(null).loadAll(F.asSet(keys), true, fut); ++ ++ fut.get(); + + assert cache.size() == PUT_CNT - 5; + + Collection<Map.Entry<Integer, ValueObject>> res = + cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + + cache.clear(); + + assert cache.isEmpty(); + assertEquals(0, cache.size()); + - cache.reloadAll(Arrays.asList(keys)); ++ fut = new CompletionListenerFuture(); ++ ++ grid().<Integer, Integer>jcache(null).loadAll(F.asSet(keys), true, fut); ++ ++ fut.get(); + + assertEquals(PUT_CNT - 5, cache.size()); + + res = cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get(); + + assert res != null; + assert res.size() == PUT_CNT - 5; + assert size(ValueObject.class) == PUT_CNT - 5; + } + + /** + * Test store. + */ + private static class TestStore extends CacheStoreAdapter<Integer, ValueObject> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, ValueObject> clo, @Nullable Object... args) { + assert clo != null; + + for (int i = 0; i < PUT_CNT; i++) + clo.apply(i, new ValueObject(i)); + } + + /** {@inheritDoc} */ + @Override public ValueObject load(Integer key) { + assert key != null; + + return STORE_MAP.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(javax.cache.Cache.Entry<? extends Integer, ? extends ValueObject> e) { + assert e != null; + assert e.getKey() != null; + assert e.getValue() != null; + + STORE_MAP.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + assert key != null; + + STORE_MAP.remove(key); + } + } + + /** + * Value object class. + */ - private static class ValueObject { ++ private static class ValueObject implements Serializable { + /** Value. */ + @QuerySqlField + private final int val; + + /** + * @param val Value. + */ + ValueObject(int val) { + this.val = val; + } + + /** + * @return Value. + */ + int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ValueObject.class, this); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index 3956a82,0000000..a804d39 mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@@ -1,217 -1,0 +1,217 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for distributed queries with node restarts. + */ +public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int KEY_CNT = 1000; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 90 * 1000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testRestarts() throws Exception { + int duration = 60 * 1000; + int qryThreadNum = 10; + final long nodeLifeTime = 2 * 1000; + final int logFreq = 20; + + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + assert cache != null; + + for (int i = 0; i < KEY_CNT; i++) + cache.put(i, i); + + assertEquals(KEY_CNT, cache.localSize()); + + final AtomicInteger qryCnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!done.get()) { + Collection<Cache.Entry<Integer, Integer>> res = + cache.query(new SqlQuery(Integer.class, "_val >= 0")).getAll(); + + assertFalse(res.isEmpty()); + + int c = qryCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Executed queries: " + c); + } + } + }, qryThreadNum); + + final AtomicInteger restartCnt = new AtomicInteger(); + + CollectingEventListener lsnr = new CollectingEventListener(); + + for (int i = 0; i < GRID_CNT; i++) - grid(i).events().localListen(lsnr, EventType.EVT_CACHE_PRELOAD_STOPPED); ++ grid(i).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @SuppressWarnings({"BusyWait"}) + @Override public Object call() throws Exception { + while (!done.get()) { + int idx = GRID_CNT; + + startGrid(idx); + + Thread.sleep(nodeLifeTime); + + stopGrid(idx); + + int c = restartCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Node restarts: " + c); + } + + return true; + } + }, 1); + + Thread.sleep(duration); + + done.set(true); + + fut1.get(); + fut2.get(); + - info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']'); ++ info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']'); + + boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000); + + for (int i = 0; i < GRID_CNT; i++) - grid(i).events().stopLocalListen(lsnr, EventType.EVT_CACHE_PRELOAD_STOPPED); ++ grid(i).events().stopLocalListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED); + + assert success; + } + + /** Listener that will wait for specified number of events received. */ + private class CollectingEventListener implements IgnitePredicate<Event> { + /** Registered events count. */ + private int evtCnt; + + /** {@inheritDoc} */ + @Override public synchronized boolean apply(Event evt) { + evtCnt++; + + info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']'); + + notifyAll(); + + return true; + } + + /** + * Waits until total number of events processed is equal or greater then argument passed. + * + * @param cnt Number of events to wait. + * @param timeout Timeout to wait. + * @return {@code True} if successfully waited, {@code false} if timeout happened. + * @throws InterruptedException If thread is interrupted. + */ + public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException { + long start = U.currentTimeMillis(); + + long now = start; + + while (start + timeout > now) { + if (evtCnt >= cnt) + return true; + + wait(start + timeout - now); + + now = U.currentTimeMillis(); + } + + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java index da71d14,0000000..e7fdb3c mode 100644,000000..100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java @@@ -1,433 -1,0 +1,414 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.reducefields; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.typedef.*; ++import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; ++import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests for reduce fields queries. + */ +public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Flag indicating if starting node should have cache. */ + protected boolean hasCache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (hasCache) + cfg.setCacheConfiguration(cache(null)); + else + cfg.setCacheConfiguration(); + + cfg.setDiscoverySpi(discovery()); + cfg.setMarshaller(new OptimizedMarshaller(false)); + + return cfg; + } + + /** + * @return Distribution. + */ + protected CacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } + + /** + * @param name Cache name. + * @return Cache. + */ + private CacheConfiguration cache(@Nullable String name) { + CacheConfiguration<?,?> cache = defaultCacheConfiguration(); + + cache.setName(name); + cache.setCacheMode(cacheMode()); + cache.setAtomicityMode(atomicityMode()); + cache.setDistributionMode(distributionMode()); + cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setPreloadMode(SYNC); ++ cache.setRebalanceMode(SYNC); + cache.setIndexedTypes( + String.class, Organization.class, + CacheAffinityKey.class, Person.class + ); + + if (cacheMode() == PARTITIONED) + cache.setBackups(1); + + return cache; + } + + /** + * @return Discovery SPI. + */ + private static DiscoverySpi discovery() { + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + return spi; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + hasCache = true; + + startGridsMultiThreaded(gridCount()); + + hasCache = false; + + startGrid(gridCount()); + + GridCache<String, Organization> orgCache = ((IgniteKernal)grid(0)).cache(null); + + assert orgCache != null; + + assert orgCache.putx("o1", new Organization(1, "A")); + assert orgCache.putx("o2", new Organization(2, "B")); + + GridCache<CacheAffinityKey<String>, Person> personCache = ((IgniteKernal)grid(0)).cache(null); + + assert personCache != null; + + assert personCache.putx(new CacheAffinityKey<>("p1", "o1"), new Person("John White", 25, 1)); + assert personCache.putx(new CacheAffinityKey<>("p2", "o1"), new Person("Joe Black", 35, 1)); + assert personCache.putx(new CacheAffinityKey<>("p3", "o2"), new Person("Mike Green", 40, 2)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @return cache mode. + */ + protected abstract CacheMode cacheMode(); + + /** + * @return Number of grids to start. + */ + protected abstract int gridCount(); + + /** + * @return Cache atomicity mode. + */ + protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * @throws Exception If failed. + */ + public void testNoDataInCache() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)) + .cache(null).queries().createSqlFieldsQuery("select age from Person where orgId = 999"); + + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); + + assertEquals("Result", 0, F.reduce(res, new AverageLocalReducer()).intValue()); + } + + /** + * @throws Exception If failed. + */ + public void testAverageQuery() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).cache(null).queries().createSqlFieldsQuery("select age from Person"); + + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); + + assertEquals("Average", 33, F.reduce(res, new AverageLocalReducer()).intValue()); + } + + /** + * @throws Exception If failed. + */ + public void testAverageQueryWithArguments() throws Exception { + CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).cache(null).queries().createSqlFieldsQuery( + "select age from Person where orgId = ?"); + + Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer(), 1).get(); + + assertEquals("Average", 30, F.reduce(res, new AverageLocalReducer()).intValue()); + } + +// /** +// * @throws Exception If failed. +// */ +// public void testFilters() throws Exception { +// GridCacheReduceFieldsQuery<Object, Object, GridBiTuple<Integer, Integer>, Integer> qry = ((IgniteKernal)grid(0)).cache(null) +// .queries().createReduceFieldsQuery("select age from Person"); +// +// qry = qry.remoteKeyFilter( +// new GridPredicate<Object>() { +// @Override public boolean apply(Object e) { +// return !"p2".equals(((CacheAffinityKey)e).key()); +// } +// } +// ).remoteValueFilter( +// new P1<Object>() { +// @Override public boolean apply(Object e) { +// return !"Mike Green".equals(((Person)e).name); +// } +// } +// ); +// +// qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer()); +// +// Integer avg = qry.reduce().get(); +// +// assertNotNull("Average", avg); +// assertEquals("Average", 25, avg.intValue()); +// } + - /** - * @throws Exception If failed. - */ - public void testOnProjection() throws Exception { - P2<CacheAffinityKey<String>, Person> p = new P2<CacheAffinityKey<String>, Person>() { - @Override public boolean apply(CacheAffinityKey<String> key, Person val) { - return val.orgId == 1; - } - }; - - CacheProjection<CacheAffinityKey<String>, Person> cachePrj = - ((IgniteKernal)grid(0)).<CacheAffinityKey<String>, Person>cache(null).projection(p); - - CacheQuery<List<?>> qry = cachePrj.queries().createSqlFieldsQuery("select age from Person"); - - Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); - - assertEquals("Average", 30, F.reduce(res, new AverageLocalReducer()).intValue()); - } - +// /** +// * @throws Exception If failed. +// */ +// public void testOnProjectionWithFilter() throws Exception { +// P2<CacheAffinityKey<String>, Person> p = new P2<CacheAffinityKey<String>, Person>() { +// @Override public boolean apply(CacheAffinityKey<String> key, Person val) { +// return val.orgId == 1; +// } +// }; +// +// CacheProjection<CacheAffinityKey<String>, Person> cachePrj = +// grid(0).<CacheAffinityKey<String>, Person>cache(null).projection(p); +// +// GridCacheReduceFieldsQuery<CacheAffinityKey<String>, Person, GridBiTuple<Integer, Integer>, Integer> qry = +// cachePrj.queries().createReduceFieldsQuery("select age from Person"); +// +// qry = qry.remoteValueFilter( +// new P1<Person>() { +// @Override public boolean apply(Person e) { +// return !"Joe Black".equals(e.name); +// } +// }); +// +// qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer()); +// +// Integer avg = qry.reduce().get(); +// +// assertNotNull("Average", avg); +// assertEquals("Average", 25, avg.intValue()); +// } + + /** + * @return true if cache mode is replicated, false otherwise. + */ + private boolean isReplicatedMode() { + return cacheMode() == REPLICATED; + } + + /** + * Person. + */ + @SuppressWarnings("UnusedDeclaration") + private static class Person implements Serializable { + /** Name. */ + @QuerySqlField(index = false) + private final String name; + + /** Age. */ + @QuerySqlField(index = true) + private final int age; + + /** Organization ID. */ + @QuerySqlField(index = true) + private final int orgId; + + /** + * @param name Name. + * @param age Age. + * @param orgId Organization ID. + */ + private Person(String name, int age, int orgId) { + assert !F.isEmpty(name); + assert age > 0; + assert orgId > 0; + + this.name = name; + this.age = age; + this.orgId = orgId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person)o; + + return age == person.age && orgId == person.orgId && name.equals(person.name); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = name.hashCode(); + + res = 31 * res + age; + res = 31 * res + orgId; + + return res; + } + } + + /** + * Organization. + */ + @SuppressWarnings("UnusedDeclaration") + private static class Organization implements Serializable { + /** ID. */ + @QuerySqlField + private final int id; + + /** Name. */ + @QuerySqlField(index = false) + private final String name; + + /** + * @param id ID. + * @param name Name. + */ + private Organization(int id, String name) { + assert id > 0; + assert !F.isEmpty(name); + + this.id = id; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Organization that = (Organization)o; + + return id == that.id && name.equals(that.name); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = id; + + res = 31 * res + name.hashCode(); + + return res; + } + } + + /** + * Average remote reducer factory. + */ + protected static class AverageRemoteReducer implements IgniteReducer<List<?>, IgniteBiTuple<Integer, Integer>> { + /** */ + private int sum; + + /** */ + private int cnt; + + @Override public boolean collect(List<?> e) { + sum += (Integer)e.get(0); + + cnt++; + + return true; + } + + @Override public IgniteBiTuple<Integer, Integer> reduce() { + return F.t(sum, cnt); + } + } + + /** + * Average local reducer factory. + */ + protected static class AverageLocalReducer implements IgniteReducer<IgniteBiTuple<Integer, Integer>, Integer> { + /** */ + private int sum; + + /** */ + private int cnt; + + @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) { + sum += t.get1(); + cnt += t.get2(); + + return true; + } + + @Override public Integer reduce() { + return cnt == 0 ? 0 : sum / cnt; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java index 80b03ae,4c65d42..f7e50ad --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java @@@ -36,10 -36,10 +36,10 @@@ import org.h2.engine.* import org.h2.jdbc.*; import java.io.*; -import java.util.*; +import java.sql.*; import static org.apache.ignite.cache.CacheDistributionMode.*; - import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** @@@ -72,13 -76,15 +72,13 @@@ public class GridQueryParsingTest exten cc.setAtomicityMode(CacheAtomicityMode.ATOMIC); cc.setDistributionMode(PARTITIONED_ONLY); cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setPreloadMode(SYNC); + cc.setRebalanceMode(SYNC); cc.setSwapEnabled(false); - - CacheQueryConfiguration qcfg = new CacheQueryConfiguration(); - - qcfg.setIndexPrimitiveKey(true); - qcfg.setIndexFixedTyping(true); - - cc.setQueryConfiguration(qcfg); + cc.setSqlFunctionClasses(GridQueryParsingTest.class); + cc.setIndexedTypes( + String.class, Address.class, + String.class, Person.class + ); c.setCacheConfiguration(cc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/yardstick/config/ignite-base-config.xml ---------------------------------------------------------------------- diff --cc modules/yardstick/config/ignite-base-config.xml index c88316d,ccb45a7..1e1daa5 --- a/modules/yardstick/config/ignite-base-config.xml +++ b/modules/yardstick/config/ignite-base-config.xml @@@ -27,23 -27,18 +27,6 @@@ <bean id="base-ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" abstract="true"> <property name="peerClassLoadingEnabled" value="false"/> - <property name="marshaller"> - <bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller"> - <property name="requireSerializable" value="true"/> - <property name="classNames"> - <list> - <value>org.apache.ignite.yardstick.cache.model.SampleValue</value> - <value>org.apache.ignite.yardstick.cache.model.Person</value> - <value>org.apache.ignite.yardstick.cache.model.Organization</value> - <value>org.apache.ignite.yardstick.compute.model.NoopTask$NoopJob</value> - <value>org.apache.ignite.yardstick.compute.model.NoopCallable</value> - <value>org.apache.ignite.yardstick.compute.IgniteRunBenchmark$NoopRunnable</value> - <value>org.apache.ignite.yardstick.compute.IgniteApplyBenchmark$NoopClosure</value> - </list> - </property> - </bean> - </property> - <!--<property name="indexingSpi">--> - <!--<list>--> - <!--<bean class="org.apache.ignite.spi.indexing.h2.H2IndexingSpi">--> - <!--<property name="name" value="offheap"/>--> - <!--<property name="maxOffHeapMemory" value="#{500*1024*1024}"/>--> - <!--</bean>--> - <!--<bean class="org.apache.ignite.spi.indexing.h2.H2IndexingSpi">--> - <!--<property name="name" value="default"/>--> - <!--</bean>--> - <!--</list>--> - <!--</property>--> -- <property name="cacheConfiguration"> <list> <bean class="org.apache.ignite.configuration.CacheConfiguration"> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c4e3222/modules/yardstick/config/ignite-store-config.xml ----------------------------------------------------------------------