http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
index 8763dd1,0000000..f4cb8a3
mode 100644,000000..100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
@@@ -1,748 -1,0 +1,748 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.eviction.lru.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.configuration.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.marshaller.optimized.*;
 +import org.apache.ignite.spi.discovery.tcp.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 +import org.apache.ignite.spi.swapspace.file.*;
 +import org.apache.ignite.internal.processors.cache.query.*;
 +import org.apache.ignite.internal.processors.query.*;
 +import org.apache.ignite.internal.processors.query.h2.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.testframework.junits.common.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.cache.*;
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.*;
 +
 +import static org.apache.ignite.cache.CacheAtomicityMode.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +
 +/**
 + * Multi-threaded tests for cache queries.
 + */
 +@SuppressWarnings("StatementWithEmptyBody")
 +public class IgniteCacheQueryMultiThreadedSelfTest extends 
GridCommonAbstractTest {
 +    /** Enables output of {@link #info(String)}; when {@code false} those messages are suppressed. */
 +    private static final boolean TEST_INFO = true;
 +
 +    /** Number of test grids (nodes). Should not be less than 2. */
 +    private static final int GRID_CNT = 2;
 +
 +    /** IP finder handed to the discovery SPI of every node configuration. */
 +    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 +
 +    /** Number of {@code onSwap} notifications seen by {@link FakeIndexing}. */
 +    private static AtomicInteger idxSwapCnt = new AtomicInteger();
 +
 +    /** Number of {@code onUnswap} notifications seen by {@link FakeIndexing}. */
 +    private static AtomicInteger idxUnswapCnt = new AtomicInteger();
 +
 +    /** Time each multi-threaded test lets its worker threads run, in milliseconds. */
 +    private static final long DURATION = 30 * 1000;
 +
 +    /** Passes {@code false} to the parent constructor: don't start grid by default. */
 +    public IgniteCacheQueryMultiThreadedSelfTest() {
 +        super(false);
 +    }
 +
 +    /** {@inheritDoc} Adds discovery, file swap space, one transactional partitioned near cache with one backup, and {@link FakeIndexing} as the indexing class. */
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
 +        IgniteConfiguration cfg = super.getConfiguration(gridName);
 +
 +        TcpDiscoverySpi disco = new TcpDiscoverySpi();
 +
 +        disco.setIpFinder(ipFinder);
 +
 +        cfg.setDiscoverySpi(disco);
 +
 +        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
 +        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
 +
 +        CacheConfiguration cacheCfg = defaultCacheConfiguration();
 +
 +        cacheCfg.setCacheMode(PARTITIONED);
 +        cacheCfg.setAtomicityMode(TRANSACTIONAL);
 +        cacheCfg.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED);
 +        
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 +        cacheCfg.setSwapEnabled(true);
 +        cacheCfg.setBackups(1);
 +        cacheCfg.setEvictionPolicy(evictsEnabled() ? new 
CacheLruEvictionPolicy(100) : null);
 +
 +        CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
 +
 +        qcfg.setIndexPrimitiveKey(true);
 +
 +        cacheCfg.setQueryConfiguration(qcfg);
 +
 +        if (offheapEnabled() && evictsEnabled())
 +            cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for 
evictions.
 +
 +        cfg.setCacheConfiguration(cacheCfg);
 +
 +        GridQueryConfiguration indexing = new GridQueryConfiguration();
 +
 +        indexing.setMaxOffheapRowsCacheSize(128);
 +
 +        if (offheapEnabled())
 +            indexing.setMaxOffHeapMemory(0);
 +
 +        cfg.setQueryConfiguration(indexing);
 +
 +        GridQueryProcessor.idxCls = FakeIndexing.class;
 +
 +        return cfg;
 +    }
 +
 +    /**
 +     * H2 indexing subclass that delegates to the parent and counts swap and unswap notifications in static counters.
 +     */
 +    private static class FakeIndexing extends IgniteH2Indexing {
 +        @Override public void onSwap(@Nullable String spaceName, Object key) 
throws IgniteCheckedException {
 +            super.onSwap(spaceName, key);
 +
 +            idxSwapCnt.incrementAndGet();
 +        }
 +
 +        @Override public void onUnswap(@Nullable String spaceName, Object 
key, Object val, byte[] valBytes)
 +        throws IgniteCheckedException {
 +            super.onUnswap(spaceName, key, val, valBytes);
 +
 +            idxUnswapCnt.incrementAndGet();
 +        }
 +    }
 +
 +    /** @return {@code true} If offheap enabled; this base implementation returns {@code false}. */
 +    protected boolean offheapEnabled() {
 +        return false;
 +    }
 +
 +    /** @return {@code true} If evictions enabled; this base implementation returns {@code true}. */
 +    protected boolean evictsEnabled() {
 +        return true;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTest() throws Exception {
 +        super.beforeTest();
 +
 +        // Check that the default cache of every grid is empty (nothing is removed here).
 +        for (int i = 0; i < GRID_CNT; i++) {
 +            GridCache<Object, Object> c = grid(i).cache(null);
 +
 +            assertEquals(0, c.size());
 +        }
 +    }
 +
 +    /** {@inheritDoc} Starts {@link #GRID_CNT} grids after asserting that the constant is at least 2. */
 +    @Override protected void beforeTestsStarted() throws Exception {
 +        assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or 
equal to 2.";
 +
 +        startGridsMultiThreaded(GRID_CNT);
 +    }
 +
 +    /** {@inheritDoc} Stops all grids and, when evictions are enabled, asserts that both swap and unswap counters are positive. */
 +    @Override protected void afterTestsStopped() throws Exception {
 +        stopAllGrids();
 +
 +        if (evictsEnabled()) {
 +            assertTrue(idxSwapCnt.get() > 0);
 +            assertTrue(idxUnswapCnt.get() > 0);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        super.afterTest();
 +
 +        // Clean up all caches: remove entries, drain swap and offheap iterators, then assert nothing is left.
 +        for (int i = 0; i < GRID_CNT; i++) {
 +            GridCache<Object, Object> c = grid(i).cache(null);
 +
 +            c.removeAll(F.<CacheEntry<Object, Object>>alwaysTrue());
 +
 +            Iterator<Map.Entry<Object, Object>> it = c.swapIterator();
 +
 +            while (it.hasNext()) {
 +                it.next();
 +
 +                it.remove();
 +            }
 +
 +            it = c.offHeapIterator();
 +
 +            while (it.hasNext()) {
 +                it.next();
 +
 +                it.remove();
 +            }
 +
 +            assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys());
 +            assertEquals(0, c.offHeapEntriesCount());
 +            assertEquals(0, c.size());
 +        }
 +    }
 +
 +    /** {@inheritDoc} Delegates to the parent only when {@link #TEST_INFO} is {@code true}. */
 +    @Override protected void info(String msg) {
 +        if (TEST_INFO)
 +            super.info(msg);
 +    }
 +
 +    /**
 +     * @param entries Entries whose keys are mapped to nodes.
 +     * @param g Grid whose default cache affinity is used for the mapping.
 +     * @return IDs of the first node returned by {@code mapKeyToPrimaryAndBackups} for each entry key.
 +     */
 +    private Set<UUID> affinityNodes(Iterable<Cache.Entry<Integer, Integer>> 
entries, Ignite g) {
 +        Set<UUID> nodes = new HashSet<>();
 +
 +        for (Cache.Entry<Integer, Integer> entry : entries)
 +            
nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id());
 +
 +        return nodes;
 +    }
 +
 +    /**
 +     * Concurrently puts, locally evicts, removes, gets and SQL-queries String values from many threads for {@link #DURATION} ms.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedSwapUnswapString() throws Exception {
 +        int threadCnt = 150;
 +        final int keyCnt = 2000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Typed views of the default cache; test values are put into it below.
 +        final IgniteCache<Integer, String> c = g.jcache(null);
 +        final IgniteCache<Integer, Long> cl = g.jcache(null);
 +
 +        assertEquals(0, g.cache(null).size());
 +        assertEquals(0, c.query(new QuerySqlPredicate<Integer, String>("1 = 
1")).getAll().size());
 +        assertEquals(0, cl.query(new QuerySqlPredicate<Integer, Long>("1 = 
1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, String.valueOf(rnd.nextInt(valCnt)));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(new CAX() {
++        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(rnd.nextInt(keyCnt), 
String.valueOf(rnd.nextInt(valCnt)));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                
c.localEvict(Arrays.asList(rnd.nextInt(keyCnt)));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(rnd.nextInt(keyCnt));
 +
 +                            break;
 +                        case 3:
 +                            c.get(rnd.nextInt(keyCnt));
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            QueryCursor<Cache.Entry<Integer, String>> qry = 
c.query(
-                                 new QuerySqlPredicate<Integer, String>("_val 
between ? and ?", String.valueOf(from),
-                                 String.valueOf(from + 250)));
++                                    new QuerySqlPredicate<Integer, 
String>("_val between ? and ?", String.valueOf(from),
++                                            String.valueOf(from + 250)));
 +
 +                            Collection<Cache.Entry<Integer, String>> res = 
qry.getAll();
 +
 +                            for (Cache.Entry<Integer, String> ignored : res) {
 +                                // No-op: the loop only walks through the query result.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Concurrently puts, locally evicts, removes, gets and SQL-queries Long values from many threads for {@link #DURATION} ms.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedSwapUnswapLong() throws Exception {
 +        int threadCnt = 150;
 +        final int keyCnt = 2000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Typed views of the default cache; test values are put into it below.
 +        final IgniteCache<Integer, Long> c = g.jcache(null);
 +        final IgniteCache<Integer, String> c1 = g.jcache(null);
 +
 +        assertEquals(0, g.cache(null).size());
 +        assertEquals(0, c1.query(new QuerySqlPredicate<Integer, String>("1 = 
1")).getAll().size());
 +        assertEquals(0, c.query(new QuerySqlPredicate<Integer, Long>("1 = 
1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
-             c.put(i, (long)rnd.nextInt(valCnt));
++            c.put(i, (long) rnd.nextInt(valCnt));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(new CAX() {
++        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    int key = rnd.nextInt(keyCnt);
 +
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
-                             c.put(key, (long)rnd.nextInt(valCnt));
++                            c.put(key, (long) rnd.nextInt(valCnt));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(key));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(key);
 +
 +                            break;
 +                        case 3:
 +                            c.get(key);
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            Collection<Cache.Entry<Integer, Long>> res = 
c.query(new QuerySqlPredicate<Integer, Long>(
 +                                "_val between ? and ?", from, from + 
250)).getAll();
 +
 +                            for (Cache.Entry<Integer, Long> ignored : res) {
 +                                // No-op: the loop only walks through the query result.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Concurrently puts, locally evicts, removes, gets and SQL-queries values that are randomly Long or String for {@link #DURATION} ms.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedSwapUnswapLongString() throws Exception {
 +        int threadCnt = 150;
 +        final int keyCnt = 2000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // View of the default cache; test values are put into it below.
 +        final IgniteCache<Integer, Object> c = g.jcache(null);
 +
 +        assertEquals(0, g.jcache(null).size());
 +        assertEquals(0, c.query(new QuerySqlPredicate<Integer, Object>("1 = 
1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : 
String.valueOf(rnd.nextInt(valCnt)));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(new CAX() {
++        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    int key = rnd.nextInt(keyCnt);
 +
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(key, rnd.nextBoolean() ? (long) 
rnd.nextInt(valCnt) :
-                                 String.valueOf(rnd.nextInt(valCnt)));
++                                    String.valueOf(rnd.nextInt(valCnt)));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(key));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(key);
 +
 +                            break;
 +                        case 3:
 +                            c.get(key);
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            Collection<Cache.Entry<Integer, Object>> res = 
c.query(
 +                                new QuerySqlPredicate<Integer, Object>("_val 
between ? and ?", from, from + 250))
 +                                .getAll();
 +
 +                            for (Cache.Entry<Integer, Object> ignored : res) {
 +                                // No-op: the loop only walks through the query result.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Concurrently puts, locally evicts, removes, gets and SQL-queries {@link TestValue} objects for {@link #DURATION} ms.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedSwapUnswapObject() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 4000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // View of the default cache; test values are put into it below.
 +        final IgniteCache<Integer, TestValue> c = g.jcache(null);
 +
 +        assertEquals(0, g.cache(null).size());
 +        assertEquals(0, c.query(new QuerySqlPredicate<Integer, TestValue>("1 
= 1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, new TestValue(rnd.nextInt(valCnt)));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(new CAX() {
++        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    int key = rnd.nextInt(keyCnt);
 +
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(key, new TestValue(rnd.nextInt(valCnt)));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(key));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(key);
 +
 +                            break;
 +                        case 3:
 +                            c.get(key);
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            Collection<Cache.Entry<Integer, TestValue>> res =
 +                                c.query(new QuerySqlPredicate<Integer, 
TestValue>("TestValue.val between ? and ?",
 +                                    from, from + 250)).getAll();
 +
 +                            for (Cache.Entry<Integer, TestValue> ignored : 
res) {
 +                                // No-op: the loop only walks through the query result.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Puts and locally evicts {@code keyCnt} entries, then repeatedly runs {@code _val >= 0} from many threads, asserting all entries are returned.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedSameQuery() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 10;
 +        final int logMod = 5000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache and evict each of them locally.
 +        final IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +        for (int i = 0; i < keyCnt; i++) {
 +            c.put(i, i);
 +
 +            c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicInteger cnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(
++        IgniteInternalFuture<?> fut = multithreadedAsync(
 +            new CAX() {
 +                @Override public void applyx() throws IgniteCheckedException {
 +                    int iter = 0;
 +
 +                    while (!done.get() && 
!Thread.currentThread().isInterrupted()) {
 +                        iter++;
 +
 +                        Collection<Cache.Entry<Integer, Integer>> entries =
 +                            c.query(new QuerySqlPredicate<Integer, 
Integer>("_val >= 0")).getAll();
 +
 +                        assert entries != null;
 +
 +                        assertEquals("Query results [entries=" + entries + ", 
aff=" + affinityNodes(entries, g) +
 +                            ", iteration=" + iter + ']', keyCnt, 
entries.size());
 +
 +                        if (cnt.incrementAndGet() % logMod == 0) {
 +                            GridCacheQueryManager<Object, Object> qryMgr =
-                                 
((GridKernal)g).internalCache().context().queries();
++                                
((IgniteKernal)g).internalCache().context().queries();
 +
 +                            assert qryMgr != null;
 +
 +                            qryMgr.printMemoryStats();
 +                        }
 +                    }
 +                }
 +            }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        info("Finishing test...");
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Puts and locally evicts {@code keyCnt} entries, then has many threads create and run {@code _val >= 0} queries, asserting the result size.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedNewQueries() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 10;
 +        final int logMod = 5000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache and evict each of them locally.
 +        final IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +        for (int i = 0; i < keyCnt; i++) {
 +            c.put(i, i);
 +
 +            c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicInteger cnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(new CAX() {
++        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                int iter = 0;
 +
 +                while (!done.get() && 
!Thread.currentThread().isInterrupted()) {
 +                    iter++;
 +
 +                    Collection<Cache.Entry<Integer, Integer>> entries =
 +                        c.query(new QuerySqlPredicate<Integer, Integer>("_val 
>= 0")).getAll();
 +
 +                    assert entries != null;
 +
 +                    assertEquals("Entries count is not as expected on 
iteration: " + iter, keyCnt, entries.size());
 +
 +                    if (cnt.incrementAndGet() % logMod == 0) {
 +                        GridCacheQueryManager<Object, Object> qryMgr =
-                             
((GridKernal)g).internalCache().context().queries();
++                            
((IgniteKernal)g).internalCache().context().queries();
 +
 +                        assert qryMgr != null;
 +
 +                        qryMgr.printMemoryStats();
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Puts {@code keyCnt} entries, then repeatedly runs an accept-all scan query from many threads, asserting all entries are returned.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedScanQuery() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 500;
 +        final int logMod = 5000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +        for (int i = 0; i < keyCnt; i++)
 +            c.put(i, i);
 +
 +        final AtomicInteger cnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut = multithreadedAsync(
++        IgniteInternalFuture<?> fut = multithreadedAsync(
 +            new CAX() {
 +                @Override public void applyx() throws IgniteCheckedException {
 +                    int iter = 0;
 +
 +                    while (!done.get() && 
!Thread.currentThread().isInterrupted()) {
 +                        iter++;
 +
 +                        // Scan query whose predicate accepts every entry.
 +                        Collection<Cache.Entry<Integer, Integer>> entries =
 +                            c.query(new QueryPredicate<Integer, Integer>() {
 +                                @Override public boolean 
apply(Cache.Entry<Integer, Integer> integerIntegerEntry) {
 +                                    return true;
 +                                }
 +                            }).getAll();
 +
 +                        assert entries != null;
 +
 +                        assertEquals("Entries count is not as expected on 
iteration: " + iter, keyCnt, entries.size());
 +
 +                        if (cnt.incrementAndGet() % logMod == 0) {
 +                            GridCacheQueryManager<Object, Object> qryMgr =
-                                 
((GridKernal)g).internalCache().context().queries();
++                                
((IgniteKernal)g).internalCache().context().queries();
 +
 +                            assert qryMgr != null;
 +
 +                            qryMgr.printMemoryStats();
 +                        }
 +                    }
 +                }
 +            }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Serializable test value holding a single int field annotated as an SQL query field.
 +     */
 +    private static class TestValue implements Serializable {
 +        /** Value; annotated with {@code CacheQuerySqlField}. */
 +        @CacheQuerySqlField
 +        private int val;
 +
 +        /**
 +         * @param val Value to store.
 +         */
 +        private TestValue(int val) {
 +            this.val = val;
 +        }
 +
 +        /**
 +         * @return Stored value.
 +         */
 +        public int value() {
 +            return val;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index a8da960,0000000..74c31e1
mode 100644,000000..100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@@ -1,223 -1,0 +1,224 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.distributed.near;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.configuration.*;
 +import org.apache.ignite.events.*;
++import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.spi.discovery.tcp.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.*;
 +
 +import javax.cache.*;
 +
 +import static org.apache.ignite.cache.CacheAtomicityMode.*;
 +import static org.apache.ignite.cache.CacheDistributionMode.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +
 +/**
 + * Test for distributed queries with node restarts.
 + */
 +public class IgniteCacheQueryNodeRestartSelfTest extends 
GridCacheAbstractSelfTest {
 +    /** Number of grids returned by {@link #gridCount()}; also used as the index of the restarted node. */
 +    private static final int GRID_CNT = 3;
 +
 +    /** Number of keys put into the cache before the queries start. */
 +    private static final int KEY_CNT = 1000;
 +
 +    /** IP finder handed to the discovery SPI of every node configuration. */
 +    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 +
 +    /** {@inheritDoc} Returns {@link #GRID_CNT}. */
 +    @Override protected int gridCount() {
 +        return GRID_CNT;
 +    }
 +
 +    /** {@inheritDoc} Returns {@code 90 * 1000}, presumably milliseconds (unit defined by the parent class, confirm there). */
 +    @Override protected long getTestTimeout() {
 +        return 90 * 1000;
 +    }
 +
 +    /** {@inheritDoc} Adds discovery and one transactional partitioned near cache with one backup, full-sync writes and primitive key indexing. */
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
 +        IgniteConfiguration c = super.getConfiguration(gridName);
 +
 +        TcpDiscoverySpi disco = new TcpDiscoverySpi();
 +
 +        disco.setIpFinder(ipFinder);
 +
 +        c.setDiscoverySpi(disco);
 +
 +        CacheConfiguration cc = defaultCacheConfiguration();
 +
 +        cc.setCacheMode(PARTITIONED);
 +        cc.setBackups(1);
 +        
cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 +        cc.setAtomicityMode(TRANSACTIONAL);
 +        cc.setDistributionMode(NEAR_PARTITIONED);
 +
 +        CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
 +
 +        qcfg.setIndexPrimitiveKey(true);
 +
 +        cc.setQueryConfiguration(qcfg);
 +
 +        c.setCacheConfiguration(cc);
 +
 +        return c;
 +    }
 +
 +    /**
 +     * Runs SQL queries from several threads while one extra node is repeatedly started and stopped, then awaits preload-stopped events.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testRestarts() throws Exception {
 +        int duration = 60 * 1000;
 +        int qryThreadNum = 10;
 +        final long nodeLifeTime = 2 * 1000;
 +        final int logFreq = 20;
 +
 +        final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 +
 +        assert cache != null;
 +
 +        for (int i = 0; i < KEY_CNT; i++)
 +            cache.put(i, i);
 +
 +        assertEquals(KEY_CNT, cache.size());
 +
 +        final AtomicInteger qryCnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
-         IgniteFuture<?> fut1 = multithreadedAsync(new CAX() {
++        IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                while (!done.get()) {
 +                    Collection<Cache.Entry<Integer, Integer>> res =
 +                        cache.query(new QuerySqlPredicate<Integer, 
Integer>("_val >= 0")).getAll();
 +
 +                    assertFalse(res.isEmpty());
 +
 +                    int c = qryCnt.incrementAndGet();
 +
 +                    if (c % logFreq == 0)
 +                        info("Executed queries: " + c);
 +                }
 +            }
 +        }, qryThreadNum);
 +
 +        final AtomicInteger restartCnt = new AtomicInteger();
 +
 +        CollectingEventListener lsnr = new CollectingEventListener();
 +
 +        for (int i = 0; i < GRID_CNT; i++)
 +            grid(i).events().localListen(lsnr, 
IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
 +
-         IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
++        IgniteInternalFuture<?> fut2 = multithreadedAsync(new 
Callable<Object>() {
 +            @SuppressWarnings({"BusyWait"})
 +            @Override public Object call() throws Exception {
 +                while (!done.get()) {
 +                    int idx = GRID_CNT;
 +
 +                    startGrid(idx);
 +
 +                    Thread.sleep(nodeLifeTime);
 +
 +                    stopGrid(idx);
 +
 +                    int c = restartCnt.incrementAndGet();
 +
 +                    if (c % logFreq == 0)
 +                        info("Node restarts: " + c);
 +                }
 +
 +                return true;
 +            }
 +        }, 1);
 +
 +        Thread.sleep(duration);
 +
 +        done.set(true);
 +
 +        fut1.get();
 +        fut2.get();
 +
 +        info("Awaiting preload events [restartCnt=" + restartCnt.get() + ']');
 +
 +        boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 
15000);
 +
 +        for (int i = 0; i < GRID_CNT; i++)
 +            grid(i).events().stopLocalListen(lsnr, 
IgniteEventType.EVT_CACHE_PRELOAD_STOPPED);
 +
 +        assert success;
 +    }
 +
 +    /**
 +     * Listener that counts the events it receives and lets a caller block until a given number of
 +     * events has been seen. The counter is only read and written while holding this listener's monitor.
 +     */
 +    private class CollectingEventListener implements 
IgnitePredicate<IgniteEvent> {
 +        /** Registered events count. */
 +        private int evtCnt;
 +
 +        /**
 +         * Counts the event, logs it and wakes up every thread blocked in {@link #awaitEvents(int, long)}.
 +         *
 +         * @param evt Received event.
 +         * @return Always {@code true}.
 +         */
 +        @Override public synchronized boolean apply(IgniteEvent evt) {
 +            evtCnt++;
 +
 +            info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']');
 +
 +            notifyAll();
 +
 +            return true;
 +        }
 +
 +        /**
 +         * Waits until total number of events processed is equal to or greater than the argument passed.
 +         *
 +         * @param cnt Number of events to wait.
 +         * @param timeout Timeout to wait, in the units of {@code U.currentTimeMillis()}.
 +         * @return {@code True} if the requested number of events has been received by the time this
 +         *      method returns, {@code false} if the timeout elapsed first.
 +         * @throws InterruptedException If thread is interrupted.
 +         */
 +        public synchronized boolean awaitEvents(int cnt, long timeout) throws 
InterruptedException {
 +            long deadline = U.currentTimeMillis() + timeout;
 +
 +            // Loop on the condition itself: wait() may also return spuriously or on unrelated notifications.
 +            while (evtCnt < cnt) {
 +                long remaining = deadline - U.currentTimeMillis();
 +
 +                if (remaining <= 0)
 +                    break;
 +
 +                wait(remaining);
 +            }
 +
 +            // Re-check after the deadline: the last event may have arrived during the final wait,
 +            // in which case the caller must not be told that the wait timed out.
 +            return evtCnt >= cnt;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java
index 33012a2,0000000..e122e0c
mode 100644,000000..100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQuerySelfTest.java
@@@ -1,89 -1,0 +1,80 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.distributed.replicated;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.query.*;
- import org.apache.ignite.cluster.*;
- import org.apache.ignite.events.*;
- import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.internal.processors.query.*;
- import org.apache.ignite.internal.util.future.*;
- import org.apache.ignite.internal.util.typedef.*;
- import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.testframework.*;
 +
 +import java.util.*;
 +import java.util.concurrent.*;
 +
- import static org.apache.ignite.events.IgniteEventType.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +
 +/**
 + * Tests for fields queries, run against a {@code REPLICATED} cache on three grids.
 + */
 +public class IgniteCacheReplicatedFieldsQuerySelfTest extends 
IgniteCacheAbstractFieldsQuerySelfTest {
 +    /**
 +     * {@inheritDoc}
 +     *
 +     * @return Always {@code REPLICATED}.
 +     */
 +    @Override protected CacheMode cacheMode() {
 +        return REPLICATED;
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     *
 +     * @return Always {@code 3}.
 +     */
 +    @Override protected int gridCount() {
 +        return 3;
 +    }
 +
 +    /**
 +     * Opens one more fields-query cursor than the cache's configured maximum query iterator count,
 +     * reading only the first row of each (expected to be key {@code 0}), and remembers the very first
 +     * cursor. It then asserts that iterating that first cursor throws {@code IgniteException}.
 +     * <p>
 +     * NOTE(review): presumably the first cursor's iterator is dropped once the maximum is exceeded, and
 +     * the second predicate argument ({@code 50}) is presumably the page size - confirm both.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testLostIterator() throws Exception {
 +        IgniteCache<Object, Object> cache = grid(0).jcache(null);
 +
 +        QueryCursor<List<?>> qry = null;
 +
 +        int maximumQueryIteratorCount =
 +            
cache.getConfiguration(CacheConfiguration.class).getMaximumQueryIteratorCount();
 +
 +        for (int i = 0; i < maximumQueryIteratorCount + 1; i++) {
 +            QueryCursor<List<?>> q = cache
 +               .queryFields(new QuerySqlPredicate<>("select _key from Integer 
where _key >= 0 order by _key", 50));
 +
 +            assertEquals(0, q.iterator().next().get(0));
 +
 +            if (qry == null)
 +                qry = q;
 +        }
 +
 +        final QueryCursor<List<?>> qry0 = qry;
 +
 +        GridTestUtils.assertThrows(log, new Callable<Object>() {
 +            @Override
 +            public Object call() throws Exception {
 +                int i = 0;
 +
 +                for (List<?> row : qry0)
 +                    assertEquals(++i % 50, row.get(0));
 +
 +                return null;
 +            }
 +        }, IgniteException.class, null);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a800b1a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
index d5a8ca2,0000000..4929ab6
mode 100644,000000..100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
@@@ -1,575 -1,0 +1,575 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.distributed.replicated;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.events.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.transactions.*;
 +import org.apache.ignite.internal.processors.cache.query.*;
 +import org.apache.ignite.internal.util.future.*;
 +import org.apache.ignite.internal.util.lang.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.testframework.*;
 +import org.springframework.util.*;
 +
 +import java.io.*;
 +import java.lang.reflect.*;
 +import java.sql.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +
- import javax.cache.Cache;
++import javax.cache.*;
 +
 +import static org.apache.ignite.events.IgniteEventType.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +
 +/**
 + * Tests replicated query.
 + */
 +public class IgniteCacheReplicatedQuerySelfTest extends 
IgniteCacheAbstractQuerySelfTest {
 +    /**
 +     * When {@code true}, {@code CacheKey} and {@code CacheValue} print a line on every
 +     * serialization and deserialization.
 +     */
 +    private static final boolean TEST_DEBUG = false;
 +
 +    /** Grid with index 0, assigned in {@code beforeTest()}. */
 +    private static Ignite ignite1;
 +
 +    /** Grid with index 1, assigned in {@code beforeTest()}. */
 +    private static Ignite ignite2;
 +
 +    /** Grid with index 2, assigned in {@code beforeTest()}. */
 +    private static Ignite ignite3;
 +
 +    /** Cache obtained from {@code ignite1} via {@code jcache(null)}. */
 +    private static IgniteCache<CacheKey, CacheValue> cache1;
 +
 +    /** Cache obtained from {@code ignite2} via {@code jcache(null)}. */
 +    private static IgniteCache<CacheKey, CacheValue> cache2;
 +
 +    /** Cache obtained from {@code ignite3} via {@code jcache(null)}. */
 +    private static IgniteCache<CacheKey, CacheValue> cache3;
 +
 +    /**
 +     * Key serialization count, incremented in {@code CacheKey.writeExternal()}.
 +     * NOTE(review): {@code ++} on a volatile field is not atomic; within this class the counter is only
 +     * read for the {@code TEST_DEBUG} output.
 +     */
 +    private static volatile int keySerCnt;
 +
 +    /** Key deserialization count, incremented in {@code CacheKey.readExternal()}. */
 +    private static volatile int keyDesCnt;
 +
 +    /** Value serialization count, incremented in {@code CacheValue.writeExternal()}. */
 +    private static volatile int valSerCnt;
 +
 +    /** Value deserialization count, incremented in {@code CacheValue.readExternal()}. */
 +    private static volatile int valDesCnt;
 +
 +    /**
 +     * {@inheritDoc}
 +     *
 +     * @return Always {@code 3}; {@code beforeTest()} reads grids 0, 1 and 2.
 +     */
 +    @Override protected int gridCount() {
 +        return 3;
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     *
 +     * @return Always {@code REPLICATED}.
 +     */
 +    @Override protected CacheMode cacheMode() {
 +        return REPLICATED;
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * After the parent setup has run, stores grids 0, 1 and 2 and the caches they return from
 +     * {@code jcache(null)} in the static fields used by the tests of this class.
 +     *
 +     * @throws Exception If the parent setup fails.
 +     */
 +    @Override protected void beforeTest() throws Exception {
 +        super.beforeTest();
 +
 +        ignite1 = grid(0);
 +        ignite2 = grid(1);
 +        ignite3 = grid(2);
 +
 +        cache1 = ignite1.jcache(null);
 +        cache2 = ignite2.jcache(null);
 +        cache3 = ignite3.jcache(null);
 +    }
 +
 +    /**
 +     * Starts an additional grid named {@code "client"}, puts keys {@code 0..9} (value equal to key)
 +     * through its cache and asserts that this cache reports size {@code 0}. It then runs the
 +     * {@code "_key >= 5 order by _key"} query from that grid and expects exactly the five entries
 +     * {@code 5..9} in ascending order. The extra grid is stopped in {@code finally}.
 +     * <p>
 +     * NOTE(review): what makes the grid named {@code "client"} a client-only node is configured outside
 +     * this method - confirm in the parent class.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testClientOnlyNode() throws Exception {
 +        try {
 +            Ignite g = startGrid("client");
 +
 +            IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +            for (int i = 0; i < 10; i++)
 +                c.put(i, i);
 +
 +            // Client cache should be empty.
 +            assertEquals(0, c.size());
 +
 +            Collection<Cache.Entry<Integer, Integer>> res =
 +                c.query(new QuerySqlPredicate<Integer, Integer>("_key >= 5 
order by _key")).getAll();
 +
 +            assertEquals(5, res.size());
 +
 +            int i = 5;
 +
 +            for (Cache.Entry<Integer, Integer> e : res) {
 +                assertEquals(i, e.getKey().intValue());
 +                assertEquals(i, e.getValue().intValue());
 +
 +                i++;
 +            }
 +        }
 +        finally {
 +            stopGrid("client");
 +        }
 +    }
 +
 +    /**
 +     * Puts 100 {@code CacheKey}/{@code CacheValue} pairs through {@code cache1}, checks that all three
 +     * caches report that size, then iterates the result of a {@code "select * from CacheValue"} query
 +     * and expects three times as many rows as keys were put.
 +     * <p>
 +     * NOTE(review): the second predicate argument ({@code 10}) is presumably the page size - confirm.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testIterator() throws Exception {
 +        int keyCnt = 100;
 +
 +        for (int i = 0; i < keyCnt; i++)
 +            cache1.put(new CacheKey(i), new CacheValue("val" + i));
 +
 +        assertEquals(keyCnt, cache1.size());
 +        assertEquals(keyCnt, cache2.size());
 +        assertEquals(keyCnt, cache3.size());
 +
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache1.query(new QuerySqlPredicate<CacheKey, CacheValue>("select 
* from CacheValue", 10, new Object[0]));
 +
 +        Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator();
 +
 +        assert iter.hasNext();
 +
 +        int cnt = 0;
 +
 +        while (iter.hasNext()) {
 +            iter.next();
 +
 +            cnt++;
 +        }
 +
 +        // Expect duplicates since we run query on full projection of 3 nodes 
and dedup flag is false.
 +        assertEquals(keyCnt * 3, cnt);
 +    }
 +
 +    /**
 +     * Clears {@code cache1}, puts keys {@code 1..4} (values {@code "1".."4"}) inside one transaction
 +     * started on {@code ignite1} and commits it. It then verifies with
 +     * {@code checkQueryResults(...)} that a local query on each of the three caches sees keys 2 and 3.
 +     * If a put or the commit throws {@code IgniteCheckedException}, the transaction is rolled back and
 +     * the exception is rethrown.
 +     *
 +     * @throws Exception If test failed.
 +     */
 +    public void testLocalQuery() throws Exception {
 +        cache1.removeAll();
 +
 +        IgniteTx tx = ignite1.transactions().txStart();
 +
 +        try {
 +            cache1.put(new CacheKey(1), new CacheValue("1"));
 +            cache1.put(new CacheKey(2), new CacheValue("2"));
 +            cache1.put(new CacheKey(3), new CacheValue("3"));
 +            cache1.put(new CacheKey(4), new CacheValue("4"));
 +
 +            tx.commit();
 +
 +            info("Committed transaction: " + tx);
 +        }
 +        catch (IgniteCheckedException e) {
 +            tx.rollback();
 +
 +            throw e;
 +        }
 +
 +        checkQueryResults(cache1);
 +        checkQueryResults(cache2);
 +        checkQueryResults(cache3);
 +    }
 +
 +    /**
 +     * Puts keys {@code 1..4} through {@code cache1} in one transaction and waits on a latch of
 +     * {@code keyCnt * 2} that is counted down by {@code EVT_CACHE_OBJECT_PUT} listeners on
 +     * {@code ignite2} and {@code ignite3} (presumably one event per key on each of the two nodes).
 +     * Afterwards it expects:
 +     * <ul>
 +     * <li>{@code cache1.query("val > 1 and val < 4")} to return 6 entries;</li>
 +     * <li>{@code cache3.localQuery(...)} with the same clause to return exactly 2 entries.</li>
 +     * </ul>
 +     * NOTE(review): the listeners registered here are not removed by this method; and the latch is
 +     * awaited without a timeout.
 +     *
 +     * @throws Exception If test failed.
 +     */
 +    public void testDistributedQuery() throws Exception {
 +        int keyCnt = 4;
 +
 +        final CountDownLatch latch = new CountDownLatch(keyCnt * 2);
 +
 +        IgnitePredicate<IgniteEvent> lsnr = new 
IgnitePredicate<IgniteEvent>() {
 +            @Override public boolean apply(IgniteEvent evt) {
 +                latch.countDown();
 +
 +                return true;
 +            }
 +        };
 +
 +        ignite2.events().localListen(lsnr, 
IgniteEventType.EVT_CACHE_OBJECT_PUT);
 +        ignite3.events().localListen(lsnr, 
IgniteEventType.EVT_CACHE_OBJECT_PUT);
 +
 +        IgniteTx tx = ignite1.transactions().txStart();
 +
 +        try {
 +            for (int i = 1; i <= keyCnt; i++)
 +                cache1.put(new CacheKey(i), new 
CacheValue(String.valueOf(i)));
 +
 +            tx.commit();
 +
 +            info("Committed transaction: " + tx);
 +        }
 +        catch (IgniteCheckedException e) {
 +            tx.rollback();
 +
 +            throw e;
 +        }
 +
 +        latch.await();
 +
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache1.query(new QuerySqlPredicate<CacheKey, CacheValue>("val > 1 
and val < 4"));
 +
 +        // Distributed query.
 +        assertEquals(6, qry.getAll().size());
 +
 +        // Create new query, old query cannot be modified after it has been 
executed.
 +        qry = cache3.localQuery(new QuerySqlPredicate<CacheKey, 
CacheValue>("val > 1 and val < 4"));
 +
 +        // Tests execute on node.
 +        Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator();
 +
 +        assert iter != null;
 +        assert iter.hasNext();
 +
 +        iter.next();
 +
 +        assert iter.hasNext();
 +
 +        iter.next();
 +
 +        assert !iter.hasNext();
 +    }
 +
 +    /**
 +     * Returns the private field {@code qryIters} of the query manager of the given grid's internal
 +     * cache, read reflectively through Spring's {@code ReflectionUtils}.
 +     * <p>
 +     * The field is looked up on the runtime class of {@code ctx.queries()} and made accessible before
 +     * it is read. The result is cast unchecked to the declared map type.
 +     * NOTE(review): no call to this helper is visible in this part of the file - confirm it is used.
 +     *
 +     * @param g Grid whose query manager should be observed; cast to the kernal implementation class.
 +     * @return Value of the {@code qryIters} field.
 +     */
 +    private ConcurrentMap<UUID,
 +        Map<Long, 
GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<CacheKey, CacheValue>>>>>
 +        distributedQueryManagerQueryItersMap(Ignite g) {
-         GridCacheContext ctx = ((GridKernal)g).internalCache().context();
++        GridCacheContext ctx = ((IgniteKernal)g).internalCache().context();
 +
 +        Field qryItersField = 
ReflectionUtils.findField(ctx.queries().getClass(), "qryIters");
 +
 +        qryItersField.setAccessible(true);
 +
 +        return (ConcurrentMap<UUID,
 +            Map<Long, 
GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<CacheKey, CacheValue>>>>>)
 +            ReflectionUtils.getField(qryItersField, ctx.queries());
 +    }
 +
 +    /**
 +     * Puts keys {@code 1..4} through {@code cache1}, runs the {@code "val > 0"} query and expects
 +     * {@code keyCnt * 3} entries from {@code getAll()}. It then logs the result of a second
 +     * {@code getAll()} call on the same cursor, which exercises the {@code toString()} of the entries.
 +     *
 +     * @throws Exception If test failed.
 +     */
 +    public void testToString() throws Exception {
 +        int keyCnt = 4;
 +
 +        for (int i = 1; i <= keyCnt; i++)
 +            cache1.put(new CacheKey(i), new CacheValue(String.valueOf(i)));
 +
 +        // Create query that filters on the 'val' field of the value.
 +
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache1.query(new QuerySqlPredicate<CacheKey, CacheValue>("val > 
0"));
 +
 +        assertEquals(keyCnt * 3, qry.getAll().size());
 +
 +        info("Query result: " + qry.getAll());
 +    }
 +
 +    /**
 +     * Puts keys {@code 0..999}, then opens one more {@code "_key >= 0 order by _key"} cursor than the
 +     * cache's configured maximum query iterator count, reading only the first entry of each (expected
 +     * key {@code 0}) and remembering the very first cursor. Finally asserts that reading from that
 +     * first cursor throws {@code IgniteException}.
 +     * <p>
 +     * NOTE(review): the closing loop calls {@code fut0.iterator()} on every pass rather than reusing one
 +     * iterator; presumably the exception is raised before that matters - confirm.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testLostIterator() throws Exception {
 +        IgniteCache<Integer, Integer> cache = ignite.jcache(null);
 +
 +        for (int i = 0; i < 1000; i++)
 +            cache.put(i, i);
 +
 +        QueryCursor<Cache.Entry<Integer, Integer>> fut = null;
 +
 +        for (int i = 0; i < 
cache.getConfiguration(CacheConfiguration.class).getMaximumQueryIteratorCount() 
+ 1; i++) {
 +            QueryCursor<Cache.Entry<Integer, Integer>> q =
 +                cache.query(new QuerySqlPredicate<Integer, Integer>("_key >= 
0 order by _key"));
 +
 +            assertEquals(0, (int)q.iterator().next().getKey());
 +
 +            if (fut == null)
 +                fut = q;
 +        }
 +
 +        final QueryCursor<Cache.Entry<Integer, Integer>> fut0 = fut;
 +
 +        GridTestUtils.assertThrows(log, new Callable<Object>() {
 +            @Override public Object call() throws Exception {
 +                int i = 0;
 +
 +                Cache.Entry<Integer, Integer> e;
 +
 +                while ((e = fut0.iterator().next()) != null)
 +                    assertEquals(++i, (int)e.getKey());
 +
 +                return null;
 +            }
 +        }, IgniteException.class, null);
 +    }
 +
 +    /**
 +     * Checks that query iterators held on behalf of a node are released when that node leaves.
 +     * <p>
 +     * Flow, as implemented below:
 +     * <ol>
 +     * <li>Starts an extra grid, puts keys {@code 0..999} through it and reads the first entry of a
 +     *     {@code "_key >= 0 order by _key"} query.</li>
 +     * <li>Reads the {@code qryIters} field of grid 0's query manager and waits until it holds exactly
 +     *     one entry, then expects one future registered under the extra grid's node ID.</li>
 +     * <li>Takes the iterator from that future, reads its {@code data} field as a {@code ResultSet} and
 +     *     asserts it is still open.</li>
 +     * <li>Listens on grid 0 for {@code EVT_NODE_LEFT} of the extra node, stops the extra grid and waits
 +     *     for the event (without a timeout).</li>
 +     * <li>Asserts that {@code qryIters} is empty and the {@code ResultSet} is closed.</li>
 +     * </ol>
 +     * {@code stopGrid()} is called again in {@code finally} so the extra grid is stopped on failure too.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testNodeLeft() throws Exception {
 +        try {
 +            Ignite g = startGrid();
 +
 +            IgniteCache<Integer, Integer> cache = g.jcache(null);
 +
 +            for (int i = 0; i < 1000; i++)
 +                cache.put(i, i);
 +
 +            QueryCursor<Cache.Entry<Integer, Integer>> q =
 +                cache.query(new QuerySqlPredicate<Integer, Integer>("_key >= 
0 order by _key", 50, new Object[0]));
 +
 +            assertEquals(0, (int) q.iterator().next().getKey());
 +
 +            final ConcurrentMap<UUID, Map<Long, 
GridFutureAdapter<GridCloseableIterator<
 +                IgniteBiTuple<Integer, Integer>>>>> map =
-                 
U.field(((GridKernal)grid(0)).internalCache().context().queries(), "qryIters");
++                
U.field(((IgniteKernal)grid(0)).internalCache().context().queries(), 
"qryIters");
 +
 +            // fut.nextX() does not guarantee the request has completed on 
remote node
 +            // (we could receive page from local one), so we need to wait.
 +            assertTrue(GridTestUtils.waitForCondition(new PA() {
 +                @Override public boolean apply() {
 +                    return map.size() == 1;
 +                }
 +            }, getTestTimeout()));
 +
 +            Map<Long, 
GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<Integer, Integer>>>> futs 
=
 +                map.get(g.cluster().localNode().id());
 +
 +            assertEquals(1, futs.size());
 +
 +            GridCloseableIterator<IgniteBiTuple<Integer, Integer>> iter =
-                 (GridCloseableIterator<IgniteBiTuple<Integer, 
Integer>>)((IgniteFuture)F.first(futs.values()).get()).get();
++                (GridCloseableIterator<IgniteBiTuple<Integer, 
Integer>>)((IgniteInternalFuture)F.first(futs.values()).get()).get();
 +
 +            ResultSet rs = U.field(iter, "data");
 +
 +            assertFalse(rs.isClosed());
 +
 +            final UUID nodeId = g.cluster().localNode().id();
 +            final CountDownLatch latch = new CountDownLatch(1);
 +
 +            grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() {
 +                @Override public boolean apply(IgniteEvent evt) {
 +                    if 
(((IgniteDiscoveryEvent)evt).eventNode().id().equals(nodeId))
 +                        latch.countDown();
 +
 +                    return true;
 +                }
 +            }, EVT_NODE_LEFT);
 +
 +            stopGrid();
 +
 +            latch.await();
 +
 +            assertEquals(0, map.size());
 +            assertTrue(rs.isClosed());
 +        }
 +        finally {
 +            // Ensure that additional node is stopped.
 +            stopGrid();
 +        }
 +    }
 +
 +    /**
 +     * Runs the local query {@code "val > 1 and val < 4"} on the given cache and asserts that it yields
 +     * exactly two entries, each with key {@code CacheKey(2)} or {@code CacheKey(3)}.
 +     * <p>
 +     * The check accepts either order. It does not verify that the two entries are distinct.
 +     *
 +     * @param cache Cache to run the local query on.
 +     * @throws Exception If check failed.
 +     */
 +    private void checkQueryResults(IgniteCache<CacheKey, CacheValue> cache) 
throws Exception {
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache.localQuery(new QuerySqlPredicate<CacheKey, CacheValue>("val 
> 1 and val < 4"));
 +
 +        Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator();
 +
 +        assert iter != null;
 +        assert iter.hasNext();
 +
 +        Cache.Entry<CacheKey, CacheValue> entry = iter.next();
 +
 +        assert entry.getKey().equals(new CacheKey(2)) || 
entry.getKey().equals(new CacheKey(3));
 +
 +        assert iter.hasNext();
 +
 +        entry = iter.next();
 +
 +        assert entry.getKey().equals(new CacheKey(2)) || 
entry.getKey().equals(new CacheKey(3));
 +        assert !iter.hasNext();
 +    }
 +
 +    /**
 +     * Cache key wrapping a single {@code int}. It is {@code Externalizable}; every serialization and
 +     * deserialization bumps the enclosing test's {@code keySerCnt} / {@code keyDesCnt} counter and,
 +     * when {@code TEST_DEBUG} is set, prints a line.
 +     */
 +    private static class CacheKey implements Externalizable {
 +        /** Key. */
 +        private int key;
 +
 +        /**
 +         * @param key Key.
 +         */
 +        CacheKey(int key) {
 +            this.key = key;
 +        }
 +
 +        /**
 +         * Public no-arg constructor required by {@link Externalizable}.
 +         */
 +        public CacheKey() {
 +            /* No-op. */
 +        }
 +
 +        /**
 +         * @return Key.
 +         */
 +        public int getKey() {
 +            return key;
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Reads the key as a single {@code int} and increments {@code keyDesCnt}.
 +         */
 +        @Override public void readExternal(ObjectInput in) throws 
IOException, ClassNotFoundException {
 +            key = in.readInt();
 +
 +            keyDesCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Deserialized demo key [keyDesCnt=" + keyDesCnt + 
", key=" + this + ']');
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Writes the key as a single {@code int} and increments {@code keySerCnt}.
 +         */
 +        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
 +            out.writeInt(key);
 +
 +            keySerCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Serialized demo key [serCnt=" + keySerCnt + ", 
key=" + this + ']');
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Two keys are equal when the other object is a {@code CacheKey} with the same {@code key}.
 +         */
 +        @Override public boolean equals(Object o) {
 +            CacheKey cacheKey;
 +
 +            if (o instanceof CacheKey)
 +                cacheKey = (CacheKey)o;
 +            else
 +                return false;
 +
 +            return key == cacheKey.key;
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         *
 +         * @return The wrapped {@code key} itself.
 +         */
 +        @Override public int hashCode() {
 +            return key;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(CacheKey.class, this);
 +        }
 +    }
 +
 +    /**
 +     * Cache value wrapping a single string that is exposed to SQL through {@code @CacheQuerySqlField}.
 +     * It is {@code Externalizable}; every serialization and deserialization bumps the enclosing test's
 +     * {@code valSerCnt} / {@code valDesCnt} counter and, when {@code TEST_DEBUG} is set, prints a line.
 +     */
 +    private static class CacheValue implements Externalizable {
 +        /** Value; queried in the tests as the {@code val} column. */
 +        @CacheQuerySqlField
 +        private String val;
 +
 +        /**
 +         * @param val Value.
 +         */
 +        CacheValue(String val) {
 +            this.val = val;
 +        }
 +
 +        /**
 +         * Public no-arg constructor required by {@link Externalizable}.
 +         */
 +        public CacheValue() {
 +            /* No-op. */
 +        }
 +
 +        /**
 +         * @return Value.
 +         */
 +        public String getValue() {
 +            return val;
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Reads the value with {@code U.readString(in)} and increments {@code valDesCnt}.
 +         */
 +        @Override public void readExternal(ObjectInput in) throws 
IOException, ClassNotFoundException {
 +            val = U.readString(in);
 +
 +            valDesCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Deserialized demo value [valDesCnt=" + valDesCnt + 
", val=" + this + ']');
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Writes the value with {@code U.writeString(out, val)} and increments {@code valSerCnt}.
 +         */
 +        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
 +            U.writeString(out, val);
 +
 +            valSerCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Serialized demo value [serCnt=" + valSerCnt + ", 
val=" + this + ']');
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Two values are equal when they have the same class and their strings are equal or both
 +         * {@code null}.
 +         */
 +        @Override public boolean equals(Object o) {
 +            if (this == o)
 +                return true;
 +
 +            if (o == null || getClass() != o.getClass())
 +                return false;
 +
 +            CacheValue val = (CacheValue)o;
 +
 +            return !(this.val != null ? !this.val.equals(val.val) : val.val 
!= null);
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         *
 +         * @return Hash code of the wrapped string, or {@code 0} when it is {@code null}.
 +         */
 +        @Override public int hashCode() {
 +            return val != null ? val.hashCode() : 0;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(CacheValue.class, this);
 +        }
 +    }
 +}

Reply via email to