http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0e7b6d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
deleted file mode 100644
index 9c02003..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
+++ /dev/null
@@ -1,3197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.query;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.indexing.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.query.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.managers.eventstorage.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.datastructures.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
-import org.gridgain.grid.kernal.processors.query.*;
-import org.gridgain.grid.kernal.processors.task.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.future.*;
-import org.gridgain.grid.util.io.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.gridgain.grid.cache.GridCacheMode.*;
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.gridgain.grid.kernal.GridClosureCallMode.*;
-import static 
org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType.*;
-
-/**
- * Query and index manager.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
-    /** */
-    protected GridQueryProcessor qryProc;
-
-    /** */
-    private String space;
-
-    /** */
-    private int maxIterCnt;
-
-    /** */
-    private volatile GridCacheQueryMetricsAdapter metrics = new 
GridCacheQueryMetricsAdapter();
-
-    /** */
-    private final ConcurrentMap<UUID, Map<Long, 
GridFutureAdapter<QueryResult<K, V>>>> qryIters =
-        new ConcurrentHashMap8<>();
-
-    /** */
-    private final ConcurrentMap<UUID, Map<Long, 
GridFutureAdapter<FieldsResult>>> fieldsQryRes =
-        new ConcurrentHashMap8<>();
-
-    /** */
-    private volatile ConcurrentMap<Object, CachedResult<?>> qryResCache = new 
ConcurrentHashMap8<>();
-
-    /** */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** {@inheritDoc} */
-    @Override public void start0() throws IgniteCheckedException {
-        qryProc = cctx.kernalContext().query();
-        space = cctx.name();
-        maxIterCnt = cctx.config().getMaximumQueryIteratorCount();
-
-        cctx.events().addListener(new GridLocalEventListener() {
-            @Override public void onEvent(IgniteEvent evt) {
-                UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
-
-                Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = 
qryIters.remove(nodeId);
-
-                if (futs != null) {
-                    for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> 
entry : futs.entrySet()) {
-                        final Object recipient = recipient(nodeId, 
entry.getKey());
-
-                        entry.getValue().listenAsync(new 
CIX1<IgniteFuture<QueryResult<K, V>>>() {
-                            @Override public void 
applyx(IgniteFuture<QueryResult<K, V>> f) throws IgniteCheckedException {
-                                f.get().closeIfNotShared(recipient);
-                            }
-                        });
-                    }
-                }
-
-                Map<Long, GridFutureAdapter<FieldsResult>> fieldsFuts = 
fieldsQryRes.remove(nodeId);
-
-                if (fieldsFuts != null) {
-                    for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> 
entry : fieldsFuts.entrySet()) {
-                        final Object recipient = recipient(nodeId, 
entry.getKey());
-
-                        entry.getValue().listenAsync(new 
CIX1<IgniteFuture<FieldsResult>>() {
-                            @Override public void 
applyx(IgniteFuture<FieldsResult> f)
-                                throws IgniteCheckedException {
-                                f.get().closeIfNotShared(recipient);
-                            }
-                        });
-                    }
-                }
-            }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStop0(boolean cancel) {
-        busyLock.block();
-
-        if (cancel)
-            onCancelAtStop();
-        else
-            onWaitAtStop();
-    }
-
-    /**
-     * @return {@code True} if entered busy state.
-     */
-    private boolean enterBusy() {
-        return busyLock.enterBusy();
-    }
-
-    /**
-     *  Leaves busy state.
-     */
-    private void leaveBusy() {
-        busyLock.leaveBusy();
-    }
-
-    /**
-     * Stops query manager.
-     *
-     * @param cancel Cancel queries.
-     */
-    @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
-    @Override public final void stop0(boolean cancel) {
-        if (log.isDebugEnabled())
-            log.debug("Stopped cache query manager.");
-    }
-
-    /**
-     * Gets number of objects of given type in index.
-     *
-     * @param valType Value type.
-     * @return Number of objects or -1 if type was not indexed at all.
-     * @throws IgniteCheckedException If failed.
-     */
-    public long size(Class<?> valType) throws IgniteCheckedException {
-        if (!enterBusy())
-            throw new IllegalStateException("Failed to get size (grid is 
stopping).");
-
-        try {
-            return qryProc.size(space, valType);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Rebuilds all search indexes of given value type.
-     *
-     * @param valType Value type.
-     * @return Future that will be completed when rebuilding of all indexes is 
finished.
-     */
-    public IgniteFuture<?> rebuildIndexes(Class<?> valType) {
-        return rebuildIndexes(valType.getName());
-    }
-
-    /**
-     * Rebuilds all search indexes of given value type.
-     *
-     * @param typeName Value type name.
-     * @return Future that will be completed when rebuilding of all indexes is 
finished.
-     */
-    public IgniteFuture<?> rebuildIndexes(String typeName) {
-        if (!enterBusy())
-            throw new IllegalStateException("Failed to rebuild indexes (grid 
is stopping).");
-
-        try {
-            return qryProc.rebuildIndexes(space, typeName);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Rebuilds all search indexes of all types.
-     *
-     * @return Future that will be completed when rebuilding of all indexes is 
finished.
-     */
-    public IgniteFuture<?> rebuildAllIndexes() {
-        if (!enterBusy())
-            throw new IllegalStateException("Failed to rebuild indexes (grid 
is stopping).");
-
-        try {
-            return qryProc.rebuildAllIndexes();
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Marks this request as canceled.
-     *
-     * @param reqId Request id.
-     */
-    void onQueryFutureCanceled(long reqId) {
-        // No-op.
-    }
-
-    /**
-     * Cancel flag handler at stop.
-     */
-    void onCancelAtStop() {
-        // No-op.
-    }
-
-    /**
-     * Wait flag handler at stop.
-     */
-    void onWaitAtStop() {
-        // No-op.
-    }
-
-    /**
-     * Processes cache query request.
-     *
-     * @param sndId Sender node id.
-     * @param req Query request.
-     */
-    void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
-        // No-op.
-    }
-
-    /**
-     * Entry for given key unswapped.
-     *
-     * @param swapSpaceName Swap space name.
-     * @param key Key.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onSwap(String swapSpaceName, K key) throws 
IgniteCheckedException {
-        if (!enterBusy())
-            return; // Ignore index update when node is stopping.
-
-        try {
-            qryProc.onSwap(space, key);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Entry for given key unswapped.
-     *
-     * @param key Key.
-     * @param val Value
-     * @param valBytes Value bytes.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onUnswap(K key, V val, byte[] valBytes) throws 
IgniteCheckedException {
-        if (!enterBusy())
-            return; // Ignore index update when node is stopping.
-
-        try {
-            qryProc.onUnswap(space, key, val, valBytes);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     *
-     */
-    private void invalidateResultCache() {
-        if (!qryResCache.isEmpty())
-            qryResCache = new ConcurrentHashMap8<>();
-    }
-
-    /**
-     * Writes key-value pair to index.
-     *
-     * @param key Key.
-     * @param keyBytes Byte array with key data.
-     * @param val Value.
-     * @param valBytes Value bytes.
-     * @param ver Cache entry version.
-     * @param expirationTime Expiration time or 0 if never expires.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void store(K key, @Nullable byte[] keyBytes, @Nullable V val, 
@Nullable byte[] valBytes,
-        GridCacheVersion ver, long expirationTime)
-        throws IgniteCheckedException {
-        assert key != null;
-        assert val != null || valBytes != null;
-
-        if (!cctx.config().isQueryIndexEnabled() && !(key instanceof 
GridCacheInternal))
-            return; // No-op.
-
-        if (!enterBusy())
-            return; // Ignore index update when node is stopping.
-
-        try {
-            if (val == null)
-                val = cctx.marshaller().unmarshal(valBytes, 
cctx.deploy().globalLoader());
-
-            qryProc.store(space, key, keyBytes, val, valBytes, 
CU.versionToBytes(ver), expirationTime);
-        }
-        finally {
-            invalidateResultCache();
-
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param key Key.
-     * @param keyBytes Byte array with key value.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    @SuppressWarnings("SimplifiableIfStatement")
-    public void remove(K key, @Nullable byte[] keyBytes) throws 
IgniteCheckedException {
-        assert key != null;
-
-        if (!cctx.config().isQueryIndexEnabled() && !(key instanceof 
GridCacheInternal))
-            return; // No-op.
-
-        if (!enterBusy())
-            return; // Ignore index update when node is stopping.
-
-        try {
-            qryProc.remove(space, key);
-        }
-        finally {
-            invalidateResultCache();
-
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Undeploys given class loader.
-     *
-     * @param ldr Class loader to undeploy.
-     */
-    public void onUndeploy(ClassLoader ldr) {
-        if (!enterBusy())
-            return; // Ignore index update when node is stopping.
-
-        try {
-            qryProc.onUndeploy(space, ldr);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            invalidateResultCache();
-
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Executes local query.
-     *
-     * @param qry Query.
-     * @return Query future.
-     */
-    public abstract GridCacheQueryFuture<?> queryLocal(GridCacheQueryBean qry);
-
-    /**
-     * Executes distributed query.
-     *
-     * @param qry Query.
-     * @param nodes Nodes.
-     * @return Query future.
-     */
-    public abstract GridCacheQueryFuture<?> 
queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
-
-    /**
-     * Loads page.
-     *
-     * @param id Query ID.
-     * @param qry Query.
-     * @param nodes Nodes.
-     * @param all Whether to load all pages.
-     */
-    public abstract void loadPage(long id, GridCacheQueryAdapter<?> qry, 
Collection<ClusterNode> nodes, boolean all);
-
-    /**
-     * Executes distributed fields query.
-     *
-     * @param qry Query.
-     * @return Query future.
-     */
-    public abstract GridCacheQueryFuture<?> 
queryFieldsLocal(GridCacheQueryBean qry);
-
-    /**
-     * Executes distributed fields query.
-     *
-     * @param qry Query.
-     * @param nodes Nodes.
-     * @return Query future.
-     */
-    public abstract GridCacheQueryFuture<?> 
queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
-
-    /**
-     * Performs query.
-     *
-     * @param qry Query.
-     * @param args Arguments.
-     * @param loc Local query or not.
-     * @param subjId Security subject ID.
-     * @param taskName Task name.
-     * @param recipient ID of the recipient.
-     * @return Collection of found keys.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry,
-        @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable 
String taskName, Object recipient)
-        throws IgniteCheckedException {
-        if (qry.type() == null) {
-            assert !loc;
-
-            throw new IgniteCheckedException("Received next page request after 
iterator was removed. " +
-                "Consider increasing maximum number of stored iterators (see " 
+
-                "GridCacheConfiguration.getMaximumQueryIteratorCount() 
configuration property).");
-        }
-
-        QueryResult<K, V> res;
-
-        T3<String, String, List<Object>> resKey = null;
-
-        if (qry.type() == SQL) {
-            resKey = new T3<>(qry.queryClassName(), qry.clause(), 
F.asList(args));
-
-            res = (QueryResult<K, V>)qryResCache.get(resKey);
-
-            if (res != null && res.addRecipient(recipient))
-                return res;
-
-            res = new QueryResult<>(qry.type(), recipient);
-
-            if (qryResCache.putIfAbsent(resKey, res) != null)
-                resKey = null;
-        }
-        else
-            res = new QueryResult<>(qry.type(), recipient);
-
-        GridCloseableIterator<IgniteBiTuple<K, V>> iter;
-
-        try {
-            switch (qry.type()) {
-                case SQL:
-                    if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                        cctx.gridEvents().record(new 
IgniteCacheQueryExecutedEvent<>(
-                            cctx.localNode(),
-                            "SQL query executed.",
-                            EVT_CACHE_QUERY_EXECUTED,
-                            
org.gridgain.grid.cache.query.GridCacheQueryType.SQL,
-                            cctx.namex(),
-                            qry.queryClassName(),
-                            qry.clause(),
-                            null,
-                            null,
-                            args,
-                            subjId,
-                            taskName));
-                    }
-
-                    iter = qryProc.query(space, qry.clause(), F.asList(args),
-                        qry.queryClassName(), filter(qry));
-
-                    break;
-
-                case SCAN:
-                    if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                        cctx.gridEvents().record(new 
IgniteCacheQueryExecutedEvent<>(
-                            cctx.localNode(),
-                            "Scan query executed.",
-                            EVT_CACHE_QUERY_EXECUTED,
-                            
org.gridgain.grid.cache.query.GridCacheQueryType.SCAN,
-                            cctx.namex(),
-                            null,
-                            null,
-                            qry.scanFilter(),
-                            null,
-                            null,
-                            subjId,
-                            taskName));
-                    }
-
-                    iter = scanIterator(qry);
-
-                    break;
-
-                case TEXT:
-                    if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                        cctx.gridEvents().record(new 
IgniteCacheQueryExecutedEvent<>(
-                            cctx.localNode(),
-                            "Full text query executed.",
-                            EVT_CACHE_QUERY_EXECUTED,
-                            
org.gridgain.grid.cache.query.GridCacheQueryType.FULL_TEXT,
-                            cctx.namex(),
-                            qry.queryClassName(),
-                            qry.clause(),
-                            null,
-                            null,
-                            null,
-                            subjId,
-                            taskName));
-                    }
-
-                    iter = qryProc.queryText(space, qry.clause(), 
qry.queryClassName(), filter(qry));
-
-                    break;
-
-                case SET:
-                    iter = setIterator(qry);
-
-                    break;
-
-                case SQL_FIELDS:
-                    assert false : "SQL fields query is incorrectly 
processed.";
-
-                default:
-                    throw new IgniteCheckedException("Unknown query type: " + 
qry.type());
-            }
-
-            res.onDone(iter);
-        }
-        catch (Exception e) {
-            res.onDone(e);
-        }
-        finally {
-            if (resKey != null)
-                qryResCache.remove(resKey, res);
-        }
-
-        return res;
-    }
-
-    /**
-     * Performs fields query.
-     *
-     * @param qry Query.
-     * @param args Arguments.
-     * @param loc Local query or not.
-     * @param subjId Security subject ID.
-     * @param taskName Task name.
-     * @param recipient ID of the recipient.
-     * @return Collection of found keys.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry, 
@Nullable Object[] args,
-        boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object 
recipient) throws IgniteCheckedException {
-        assert qry != null;
-
-        FieldsResult res;
-
-        T2<String, List<Object>> resKey = null;
-
-        if (qry.type() == SQL_FIELDS) {
-            if (qry.clause() == null) {
-                assert !loc;
-
-                throw new IgniteCheckedException("Received next page request 
after iterator was removed. " +
-                    "Consider increasing maximum number of stored iterators 
(see " +
-                    "GridCacheConfiguration.getMaximumQueryIteratorCount() 
configuration property).");
-            }
-
-            if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                cctx.gridEvents().record(new IgniteCacheQueryExecutedEvent<>(
-                    cctx.localNode(),
-                    "SQL fields query executed.",
-                    EVT_CACHE_QUERY_EXECUTED,
-                    
org.gridgain.grid.cache.query.GridCacheQueryType.SQL_FIELDS,
-                    cctx.namex(),
-                    null,
-                    qry.clause(),
-                    null,
-                    null,
-                    args,
-                    subjId,
-                    taskName));
-            }
-
-            // Attempt to get result from cache.
-            resKey = new T2<>(qry.clause(), F.asList(args));
-
-            res = (FieldsResult)qryResCache.get(resKey);
-
-            if (res != null && res.addRecipient(recipient))
-                return res; // Cached result found.
-
-            res = new FieldsResult(recipient);
-
-            if (qryResCache.putIfAbsent(resKey, res) != null)
-                resKey = null; // Failed to cache result.
-        }
-        else {
-            assert qry.type() == SPI;
-
-            if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                cctx.gridEvents().record(new IgniteCacheQueryExecutedEvent<>(
-                    cctx.localNode(),
-                    "SPI query executed.",
-                    EVT_CACHE_QUERY_EXECUTED,
-                    org.gridgain.grid.cache.query.GridCacheQueryType.SPI,
-                    cctx.namex(),
-                    null,
-                    null,
-                    null,
-                    null,
-                    args,
-                    subjId,
-                    taskName));
-            }
-
-            res = new FieldsResult(recipient);
-        }
-
-        try {
-            if (qry.type() == SPI) {
-                IgniteSpiCloseableIterator<?> iter = 
cctx.kernalContext().indexing().query(space, F.asList(args),
-                    filter(qry));
-
-                res.onDone(iter);
-            }
-            else {
-                assert qry.type() == SQL_FIELDS;
-
-                GridQueryFieldsResult qryRes = qryProc.queryFields(space, 
qry.clause(), F.asList(args), filter(qry));
-
-                res.metaData(qryRes.metaData());
-
-                res.onDone(qryRes.iterator());
-            }
-        }
-        catch (Exception e) {
-            res.onDone(e);
-        }
-        finally {
-            if (resKey != null)
-                qryResCache.remove(resKey, res);
-        }
-
-        return res;
-    }
-
-    /**
-     * @param qry Query.
-     * @return Cache set items iterator.
-     */
-    private GridCloseableIterator<IgniteBiTuple<K, V>> 
setIterator(GridCacheQueryAdapter<?> qry) {
-        final GridSetQueryPredicate filter = 
(GridSetQueryPredicate)qry.scanFilter();
-
-        filter.init(cctx);
-
-        IgniteUuid id = filter.setId();
-
-        Collection<GridCacheSetItemKey> data = 
cctx.dataStructures().setData(id);
-
-        if (data == null)
-            data = Collections.emptyList();
-
-        final GridIterator<IgniteBiTuple<K, V>> it = F.iterator(
-            data,
-            new C1<GridCacheSetItemKey, IgniteBiTuple<K, V>>() {
-                @Override public IgniteBiTuple<K, V> apply(GridCacheSetItemKey 
e) {
-                    return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE);
-                }
-            },
-            true,
-            new P1<GridCacheSetItemKey>() {
-                @Override public boolean apply(GridCacheSetItemKey e) {
-                    return filter.apply(e, null);
-                }
-            });
-
-        return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
-            }
-
-            @Override protected IgniteBiTuple<K, V> onNext() {
-                return it.next();
-            }
-
-            @Override protected void onRemove() {
-                it.remove();
-            }
-
-            @Override protected void onClose() {
-                // No-op.
-            }
-        };
-    }
-
-    /**
-     * @param qry Query.
-     * @return Full-scan row iterator.
-     * @throws IgniteCheckedException If failed to get iterator.
-     */
-    @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final 
GridCacheQueryAdapter<?> qry)
-        throws IgniteCheckedException {
-        IgnitePredicate<GridCacheEntry<K, V>> filter = null;
-
-        if (qry.projectionFilter() != null) {
-            filter = new P1<GridCacheEntry<K, V>>() {
-                @Override public boolean apply(GridCacheEntry<K, V> e) {
-                    return 
qry.projectionFilter().apply((GridCacheEntry<Object, Object>)e);
-                }
-            };
-        }
-
-        GridCacheProjection<K, V> prj0 = filter != null ? 
cctx.cache().projection(filter) : cctx.cache();
-
-        if (qry.keepPortable())
-            prj0 = prj0.keepPortable();
-
-        final GridCacheProjection<K, V> prj = prj0;
-
-        final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
-
-        injectResources(keyValFilter);
-
-        GridIterator<IgniteBiTuple<K, V>> heapIt = new 
GridIteratorAdapter<IgniteBiTuple<K, V>>() {
-            private IgniteBiTuple<K, V> next;
-
-            private Iterator<K> iter = prj.keySet().iterator();
-
-            {
-                advance();
-            }
-
-            @Override public boolean hasNextX() {
-                return next != null;
-            }
-
-            @Override public IgniteBiTuple<K, V> nextX() {
-                if (next == null)
-                    throw new NoSuchElementException();
-
-                IgniteBiTuple<K, V> next0 = next;
-
-                advance();
-
-                return next0;
-            }
-
-            @Override public void removeX() {
-                throw new UnsupportedOperationException();
-            }
-
-            private void advance() {
-                IgniteBiTuple<K, V> next0 = null;
-
-                while (iter.hasNext()) {
-                    next0 = null;
-
-                    K key = iter.next();
-
-                    V val = prj.peek(key);
-
-                    if (val != null) {
-                        next0 = F.t(key, val);
-
-                        if (checkPredicate(next0))
-                            break;
-                        else
-                            next0 = null;
-                    }
-                }
-
-                next = next0 != null ?
-                    new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
-                    null;
-            }
-
-            private boolean checkPredicate(Map.Entry<K, V> e) {
-                if (keyValFilter != null) {
-                    Map.Entry<K, V> e0 = (Map.Entry<K, 
V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
-
-                    return keyValFilter.apply(e0.getKey(), e0.getValue());
-                }
-
-                return true;
-            }
-        };
-
-        final GridIterator<IgniteBiTuple<K, V>> it;
-
-        if (cctx.isSwapOrOffheapEnabled()) {
-            List<GridIterator<IgniteBiTuple<K, V>>> iters = new ArrayList<>(3);
-
-            iters.add(heapIt);
-
-            if (cctx.isOffHeapEnabled())
-                iters.add(offheapIterator(qry));
-
-            if (cctx.swap().swapEnabled())
-                iters.add(swapIterator(qry));
-
-            it = new CompoundIterator<>(iters);
-        }
-        else
-            it = heapIt;
-
-        return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
-            }
-
-            @Override protected IgniteBiTuple<K, V> onNext() {
-                return it.next();
-            }
-
-            @Override protected void onRemove() {
-                it.remove();
-            }
-        };
-    }
-
-    /**
-     * @param qry Query.
-     * @return Swap iterator.
-     * @throws IgniteCheckedException If failed.
-     */
-    private GridIterator<IgniteBiTuple<K, V>> 
swapIterator(GridCacheQueryAdapter<?> qry)
-        throws IgniteCheckedException {
-        IgnitePredicate<GridCacheEntry<Object, Object>> prjPred = 
qry.projectionFilter();
-
-        IgniteBiPredicate<K, V> filter = qry.scanFilter();
-
-        Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator();
-
-        return scanIterator(it, prjPred, filter, qry.keepPortable());
-    }
-
-    /**
-     * @param qry Query.
-     * @return Offheap iterator.
-     */
-    private GridIterator<IgniteBiTuple<K, V>> 
offheapIterator(GridCacheQueryAdapter<?> qry) {
-        IgnitePredicate<GridCacheEntry<Object, Object>> prjPred = 
qry.projectionFilter();
-
-        IgniteBiPredicate<K, V> filter = qry.scanFilter();
-
-        if ((cctx.portableEnabled() && cctx.offheapTiered()) && (prjPred != 
null || filter != null)) {
-            OffheapIteratorClosure c = new OffheapIteratorClosure(prjPred, 
filter, qry.keepPortable());
-
-            return cctx.swap().rawOffHeapIterator(c);
-        }
-        else {
-            Iterator<Map.Entry<byte[], byte[]>> it = 
cctx.swap().rawOffHeapIterator();
-
-            return scanIterator(it, prjPred, filter, qry.keepPortable());
-        }
-    }
-
-    /**
-     * @param it Lazy swap or offheap iterator.
-     * @param prjPred Projection predicate.
-     * @param filter Scan filter.
-     * @param keepPortable Keep portable flag.
-     * @return Iterator.
-     */
-    private GridIteratorAdapter<IgniteBiTuple<K, V>> scanIterator(
-        @Nullable final Iterator<Map.Entry<byte[], byte[]>> it,
-        @Nullable final IgnitePredicate<GridCacheEntry<Object, Object>> 
prjPred,
-        @Nullable final IgniteBiPredicate<K, V> filter,
-        final boolean keepPortable) {
-        if (it == null)
-            return new GridEmptyCloseableIterator<>();
-
-        return new GridIteratorAdapter<IgniteBiTuple<K, V>>() {
-            private IgniteBiTuple<K, V> next;
-
-            {
-                advance();
-            }
-
-            @Override public boolean hasNextX() {
-                return next != null;
-            }
-
-            @Override public IgniteBiTuple<K, V> nextX() {
-                if (next == null)
-                    throw new NoSuchElementException();
-
-                IgniteBiTuple<K, V> next0 = next;
-
-                advance();
-
-                return next0;
-            }
-
-            @Override public void removeX() {
-                throw new UnsupportedOperationException();
-            }
-
-            private void advance() {
-                next = null;
-
-                while (it.hasNext()) {
-                    final LazySwapEntry e = new LazySwapEntry(it.next());
-
-                    if (prjPred != null) {
-                        GridCacheEntry<K, V> cacheEntry = new 
GridCacheScanSwapEntry(e);
-
-                        if (!prjPred.apply((GridCacheEntry<Object, 
Object>)cacheEntry))
-                            continue;
-                    }
-
-                    if (filter != null) {
-                        K key = (K)cctx.unwrapPortableIfNeeded(e.key(), 
keepPortable);
-                        V val = (V)cctx.unwrapPortableIfNeeded(e.value(), 
keepPortable);
-
-                        if (!filter.apply(key, val))
-                            continue;
-                    }
-
-                    next = new IgniteBiTuple<>(e.key(), e.value());
-
-                    break;
-                }
-            }
-        };
-    }
-
-    /**
-     * @param o Object to inject resources to.
-     * @throws IgniteCheckedException If failure occurred while injecting 
resources.
-     */
-    private void injectResources(@Nullable Object o) throws 
IgniteCheckedException {
-        if (o != null) {
-            GridKernalContext ctx = cctx.kernalContext();
-
-            ClassLoader ldr = o.getClass().getClassLoader();
-
-            if (ctx.deploy().isGlobalLoader(ldr))
-                
ctx.resource().inject(ctx.deploy().getDeployment(ctx.deploy().getClassLoaderId(ldr)),
 o.getClass(), o);
-            else
-                
ctx.resource().inject(ctx.deploy().getDeployment(o.getClass().getName()), 
o.getClass(), o);
-        }
-    }
-
-    /**
-     * Processes fields query request.
-     *
-     * @param qryInfo Query info.
-     */
-    protected void runFieldsQuery(GridCacheQueryInfo qryInfo) {
-        assert qryInfo != null;
-
-        if (!enterBusy()) {
-            if (cctx.localNodeId().equals(qryInfo.senderId()))
-                throw new IllegalStateException("Failed to process query 
request (grid is stopping).");
-
-            return; // Ignore remote requests when when node is stopping.
-        }
-
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Running query: " + qryInfo);
-
-            boolean rmvRes = true;
-
-            try {
-                // Preparing query closures.
-                IgnitePredicate<GridCacheEntry<Object, Object>> prjFilter = 
qryInfo.projectionPredicate();
-                IgniteClosure<Object, Object> trans = (IgniteClosure<Object, 
Object>)qryInfo.transformer();
-                IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, 
Object>)qryInfo.reducer();
-
-                injectResources(prjFilter);
-                injectResources(trans);
-                injectResources(rdc);
-
-                GridCacheQueryAdapter<?> qry = qryInfo.query();
-
-                int pageSize = qry.pageSize();
-
-                Collection<Object> data = null;
-                Collection<Object> entities = null;
-
-                if (qryInfo.local() || rdc != null || 
cctx.isLocalNode(qryInfo.senderId()))
-                    data = new ArrayList<>(pageSize);
-                else
-                    entities = new ArrayList<>(pageSize);
-
-                String taskName = 
cctx.kernalContext().task().resolveTaskName(qry.taskHash());
-
-                FieldsResult res = qryInfo.local() ?
-                    executeFieldsQuery(qry, qryInfo.arguments(), 
qryInfo.local(), qry.subjectId(), taskName,
-                    recipient(qryInfo.senderId(), qryInfo.requestId())) :
-                    fieldsQueryResult(qryInfo, taskName);
-
-                // If metadata needs to be returned to user and cleaned from 
internal fields - copy it.
-                List<GridQueryFieldMetadata> meta = qryInfo.includeMetaData() ?
-                    (res.metaData() != null ? new 
ArrayList<GridQueryFieldMetadata>(res.metaData()) : null) :
-                    res.metaData();
-
-                if (!qryInfo.includeMetaData())
-                    meta = null;
-
-                GridCloseableIterator<?> it = new 
GridSpiCloseableIteratorWrapper<Object>(
-                    res.iterator(recipient(qryInfo.senderId(), 
qryInfo.requestId())));
-
-                if (log.isDebugEnabled())
-                    log.debug("Received fields iterator [iterHasNext=" + 
it.hasNext() + ']');
-
-                if (!it.hasNext()) {
-                    if (rdc != null)
-                        data = Collections.singletonList(rdc.reduce());
-
-                    onFieldsPageReady(qryInfo.local(), qryInfo, meta, 
entities, data, true, null);
-
-                    return;
-                }
-
-                int cnt = 0;
-                boolean metaSent = false;
-
-                while (!Thread.currentThread().isInterrupted() && 
it.hasNext()) {
-                    Object row = it.next();
-
-                    // Query is cancelled.
-                    if (row == null) {
-                        onPageReady(qryInfo.local(), qryInfo, null, true, 
null);
-
-                        break;
-                    }
-
-                    if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
-                        cctx.gridEvents().record(new 
IgniteCacheQueryReadEvent<K, V>(
-                            cctx.localNode(),
-                            "SQL fields query result set row read.",
-                            EVT_CACHE_QUERY_OBJECT_READ,
-                            
org.gridgain.grid.cache.query.GridCacheQueryType.SQL_FIELDS,
-                            cctx.namex(),
-                            null,
-                            qry.clause(),
-                            null,
-                            null,
-                            qryInfo.arguments(),
-                            qry.subjectId(),
-                            taskName,
-                            null,
-                            null,
-                            null,
-                            row));
-                    }
-
-                    if ((qryInfo.local() || rdc != null || 
cctx.isLocalNode(qryInfo.senderId()))) {
-                        // Reduce.
-                        if (rdc != null) {
-                            if (!rdc.collect(row))
-                                break;
-                        }
-                        else
-                            data.add(row);
-                    }
-                    else
-                        entities.add(row);
-
-                    if (rdc == null && ((!qryInfo.allPages() && ++cnt == 
pageSize) || !it.hasNext())) {
-                        onFieldsPageReady(qryInfo.local(), qryInfo, !metaSent 
? meta : null,
-                            entities, data, !it.hasNext(), null);
-
-                        if (it.hasNext())
-                            rmvRes = false;
-
-                        if (!qryInfo.allPages())
-                            return;
-                    }
-                }
-
-                if (rdc != null) {
-                    onFieldsPageReady(qryInfo.local(), qryInfo, meta, null,
-                        Collections.singletonList(rdc.reduce()), true, null);
-                }
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled() || !e.hasCause(SQLException.class))
-                    U.error(log, "Failed to run fields query [qry=" + qryInfo 
+ ", node=" + cctx.nodeId() + ']', e);
-                else {
-                    if (e.hasCause(SQLException.class))
-                        U.error(log, "Failed to run fields query [node=" + 
cctx.nodeId() +
-                            ", msg=" + 
e.getCause(SQLException.class).getMessage() + ']');
-                    else
-                        U.error(log, "Failed to run fields query [node=" + 
cctx.nodeId() +
-                            ", msg=" + e.getMessage() + ']');
-                }
-
-                onFieldsPageReady(qryInfo.local(), qryInfo, null, null, null, 
true, e);
-            }
-            catch (Throwable e) {
-                U.error(log, "Failed to run fields query [qry=" + qryInfo + ", 
node=" + cctx.nodeId() + "]", e);
-
-                onFieldsPageReady(qryInfo.local(), qryInfo, null, null, null, 
true, e);
-            }
-            finally {
-                if (rmvRes)
-                    removeFieldsQueryResult(qryInfo.senderId(), 
qryInfo.requestId());
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Processes cache query request.
-     *
-     * @param qryInfo Query info.
-     */
-    @SuppressWarnings("unchecked")
-    protected void runQuery(GridCacheQueryInfo qryInfo) {
-        assert qryInfo != null;
-
-        if (!enterBusy()) {
-            if (cctx.localNodeId().equals(qryInfo.senderId()))
-                throw new IllegalStateException("Failed to process query 
request (grid is stopping).");
-
-            return; // Ignore remote requests when when node is stopping.
-        }
-
-        try {
-            boolean loc = qryInfo.local();
-
-            if (log.isDebugEnabled())
-                log.debug("Running query: " + qryInfo);
-
-            boolean rmvIter = true;
-
-            try {
-                // Preparing query closures.
-                IgnitePredicate<GridCacheEntry<Object, Object>> prjFilter = 
qryInfo.projectionPredicate();
-                IgniteClosure<Map.Entry<K, V>, Object> trans = 
(IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
-                IgniteReducer<Map.Entry<K, V>, Object> rdc = 
(IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
-
-                injectResources(prjFilter);
-                injectResources(trans);
-                injectResources(rdc);
-
-                GridCacheQueryAdapter<?> qry = qryInfo.query();
-
-                int pageSize = qry.pageSize();
-
-                boolean incBackups = qry.includeBackups();
-
-                String taskName = 
cctx.kernalContext().task().resolveTaskName(qry.taskHash());
-
-                IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> iter;
-                GridCacheQueryType type;
-
-                QueryResult<K, V> res;
-
-                res = loc ?
-                    executeQuery(qry, qryInfo.arguments(), loc, 
qry.subjectId(), taskName,
-                    recipient(qryInfo.senderId(), qryInfo.requestId())) :
-                    queryResult(qryInfo, taskName);
-
-                iter = res.iterator(recipient(qryInfo.senderId(), 
qryInfo.requestId()));
-                type = res.type();
-
-                GridCacheAdapter<K, V> cache = cctx.cache();
-
-                if (log.isDebugEnabled())
-                    log.debug("Received index iterator [iterHasNext=" + 
iter.hasNext() +
-                        ", cacheSize=" + cache.size() + ']');
-
-                int cnt = 0;
-
-                boolean stop = false;
-                boolean pageSent = false;
-
-                Collection<Object> data = new ArrayList<>(pageSize);
-
-                long topVer = cctx.affinity().affinityTopologyVersion();
-
-                while (!Thread.currentThread().isInterrupted() && 
iter.hasNext()) {
-                    IgniteBiTuple<K, V> row = iter.next();
-
-                    // Query is cancelled.
-                    if (row == null) {
-                        onPageReady(loc, qryInfo, null, true, null);
-
-                        break;
-                    }
-
-                    K key = row.getKey();
-
-                    // Filter backups for SCAN queries. Other types are 
filtered in indexing manager.
-                    if (!cctx.isReplicated() && cctx.config().getCacheMode() 
!= LOCAL && qry.type() == SCAN &&
-                        !incBackups && 
!cctx.affinity().primary(cctx.localNode(), key, topVer)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Ignoring backup element [row=" + row +
-                                ", cacheMode=" + cctx.config().getCacheMode() 
+ ", incBackups=" + incBackups +
-                                ", primary=" + 
cctx.affinity().primary(cctx.localNode(), key, topVer) + ']');
-
-                        continue;
-                    }
-
-                    V val = row.getValue();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Record [key=" + key + ", val=" + val + ", 
incBackups=" +
-                            incBackups + "priNode=" + 
U.id8(CU.primaryNode(cctx, key).id()) +
-                            ", node=" + U.id8(cctx.grid().localNode().id()) + 
']');
-
-                    if (val == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Unsuitable record value: " + val);
-
-                        continue;
-                    }
-
-                    switch (type) {
-                        case SQL:
-                            if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
-                                cctx.gridEvents().record(new 
IgniteCacheQueryReadEvent<>(
-                                    cctx.localNode(),
-                                    "SQL query entry read.",
-                                    EVT_CACHE_QUERY_OBJECT_READ,
-                                    
org.gridgain.grid.cache.query.GridCacheQueryType.SQL,
-                                    cctx.namex(),
-                                    qry.queryClassName(),
-                                    qry.clause(),
-                                    null,
-                                    null,
-                                    qryInfo.arguments(),
-                                    qry.subjectId(),
-                                    taskName,
-                                    key,
-                                    val,
-                                    null,
-                                    null));
-                            }
-
-                            break;
-
-                        case TEXT:
-                            if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
-                                cctx.gridEvents().record(new 
IgniteCacheQueryReadEvent<>(
-                                    cctx.localNode(),
-                                    "Full text query entry read.",
-                                    EVT_CACHE_QUERY_OBJECT_READ,
-                                    
org.gridgain.grid.cache.query.GridCacheQueryType.FULL_TEXT,
-                                    cctx.namex(),
-                                    qry.queryClassName(),
-                                    qry.clause(),
-                                    null,
-                                    null,
-                                    null,
-                                    qry.subjectId(),
-                                    taskName,
-                                    key,
-                                    val,
-                                    null,
-                                    null));
-                            }
-
-                            break;
-
-                        case SCAN:
-                            if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
-                                cctx.gridEvents().record(new 
IgniteCacheQueryReadEvent<>(
-                                    cctx.localNode(),
-                                    "Scan query entry read.",
-                                    EVT_CACHE_QUERY_OBJECT_READ,
-                                    
org.gridgain.grid.cache.query.GridCacheQueryType.SCAN,
-                                    cctx.namex(),
-                                    null,
-                                    null,
-                                    qry.scanFilter(),
-                                    null,
-                                    null,
-                                    qry.subjectId(),
-                                    taskName,
-                                    key,
-                                    val,
-                                    null,
-                                    null));
-                            }
-
-                            break;
-                    }
-
-                    Map.Entry<K, V> entry = F.t(key, val);
-
-                    // Unwrap entry for reducer or transformer only.
-                    if (rdc != null || trans != null)
-                        entry = (Map.Entry<K, 
V>)cctx.unwrapPortableIfNeeded(entry, qry.keepPortable());
-
-                    // Reduce.
-                    if (rdc != null) {
-                        if (!rdc.collect(entry) || !iter.hasNext()) {
-                            onPageReady(loc, qryInfo, 
Collections.singletonList(rdc.reduce()), true, null);
-
-                            pageSent = true;
-
-                            break;
-                        }
-                        else
-                            continue;
-                    }
-
-                    data.add(trans != null ? trans.apply(entry) :
-                        !loc ? new GridCacheQueryResponseEntry<>(key, val) : 
F.t(key, val));
-
-                    if (!loc) {
-                        if (++cnt == pageSize || !iter.hasNext()) {
-                            boolean finished = !iter.hasNext();
-
-                            onPageReady(loc, qryInfo, data, finished, null);
-
-                            pageSent = true;
-
-                            if (!finished)
-                                rmvIter = false;
-
-                            if (!qryInfo.allPages())
-                                return;
-
-                            data = new ArrayList<>(pageSize);
-
-                            if (stop)
-                                break; // while
-                        }
-                    }
-                }
-
-                if (!pageSent) {
-                    if (rdc == null)
-                        onPageReady(loc, qryInfo, data, true, null);
-                    else
-                        onPageReady(loc, qryInfo, 
Collections.singletonList(rdc.reduce()), true, null);
-                }
-            }
-            catch (Throwable e) {
-                U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" 
+ cctx.nodeId() + "]", e);
-
-                onPageReady(loc, qryInfo, null, true, e);
-            }
-            finally {
-                if (rmvIter)
-                    removeQueryResult(qryInfo.senderId(), qryInfo.requestId());
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param qryInfo Info.
-     * @return Iterator.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private QueryResult<K, V> queryResult(GridCacheQueryInfo qryInfo, String 
taskName) throws IgniteCheckedException {
-        final UUID sndId = qryInfo.senderId();
-
-        assert sndId != null;
-
-        Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = 
qryIters.get(sndId);
-
-        if (futs == null) {
-            futs = new LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, 
V>>>(
-                16, 0.75f, true) {
-                @Override protected boolean removeEldestEntry(Map.Entry<Long, 
GridFutureAdapter<QueryResult<K, V>>> e) {
-                    boolean rmv = size() > maxIterCnt;
-
-                    if (rmv) {
-                        try {
-                            
e.getValue().get().closeIfNotShared(recipient(sndId, e.getKey()));
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(log, "Failed to close query iterator.", 
ex);
-                        }
-                    }
-
-                    return rmv;
-                }
-            };
-
-            Map<Long, GridFutureAdapter<QueryResult<K, V>>> old = 
qryIters.putIfAbsent(sndId, futs);
-
-            if (old != null)
-                futs = old;
-        }
-
-        return queryResult(futs, qryInfo, taskName);
-    }
-
-    /**
-     * @param futs Futures map.
-     * @param qryInfo Info.
-     * @return Iterator.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
-        "NonPrivateFieldAccessedInSynchronizedContext"})
-    private QueryResult<K, V> queryResult(Map<Long, 
GridFutureAdapter<QueryResult<K, V>>> futs,
-        GridCacheQueryInfo qryInfo, String taskName) throws 
IgniteCheckedException {
-        assert futs != null;
-        assert qryInfo != null;
-
-        GridFutureAdapter<QueryResult<K, V>> fut;
-
-        boolean exec = false;
-
-        synchronized (futs) {
-            fut = futs.get(qryInfo.requestId());
-
-            if (fut == null) {
-                futs.put(qryInfo.requestId(), fut = new 
GridFutureAdapter<>(cctx.kernalContext()));
-
-                exec = true;
-            }
-        }
-
-        if (exec) {
-            try {
-                fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), 
false,
-                    qryInfo.query().subjectId(), taskName, 
recipient(qryInfo.senderId(), qryInfo.requestId())));
-            }
-            catch (Error e) {
-                fut.onDone(e);
-
-                throw e;
-            }
-            catch (Throwable e) {
-                fut.onDone(e);
-            }
-        }
-
-        return fut.get();
-    }
-
-    /**
-     * @param sndId Sender node ID.
-     * @param reqId Request ID.
-     */
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    protected void removeQueryResult(@Nullable UUID sndId, long reqId) {
-        if (sndId == null)
-            return;
-
-        Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = 
qryIters.get(sndId);
-
-        if (futs != null) {
-            IgniteFuture<QueryResult<K, V>> fut;
-
-            synchronized (futs) {
-                fut = futs.remove(reqId);
-            }
-
-            if (fut != null) {
-                try {
-                    fut.get().closeIfNotShared(recipient(sndId, reqId));
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to close iterator.", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * @param sndId Sender node ID.
-     * @param reqId Request ID.
-     * @return Recipient ID.
-     */
-    private static Object recipient(UUID sndId, long reqId) {
-        assert sndId != null;
-
-        return new IgniteBiTuple<>(sndId, reqId);
-    }
-
-    /**
-     * @param qryInfo Info.
-     * @return Iterator.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private FieldsResult fieldsQueryResult(GridCacheQueryInfo qryInfo, String 
taskName)
-        throws IgniteCheckedException {
-        final UUID sndId = qryInfo.senderId();
-
-        assert sndId != null;
-
-        Map<Long, GridFutureAdapter<FieldsResult>> iters = 
fieldsQryRes.get(sndId);
-
-        if (iters == null) {
-            iters = new LinkedHashMap<Long, 
GridFutureAdapter<FieldsResult>>(16, 0.75f, true) {
-                @Override protected boolean removeEldestEntry(Map.Entry<Long,
-                    GridFutureAdapter<FieldsResult>> e) {
-                    boolean rmv = size() > maxIterCnt;
-
-                    if (rmv) {
-                        try {
-                            
e.getValue().get().closeIfNotShared(recipient(sndId, e.getKey()));
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(log, "Failed to close fields query 
iterator.", ex);
-                        }
-                    }
-
-                    return rmv;
-                }
-
-                @Override public boolean equals(Object o) {
-                    return o == this;
-                }
-            };
-
-            Map<Long, GridFutureAdapter<FieldsResult>> old = 
fieldsQryRes.putIfAbsent(sndId, iters);
-
-            if (old != null)
-                iters = old;
-        }
-
-        return fieldsQueryResult(iters, qryInfo, taskName);
-    }
-
-    /**
-     * @param resMap Results map.
-     * @param qryInfo Info.
-     * @return Fields query result.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
-        "NonPrivateFieldAccessedInSynchronizedContext"})
-    private FieldsResult fieldsQueryResult(Map<Long, 
GridFutureAdapter<FieldsResult>> resMap,
-        GridCacheQueryInfo qryInfo, String taskName) throws 
IgniteCheckedException {
-        assert resMap != null;
-        assert qryInfo != null;
-
-        GridFutureAdapter<FieldsResult> fut;
-
-        boolean exec = false;
-
-        synchronized (resMap) {
-            fut = resMap.get(qryInfo.requestId());
-
-            if (fut == null) {
-                resMap.put(qryInfo.requestId(), fut =
-                    new GridFutureAdapter<>(cctx.kernalContext()));
-
-                exec = true;
-            }
-        }
-
-        if (exec) {
-            try {
-                fut.onDone(executeFieldsQuery(qryInfo.query(), 
qryInfo.arguments(), false,
-                    qryInfo.query().subjectId(), taskName, 
recipient(qryInfo.senderId(), qryInfo.requestId())));
-            }
-            catch (IgniteCheckedException e) {
-                fut.onDone(e);
-            }
-        }
-
-        return fut.get();
-    }
-
-    /**
-     * @param sndId Sender node ID.
-     * @param reqId Request ID.
-     */
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    protected void removeFieldsQueryResult(@Nullable UUID sndId, long reqId) {
-        if (sndId == null)
-            return;
-
-        Map<Long, GridFutureAdapter<FieldsResult>> futs = 
fieldsQryRes.get(sndId);
-
-        if (futs != null) {
-            IgniteFuture<FieldsResult> fut;
-
-            synchronized (futs) {
-                fut = futs.remove(reqId);
-            }
-
-            if (fut != null) {
-                assert fut.isDone();
-
-                try {
-                    fut.get().closeIfNotShared(recipient(sndId, reqId));
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to close iterator.", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Called when data for page is ready.
-     *
-     * @param loc Local query or not.
-     * @param qryInfo Query info.
-     * @param data Result data.
-     * @param finished Last page or not.
-     * @param e Exception in case of error.
-     * @return {@code true} if page was processed right.
-     */
-    protected abstract boolean onPageReady(boolean loc, GridCacheQueryInfo 
qryInfo,
-        @Nullable Collection<?> data, boolean finished, @Nullable Throwable e);
-
-    /**
-     * @param loc Local query or not.
-     * @param qryInfo Query info.
-     * @param metaData Meta data.
-     * @param entities Indexing entities.
-     * @param data Data.
-     * @param finished Last page or not.
-     * @param e Exception in case of error.
-     * @return {@code true} if page was processed right.
-     */
-    protected abstract boolean onFieldsPageReady(boolean loc, 
GridCacheQueryInfo qryInfo,
-        @Nullable List<GridQueryFieldMetadata> metaData,
-        @Nullable Collection<?> entities,
-        @Nullable Collection<?> data,
-        boolean finished, @Nullable Throwable e);
-
-    /**
-     * Gets cache queries metrics.
-     *
-     * @return Cache queries metrics.
-     */
-    public GridCacheQueryMetrics metrics() {
-        return metrics.copy();
-    }
-
-    /**
-     * Resets metrics.
-     */
-    public void resetMetrics() {
-        metrics = new GridCacheQueryMetricsAdapter();
-    }
-
-    /**
-     * @param duration Execution duration.
-     * @param fail {@code true} if execution failed.
-     */
-    public void onMetricsUpdate(long duration, boolean fail) {
-        metrics.onQueryExecute(duration, fail);
-    }
-
-    /**
-     * Gets SQL metadata.
-     *
-     * @return SQL metadata.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public Collection<GridCacheSqlMetadata> sqlMetadata() throws 
IgniteCheckedException {
-        if (!enterBusy())
-            throw new IllegalStateException("Failed to get metadata (grid is 
stopping).");
-
-        try {
-            Callable<Collection<CacheSqlMetadata>> job = new MetadataJob();
-
-            // Remote nodes that have current cache.
-            Collection<ClusterNode> nodes = 
F.view(cctx.discovery().remoteNodes(), new P1<ClusterNode>() {
-                @Override public boolean apply(ClusterNode n) {
-                    return U.hasCache(n, space);
-                }
-            });
-
-            Collection<Collection<CacheSqlMetadata>> res = new 
ArrayList<>(nodes.size() + 1);
-
-            IgniteFuture<Collection<Collection<CacheSqlMetadata>>> rmtFut = 
null;
-
-            // Get metadata from remote nodes.
-            if (!nodes.isEmpty())
-                rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, 
F.asSet(job), nodes, true);
-
-            // Get local metadata.
-            IgniteFuture<Collection<CacheSqlMetadata>> locFut = 
cctx.closures().callLocalSafe(job, true);
-
-            if (rmtFut != null)
-                res.addAll(rmtFut.get());
-
-            res.add(locFut.get());
-
-            Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
-
-            for (Collection<CacheSqlMetadata> col : res) {
-                for (CacheSqlMetadata meta : col) {
-                    String name = meta.cacheName();
-
-                    Collection<CacheSqlMetadata> cacheMetas = map.get(name);
-
-                    if (cacheMetas == null)
-                        map.put(name, cacheMetas = new LinkedList<>());
-
-                    cacheMetas.add(meta);
-                }
-            }
-
-            Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
-
-            // Metadata for current cache must be first in list.
-            col.add(new CacheSqlMetadata(map.remove(space)));
-
-            for (Collection<CacheSqlMetadata> metas : map.values())
-                col.add(new CacheSqlMetadata(metas));
-
-            return col;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Gets projection filter for query.
-     *
-     * @param qry Query.
-     * @return Filter.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable private GridIndexingQueryFilter 
projectionFilter(GridCacheQueryAdapter<?> qry) {
-        assert qry != null;
-
-        final IgnitePredicate<GridCacheEntry<Object, Object>> prjFilter = 
qry.projectionFilter();
-
-        if (prjFilter == null || F.isAlwaysTrue(prjFilter))
-            return null;
-
-        return new GridIndexingQueryFilter() {
-            @Nullable @Override public IgniteBiPredicate<K, V> forSpace(String 
spaceName) {
-                if (!F.eq(space, spaceName))
-                    return null;
-
-                return new IgniteBiPredicate<K, V>() {
-                    @Override public boolean apply(K k, V v) {
-                        try {
-                            GridCacheEntry<K, V> entry = 
context().cache().entry(k);
-
-                            return entry != null && 
prjFilter.apply((GridCacheEntry<Object, Object>)entry);
-                        }
-                        catch (GridDhtInvalidPartitionException ignore) {
-                            return false;
-                        }
-                    }
-                };
-            }
-        };
-    }
-
-    /**
-     * @param <K> Key type.
-     * @param <V> Value type.
-     * @return Predicate.
-     * @param includeBackups Include backups.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public <K, V> GridIndexingQueryFilter backupsFilter(boolean 
includeBackups) {
-        if (includeBackups)
-            return null;
-
-        return new GridIndexingQueryFilter() {
-            @Nullable @Override public IgniteBiPredicate<K, V> forSpace(final 
String spaceName) {
-                final GridKernalContext ctx = cctx.kernalContext();
-
-                final GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(spaceName);
-
-                if (cache.context().isReplicated() || 
cache.configuration().getBackups() == 0)
-                    return null;
-
-                return new IgniteBiPredicate<K, V>() {
-                    @Override public boolean apply(K k, V v) {
-                        return 
cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
-                    }
-                };
-            }
-        };
-    }
-
-    /**
-     * @param qry Query.
-     * @return Filter.
-     */
-    private GridIndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) {
-        return and(backupsFilter(qry.includeBackups()), projectionFilter(qry));
-    }
-
-    /**
-     * @param f1 First filter.
-     * @param f2 Second filter.
-     * @return And filter of the given two.
-     */
-    @Nullable private static GridIndexingQueryFilter and(@Nullable final 
GridIndexingQueryFilter f1,
-        @Nullable final GridIndexingQueryFilter f2) {
-        if (f1 == null)
-            return f2;
-
-        if (f2 == null)
-            return f1;
-
-        return new GridIndexingQueryFilter() {
-            @Nullable @Override public <K, V> IgniteBiPredicate<K, V> 
forSpace(String spaceName) {
-                final IgniteBiPredicate<K, V> fltr1 = f1.forSpace(spaceName);
-                final IgniteBiPredicate<K, V> fltr2 = f2.forSpace(spaceName);
-
-                if (fltr1 == null)
-                    return fltr2;
-
-                if (fltr2 == null)
-                    return fltr1;
-
-                return new IgniteBiPredicate<K, V>() {
-                    @Override public boolean apply(K k, V v) {
-                        return fltr1.apply(k, v) && fltr2.apply(k, v);
-                    }
-                };
-            }
-        };
-    }
-
-    /**
-     * Prints memory statistics for debugging purposes.
-     */
-    @Override public void printMemoryStats() {
-        X.println(">>>");
-        X.println(">>> Query manager memory stats [grid=" + cctx.gridName() + 
", cache=" + cctx.name() + ']');
-    }
-
-    /**
-     * FOR TESTING ONLY
-     *
-     * @return Indexing space for this query manager.
-     */
-    public String space() {
-        return space;
-    }
-
-    /**
-     * Metadata job.
-     */
-    @GridInternal
-    private static class MetadataJob implements 
IgniteCallable<Collection<CacheSqlMetadata>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Grid */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** {@inheritDoc} */
-        @Override public Collection<CacheSqlMetadata> call() {
-            final GridKernalContext ctx = ((GridKernal) ignite).context();
-
-            Collection<String> cacheNames = 
F.viewReadOnly(ctx.cache().caches(),
-                new C1<GridCache<?, ?>, String>() {
-                    @Override public String apply(GridCache<?, ?> c) {
-                        return c.name();
-                    }
-                },
-                new P1<GridCache<?, ?>>() {
-                    @Override public boolean apply(GridCache<?, ?> c) {
-                        return !CU.UTILITY_CACHE_NAME.equals(c.name());
-                    }
-                }
-            );
-
-            return F.transform(cacheNames, new C1<String, CacheSqlMetadata>() {
-                @Override public CacheSqlMetadata apply(String cacheName) {
-                    Collection<GridQueryTypeDescriptor> types = 
ctx.query().types(cacheName);
-
-                    Collection<String> names = U.newHashSet(types.size());
-                    Map<String, String> keyClasses = 
U.newHashMap(types.size());
-                    Map<String, String> valClasses = 
U.newHashMap(types.size());
-                    Map<String, Map<String, String>> fields = 
U.newHashMap(types.size());
-                    Map<String, Collection<GridCacheSqlIndexMetadata>> indexes 
= U.newHashMap(types.size());
-
-                    for (GridQueryTypeDescriptor type : types) {
-                        // Filter internal types (e.g., data structures).
-                        if (type.name().startsWith("GridCache"))
-                            continue;
-
-                        names.add(type.name());
-
-                        keyClasses.put(type.name(), type.keyClass().getName());
-                        valClasses.put(type.name(), 
type.valueClass().getName());
-
-                        int size = 2 + type.keyFields().size() + 
type.valueFields().size();
-
-                        Map<String, String> fieldsMap = 
U.newLinkedHashMap(size);
-
-                        // _KEY and _VAL are not included in 
GridIndexingTypeDescriptor.valueFields
-                        fieldsMap.put("_KEY", type.keyClass().getName());
-                        fieldsMap.put("_VAL", type.valueClass().getName());
-
-                        for (Map.Entry<String, Class<?>> e : 
type.keyFields().entrySet())
-                            fieldsMap.put(e.getKey().toUpperCase(), 
e.getValue().getName());
-
-                        for (Map.Entry<String, Class<?>> e : 
type.valueFields().entrySet())
-                            fieldsMap.put(e.getKey().toUpperCase(), 
e.getValue().getName());
-
-                        fields.put(type.name(), fieldsMap);
-
-                        Collection<GridCacheSqlIndexMetadata> indexesCol =
-                            new ArrayList<>(type.indexes().size());
-
-                        for (Map.Entry<String, GridQueryIndexDescriptor> e : 
type.indexes().entrySet()) {
-                            GridQueryIndexDescriptor desc = e.getValue();
-
-                            // Add only SQL indexes.
-                            if (desc.type() == GridQueryIndexType.SORTED) {
-                                Collection<String> idxFields = 
e.getValue().fields();
-                                Collection<String> descendings = new 
LinkedList<>();
-
-                                for (String idxField : idxFields)
-                                    if (desc.descending(idxField))
-                                        descendings.add(idxField);
-
-                                indexesCol.add(new 
CacheSqlIndexMetadata(e.getKey().toUpperCase(),
-                                    idxFields, descendings, false));
-                            }
-                        }
-
-                        indexes.put(type.name(), indexesCol);
-                    }
-
-                    return new CacheSqlMetadata(cacheName, names, keyClasses, 
valClasses, fields, indexes);
-                }
-            });
-        }
-    }
-
-    /**
-     * Cache metadata.
-     */
-    private static class CacheSqlMetadata implements GridCacheSqlMetadata {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private String cacheName;
-
-        /** */
-        private Collection<String> types;
-
-        /** */
-        private Map<String, String> keyClasses;
-
-        /** */
-        private Map<String, String> valClasses;
-
-        /** */
-        private Map<String, Map<String, String>> fields;
-
-        /** */
-        private Map<String, Collection<GridCacheSqlIndexMetadata>> indexes;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public CacheSqlMetadata() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param types Types.
-         * @param keyClasses Key classes map.
-         * @param valClasses Value classes map.
-         * @param fields Fields maps.
-         * @param indexes Indexes.
-         */
-        CacheSqlMetadata(@Nullable String cacheName, Collection<String> types, 
Map<String, String> keyClasses,
-            Map<String, String> valClasses, Map<String, Map<String, String>> 
fields,
-            Map<String, Collection<GridCacheSqlIndexMetadata>> indexes) {
-            assert types != null;
-            assert keyClasses != null;
-            assert valClasses != null;
-            assert fields != null;
-            assert indexes != null;
-
-            this.cacheName = cacheName;
-            this.types = types;
-            this.keyClasses = keyClasses;
-            this.valClasses = valClasses;
-            this.fields = fields;
-            this.indexes = indexes;
-        }
-
-        /**
-         * @param metas Meta data instances from different nodes.
-         */
-        CacheSqlMetadata(Iterable<CacheSqlMetadata> metas) {
-            types = new HashSet<>();
-            keyClasses = new HashMap<>();
-            valClasses = new HashMap<>();
-            fields = new HashMap<>();
-            indexes = new HashMap<>();
-
-            for (CacheSqlMetadata meta : metas) {
-                if (cacheName == null)
-                    cacheName = meta.cacheName;
-                else
-                    assert F.eq(cacheName, meta.cacheName);
-
-                types.addAll(meta.types);
-                keyClasses.putAll(meta.keyClasses);
-                valClasses.putAll(meta.valClasses);
-                fields.putAll(meta.fields);
-                indexes.putAll(meta.indexes);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String cacheName() {
-            return cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<String> types() {
-            return types;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String keyClass(String type) {
-            return keyClasses.get(type);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String valueClass(String type) {
-            return valClasses.get(type);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<String, String> fields(String type) {
-            return fields.get(type);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<GridCacheSqlIndexMetadata> indexes(String 
type) {
-            return indexes.get(type);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
-            U.writeCollection(out, types);
-            U.writeMap(out, keyClasses);
-            U.writeMap(out, valClasses);
-            U.writeMap(out, fields);
-            U.writeMap(out, indexes);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
-            types = U.readCollection(in);
-            keyClasses = U.readMap(in);
-            valClasses = U.readMap(in);
-            fields = U.readMap(in);
-            indexes = U.readMap(in);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(CacheSqlMetadata.class, this);
-        }
-    }
-
-    /**
-     * Cache metadata index.
-     */
-    private static class CacheSqlIndexMetadata implements 
GridCacheSqlIndexMetadata {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private String name;
-
-        /** */
-        private Collection<String> fields;
-
-        /** */
-        private Collection<String> descendings;
-
-        /** */
-        private boolean unique;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public CacheSqlIndexMetadata() {
-            // No-op.
-        }
-
-        /**
-         * @param name Index name.
-         * @param fields Fields.
-         * @param descendings Descendings.
-         * @param unique Unique flag.
-         */
-        CacheSqlIndexMetadata(String name, Collection<String> fields, 
Collection<String> descendings,
-            boolean unique) {
-            assert name != null;
-            assert fields != null;
-            assert descendings != null;
-
-            this.name = name;
-            this.fields = fields;
-            this.descendings = descendings;
-            this.unique = unique;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<String> fields() {
-            return fields;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean descending(String field) {
-            return descendings.contains(field);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean unique() {
-            return unique;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, name);
-            U.writeCollection(out, fields);
-            U.writeCollection(out, descendings);
-            out.writeBoolean(unique);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            name = U.readString(in);
-            fields = U.readCollection(in);
-            descendings = U.readCollection(in);
-            unique = in.readBoolean();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(CacheSqlIndexMetadata.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class QueryResult<K, V> extends 
CachedResult<IgniteBiTuple<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final GridCacheQueryType type;
-
-        /**
-         * @param type Query type.
-         * @param recipient ID of the recipient.
-         */
-        private QueryResult(GridCacheQueryType type, Object recipient) {
-            super(recipient);
-
-            this.type = type;
-        }
-
-        /**
-         * @return Type.
-         */
-        public GridCacheQueryType type() {
-            return type;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class FieldsResult<Q> extends CachedResult<Q> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private List<GridQueryFieldMetadata> meta;
-
-        /**
-         * @param recipient ID of the recipient.
-         */
-        FieldsResult(Object recipient) {
-            super(recipient);
-        }
-
-        /**
-         * @return Metadata.
-         */
-        public List<GridQueryFieldMetadata> metaData() throws 
IgniteCheckedException {
-            get(); // Ensure that result is ready.
-
-            return meta;
-        }
-
-        /**
-         * @param meta Metadata.
-         */
-        public void metaData(List<GridQueryFieldMetadata> meta) {
-            this.meta = meta;
-        }
-    }
-
-    /**
-     *
-     */
-    private abstract class AbstractLazySwapEntry {
-        /** */
-        private K key;
-
-        /** */
-        private V val;
-
-        /**
-         * @return Key bytes.
-         */
-        protected abstract byte[] keyBytes();
-
-        /**
-         * @return Value.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected abstract V unmarshalValue() throws IgniteCheckedException;
-
-        /**
-         * @return Key.
-         */
-        K key() {
-            try {
-                if (key != null)
-                    return key;
-
-                key = cctx.marshaller().unmarshal(keyBytes(), 
cctx.deploy().globalLoader());
-
-                return key;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /**
-         * @return Value.
-         */
-        V value() {
-            try {
-                if (val != null)
-                    return val;
-
-                val = unmarshalValue();
-
-                return val;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /**
-         * @return TTL.
-         */
-        abstract long timeToLive();
-
-        /**
-         * @return Expire time.
-         */
-        abstract long expireTime();
-
-        /**
-         * @return Version.
-         */
-        abstract GridCacheVersion version();
-    }
-
-    /**
-     *
-     */
-    private class LazySwapEntry extends AbstractLazySwapEntry {
-        /** */
-        private final Map.Entry<byte[], byte[]> e;
-
-        /**
-         * @param e Entry with
-         */
-        LazySwapEntry(Map.Entry<byte[], byte[]> e) {
-            this.e = e;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected byte[] keyBytes() {
-            return e.getKey();
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("IfMayBeConditional")
-        @Override protected V unmarshalValue() throws IgniteCheckedException {
-            byte[] bytes = e.getValue();
-
-            byte[] val = GridCacheSwapEntryImpl.getValueIfByteArray(bytes);
-
-            if (val != null)
-                return (V)val;
-
-            if (cctx.offheapTiered() && cctx.portableEnabled())
-                return (V)cctx.portable().unmarshal(bytes, 
GridCacheSwapEntryImpl.valueOffset(bytes));
-            else {
-                GridByteArrayInputStream in = new 
GridByteArrayInputStream(bytes,
-                    GridCacheSwapEntryImpl.valueOffset(bytes),
-                    bytes.length);
-
-                return cctx.marshaller().unmarshal(in, 
cctx.deploy().globalLoader());
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override long timeToLive() {
-            return GridCacheSwapEntryImpl.timeToLive(e.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override long expireTime() {
-            return GridCacheSwapEntryImpl.expireTime(e.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override GridCacheVersion version() {
-            return GridCacheSwapEntryImpl.version(e.getValue());
-        }
-    }
-
-    /**
-     *
-     */
-    private class LazyOffheapEntry extends AbstractLazySwapEntry {
-        /** */
-        private final T2<Long, Integer> keyPtr;
-
-        /** */
-        private final T2<Long, Integer> valPtr;
-
-        /**
-         * @param keyPtr Key address.
-         * @param valPtr Value address.
-         */
-        private LazyOffheapEntry(T2<Long, Integer> keyPtr, T2<Long, Integer> 
valPtr) {
-            assert keyPtr != null;
-            assert valPtr != null;
-
-            this.keyPtr = keyPtr;
-            this.valPtr = valPtr;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected byte[] keyBytes() {
-            return U.copyMemory(keyPtr.get1(), keyPtr.get2());
-        }
-
-        /** {@inheritDoc} */
-        @Override protected V unmarshalValue() throws IgniteCheckedException {
-            long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), 
valPtr.get2());
-
-            V val = (V)cctx.portable().unmarshal(ptr, false);
-
-            assert val != null;
-
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override long timeToLive() {
-            return GridCacheOffheapSwapEntry.timeToLive(valPtr.get1());
-        }
-
-        /** {@inheritDoc} */
-        @Override long expireTime() {
-            return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
-        }
-
-        /** {@inheritDoc} */
-        @Override GridCacheVersion version() {
-            return GridCacheOffheapSwapEntry.version(valPtr.get1());
-        }
-    }
-
-    /**
-     *
-     */
-    private class OffheapIteratorClosure
-        extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> 
{
-        /** */
-        private static final long serialVersionUID = 7410163202728985912L;
-
-        /** */
-        private IgnitePredicate<GridCacheEntry<Object, Object>> prjPred;
-
-        /** */
-        private IgniteBiPredicate<K, V> filter;
-
-        /** */
-        private boolean keepPortable;
-
-        /**
-         * @param prjPred Projection filter.
-         * @param filter Filter.
-         * @param keepPortable Keep portable flag.
-         */
-        private OffheapIteratorClosure(
-            @Nullable IgnitePredicate<GridCacheEntry<Object, Object>> prjPred,
-            @Nullable IgniteBiPredicate<K, V> filter,
-            boolean keepPortable) {
-            assert prjPred != null || filter != null;
-
-            this.prjPred = prjPred;
-            this.filter = filter;
-            this.keepPortable = keepPortable;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, 
Integer> keyPtr,
-            T2<Long, Integer> valPtr)
-            throws IgniteCheckedException {
-            LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr);
-
-            if (prjPred != null) {
-                GridCacheEntry<K, V> cacheEntry = new 
GridCacheScanSwapEntry(e);
-
-                if (!prjPred.apply((GridCacheEntry<Object, Object>)cacheEntry))
-                    return null;
-            }
-
-            if (filter != null) {
-                K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable);
-                V val = (V)cctx.unwrapPortableIfNeeded(e.value(), 
keepPortable);
-
-                if (!filter.apply(key, val))
-                    return null;
-            }
-
-            return new IgniteBiTuple<>(e.key(), 
(V)cctx.unwrapTemporary(e.value())) ;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class CompoundIterator<T> extends GridIteratorAdapter<T> {
-        /** */
-        private static final long serialVersionUID = 4585888051556166304L;
-
-        /** */
-        private final List<GridIterator<T>> iters;
-
-        /** */
-        private int idx;
-
-        /** */
-        private GridIterator<T> iter;
-
-        /**
-         * @param iters Iterators.
-         */
-        private CompoundIterator(List<GridIterator<T>> iters) {
-            if (iters.isEmpty())
-                throw new IllegalArgumentException();
-
-            this.iters = iters;
-
-            iter = F.first(iters);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNextX() throws IgniteCheckedException {
-            if (iter.hasNextX())
-                return true;
-
-            idx++;
-
-            while(idx < iters.size()) {
-                iter = iters.get(idx);
-
-                if (iter.hasNextX())
-                    return true;
-
-                idx++;
-            }
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public T nextX() throws IgniteCheckedException {
-            if (!hasNextX())
-                throw new NoSuchElementException();
-
-            return iter.nextX();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeX() throws IgniteCheckedException {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     *
-     */
-    private class GridCacheScanSwapEntry implements GridCacheEntry<K, V> {
-        /** */
-        private static final long serialVersionUID = 1262515168518736214L;
-
-        /** */
-        private final AbstractLazySwapEntry e;
-
-        /**
-         * @param e Entry.
-         */
-        private GridCacheScanSwapEntry(AbstractLazySwapEntry e) {
-            this.e = e;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public V peek() {
-            return e.value();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object version() {
-            return e.version();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long expirationTime() {
-            return e.expireTime();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long timeToLive() {
-            return e.timeToLive();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public V getValue() {
-            return e.value();
-        }
-
-        /** {@inheritDoc} */
-        @Override public K getKey() {
-            return e.key();
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheProjection<K, V> projection() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public V peek(@Nullable 
Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public V reload() {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<V> reloadAsync() {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isLocked() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isLockedByThread() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void timeToLive(long ttl) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean primary() {
-            return cctx.affinity().
-                primary(cctx.localNode(), getKey(), 
cctx.affinity().affinityTopologyVersion());
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean backup() {
-            return cctx.affinity().
-                backu

<TRUNCATED>

Reply via email to