#ignite-743: Revert all.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d0b5d850
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d0b5d850
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d0b5d850

Branch: refs/heads/ignite-446
Commit: d0b5d8502ec7a07a041a31d50e15fc85e68ac679
Parents: ecf963e
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Wed Apr 15 17:11:04 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Wed Apr 15 17:11:04 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheGateway.java      |  128 +-
 .../processors/cache/IgniteCacheProxy.java      | 1217 ++++++++++++++----
 .../cache/IgniteCacheProxyLockFree.java         | 1153 -----------------
 .../datastreamer/DataStreamerImpl.java          |    9 +-
 .../datastreamer/DataStreamerUpdateJob.java     |    4 +-
 5 files changed, 1059 insertions(+), 1452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b5d850/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 4868b3f..97fada9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -69,8 +69,7 @@ public class GridCacheGateway<K, V> {
      * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped.
      */
     public boolean enterIfNotClosed() {
-        if (ctx.deploymentEnabled())
-            ctx.deploy().onEnter();
+        enterIfNotClosedNoLock();
 
         // Must unlock in case of unexpected errors to avoid
         // deadlocks during kernal stop.
@@ -86,16 +85,35 @@ public class GridCacheGateway<K, V> {
     }
 
     /**
+     * Enter a cache call without lock.
+     *
+     * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped.
+     */
+    public boolean enterIfNotClosedNoLock() {
+        if (ctx.deploymentEnabled())
+            ctx.deploy().onEnter();
+
+        return !stopped;
+    }
+
+    /**
+     * Leave a cache call entered by {@link #enter()} method.
+     */
+    public void leaveNoLock() {
+        ctx.tm().resetContext();
+        ctx.mvcc().contextReset();
+
+        // Unwind eviction notifications.
+        if (!ctx.shared().closed(ctx))
+            CU.unwindEvicts(ctx);
+    }
+
+    /**
      * Leave a cache call entered by {@link #enter()} method.
      */
     public void leave() {
         try {
-            ctx.tm().resetContext();
-            ctx.mvcc().contextReset();
-
-            // Unwind eviction notifications.
-            if (!ctx.shared().closed(ctx))
-                CU.unwindEvicts(ctx);
+           leaveNoLock();
         }
         finally {
             rwLock.readUnlock();
@@ -107,6 +125,45 @@ public class GridCacheGateway<K, V> {
      * @return Previous projection set on this thread.
      */
     @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable GridCacheProjectionImpl<K, V> prj) {
+        onEnter();
+
+        rwLock.readLock();
+
+        if (stopped) {
+            rwLock.readUnlock();
+
+            throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+        }
+
+        // Must unlock in case of unexpected errors to avoid
+        // deadlocks during kernal stop.
+        try {
+            return setProjectionPerCall(prj);
+        }
+        catch (RuntimeException e) {
+            rwLock.readUnlock();
+
+            throw e;
+        }
+    }
+
+    /**
+     * @param prj Projection to guard.
+     * @return Previous projection set on this thread.
+     */
+    @Nullable public GridCacheProjectionImpl<K, V> enterNoLock(@Nullable GridCacheProjectionImpl<K, V> prj) {
+        onEnter();
+
+        if (stopped)
+            throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+
+        return setProjectionPerCall(prj);
+    }
+
+    /**
+     * On enter.
+     */
+    private void onEnter() {
         try {
             ctx.itHolder().checkWeakQueue();
 
@@ -127,50 +184,47 @@ public class GridCacheGateway<K, V> {
 
         if (ctx.deploymentEnabled())
             ctx.deploy().onEnter();
+    }
 
-        rwLock.readLock();
+    /**
+     * Set thread local projection per call.
+     *
+     * @param prj Projection to guard.
+     * @return Previous projection set on this thread.
+     */
+    private GridCacheProjectionImpl<K, V> setProjectionPerCall(@Nullable GridCacheProjectionImpl<K, V> prj) {
+        GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall();
 
-        if (stopped) {
-            rwLock.readUnlock();
+        if (prev != null || prj != null)
+            ctx.projectionPerCall(prj);
 
-            throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
-        }
+        return prev;
+    }
 
-        // Must unlock in case of unexpected errors to avoid
-        // deadlocks during kernal stop.
+    /**
+     * @param prev Previous.
+     */
+    public void leave(GridCacheProjectionImpl<K, V> prev) {
         try {
-            // Set thread local projection per call.
-            GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall();
-
-            if (prev != null || prj != null)
-                ctx.projectionPerCall(prj);
-
-            return prev;
+            leaveNoLock(prev);
         }
-        catch (RuntimeException e) {
+        finally {
             rwLock.readUnlock();
-
-            throw e;
         }
     }
 
     /**
      * @param prev Previous.
      */
-    public void leave(GridCacheProjectionImpl<K, V> prev) {
-        try {
-            ctx.tm().resetContext();
-            ctx.mvcc().contextReset();
+    public void leaveNoLock(GridCacheProjectionImpl<K, V> prev) {
+        ctx.tm().resetContext();
+        ctx.mvcc().contextReset();
 
-            // Unwind eviction notifications.
-            CU.unwindEvicts(ctx);
+        // Unwind eviction notifications.
+        CU.unwindEvicts(ctx);
 
-            // Return back previous thread local projection per call.
-            ctx.projectionPerCall(prev);
-        }
-        finally {
-            rwLock.readUnlock();
-        }
+        // Return back previous thread local projection per call.
+        ctx.projectionPerCall(prev);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b5d850/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 2f2d70c..d7ef8ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -22,11 +22,18 @@ import org.apache.ignite.cache.CacheManager;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -41,19 +48,42 @@ import java.util.concurrent.locks.*;
 /**
  * Cache proxy.
  */
-public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable {
+public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>>
+    implements IgniteCache<K, V>, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final IgniteBiPredicate ACCEPT_ALL = new 
IgniteBiPredicate() {
+        @Override public boolean apply(Object k, Object v) {
+            return true;
+        }
+    };
+
+    /** Context. */
+    private GridCacheContext<K, V> ctx;
+
+    /** Gateway. */
+    private GridCacheGateway<K, V> gate;
+
     /** Delegate. */
     @GridToStringInclude
-    private IgniteCacheProxyLockFree<K, V> lockFreeCache;
+    private GridCacheProjectionEx<K, V> delegate;
 
     /** Projection. */
     private GridCacheProjectionImpl<K, V> prj;
 
-    /** Gateway. */
-    private GridCacheGateway<K, V> gate;
+    /** */
+    @GridToStringExclude
+    private GridCacheProxyImpl<K, V> legacyProxy;
+
+    /** */
+    @GridToStringExclude
+    private CacheManager cacheMgr;
+
+    /** */
+    @GridToStringExclude
+    private boolean lock;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -74,27 +104,49 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
         @Nullable GridCacheProjectionImpl<K, V> prj,
         boolean async
     ) {
+        this(ctx, delegate, prj, async, false);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param delegate Delegate.
+     * @param prj Projection.
+     * @param async Async support flag.
+     */
+    public IgniteCacheProxy(
+        GridCacheContext<K, V> ctx,
+        GridCacheProjectionEx<K, V> delegate,
+        @Nullable GridCacheProjectionImpl<K, V> prj,
+        boolean async, boolean lock
+    ) {
+        super(async);
+
         assert ctx != null;
+        assert delegate != null;
+
+        this.ctx = ctx;
+        this.delegate = delegate;
+        this.prj = prj;
 
         gate = ctx.gate();
 
-        this.prj = prj;
+        legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj);
 
-        lockFreeCache = new IgniteCacheProxyLockFree<>(ctx, delegate, prj, 
async);
+        this.lock = lock;
     }
 
     /**
-     * @return Context.
+     * @return Ignite cache proxy with simple gate.
      */
-    public GridCacheContext<K, V> context() {
-        return lockFreeCache.context();
+    public IgniteCacheProxy<K, V> cacheNoGate() {
+        return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), true);
     }
 
     /**
-     * @return Lock free instance.
+     * @return Context.
      */
-    public IgniteCacheProxyLockFree<K, V> lockFree() {
-        return lockFreeCache;
+    public GridCacheContext<K, V> context() {
+        return ctx;
     }
 
     /**
@@ -106,84 +158,86 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public CacheMetrics metrics() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.metrics();
+            return ctx.cache().metrics();
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetrics metrics(ClusterGroup grp) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.metrics(grp);
+            List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
+
+            for (ClusterNode node : grp.nodes()) {
+                Map<Integer, CacheMetrics> nodeCacheMetrics = 
((TcpDiscoveryNode)node).cacheMetrics();
+
+                if (nodeCacheMetrics != null) {
+                    CacheMetrics e = nodeCacheMetrics.get(context().cacheId());
+
+                    if (e != null)
+                        metrics.add(e);
+                }
+            }
+
+            return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetricsMXBean mxBean() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.mxBean();
+            return ctx.cache().mxBean();
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withAsync() {
-        if (isAsync())
-            return this;
-
-        return new IgniteCacheProxy<>(context(), delegate(), prj, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isAsync() {
-        return lockFreeCache.isAsync();
-    }
+    @Override public <C extends Configuration<K, V>> C 
getConfiguration(Class<C> clazz) {
+        CacheConfiguration cfg = ctx.config();
 
-    /** {@inheritDoc} */
-    @Override public <R> IgniteFuture<R> future() {
-        return lockFreeCache.future();
-    }
+        if (!clazz.isAssignableFrom(cfg.getClass()))
+            throw new IllegalArgumentException();
 
-    /** {@inheritDoc} */
-    @Override public <C extends Configuration<K, V>> C 
getConfiguration(Class<C> clazz) {
-        return lockFreeCache.getConfiguration(clazz);
+        return clazz.cast(cfg);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public Entry<K, V> randomEntry() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.randomEntry();
+            return ctx.cache().randomEntry();
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.withExpiryPolicy(plc);
+            GridCacheProjectionEx<K, V> prj0 = prj != null ? 
prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
+
+            return new IgniteCacheProxy<>(ctx, prj0, 
(GridCacheProjectionImpl<K, V>)prj0, isAsync());
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
@@ -194,173 +248,442 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, 
@Nullable Object... args) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            lockFreeCache.loadCache(p, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync())
+                    setFuture(ctx.cache().globalLoadCacheAsync(p, args));
+                else
+                    ctx.cache().globalLoadCache(p, args);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, 
@Nullable Object... args) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            lockFreeCache.localLoadCache(p, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync())
+                    setFuture(delegate.localLoadCacheAsync(p, args));
+                else
+                    delegate.localLoadCache(p, args);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws 
CacheException {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.getAndPutIfAbsent(key, val);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAndPutIfAbsentAsync(key, val));
+
+                    return null;
+                }
+                else
+                    return delegate.getAndPutIfAbsent(key, val);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public Lock lock(K key) throws CacheException {
-        return lockFreeCache.lock(key);
+        return lockAll(Collections.singleton(key));
     }
 
     /** {@inheritDoc} */
     @Override public Lock lockAll(final Collection<? extends K> keys) {
-        return lockFreeCache.lockAll(keys);
+        return new CacheLockImpl<>(gate, delegate, prj, keys);
     }
 
     /** {@inheritDoc} */
     @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
-        return lockFreeCache.isLocalLocked(key, byCurrThread);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+        try {
+            return byCurrThread ? delegate.isLockedByThread(key) : 
delegate.isLocked(key);
+        }
+        finally {
+            onLeave(prev);
+        }
+    }
+
+    /**
+     * @param filter Filter.
+     * @param grp Optional cluster group.
+     * @return Cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup 
grp) {
+        final CacheQuery<Map.Entry<K,V>> qry;
+        final CacheQueryFuture<Map.Entry<K,V>> fut;
+
+        if (filter instanceof ScanQuery) {
+            IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
+
+            qry = delegate.queries().createScanQuery(p != null ? p : 
ACCEPT_ALL);
+
+            if (grp != null)
+                qry.projection(grp);
+
+            fut = qry.execute();
+        }
+        else if (filter instanceof TextQuery) {
+            TextQuery p = (TextQuery)filter;
+
+            qry = delegate.queries().createFullTextQuery(p.getType(), 
p.getText());
+
+            if (grp != null)
+                qry.projection(grp);
+
+            fut = qry.execute();
+        }
+        else if (filter instanceof SpiQuery) {
+            qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery();
+
+            if (grp != null)
+                qry.projection(grp);
+
+            fut = qry.execute(((SpiQuery)filter).getArgs());
+        }
+        else {
+            if (filter instanceof SqlFieldsQuery)
+                throw new CacheException("Use methods 'queryFields' and 
'localQueryFields' for " +
+                    SqlFieldsQuery.class.getSimpleName() + ".");
+
+            throw new CacheException("Unsupported query type: " + filter);
+        }
+
+        return new QueryCursorImpl<>(new 
GridCloseableIteratorAdapter<Entry<K,V>>() {
+            /** */
+            private Map.Entry<K,V> cur;
+
+            @Override protected Entry<K,V> onNext() throws 
IgniteCheckedException {
+                if (!onHasNext())
+                    throw new NoSuchElementException();
+
+                Map.Entry<K,V> e = cur;
+
+                cur = null;
+
+                return new CacheEntryImpl<>(e.getKey(), e.getValue());
+            }
+
+            @Override protected boolean onHasNext() throws 
IgniteCheckedException {
+                return cur != null || (cur = fut.next()) != null;
+            }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                fut.cancel();
+            }
+        });
+    }
+
+    /**
+     * @param local Enforce local.
+     * @return Local node cluster group.
+     */
+    private ClusterGroup projection(boolean local) {
+        if (local || ctx.isLocal() || isReplicatedDataNode())
+            return ctx.kernalContext().grid().cluster().forLocal();
+
+        if (ctx.isReplicated())
+            return 
ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom();
+
+        return null;
+    }
+
+    /**
+     * Executes continuous query.
+     *
+     * @param qry Query.
+     * @param loc Local flag.
+     * @return Initial iteration cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery qry, 
boolean loc) {
+        if (qry.getInitialQuery() instanceof ContinuousQuery)
+            throw new IgniteException("Initial predicate for continuous query 
can't be an instance of another " +
+                "continuous query. Use SCAN or SQL query for initial 
iteration.");
+
+        if (qry.getLocalListener() == null)
+            throw new IgniteException("Mandatory local listener is not set for 
the query: " + qry);
+
+        try {
+            final UUID routineId = ctx.continuousQueries().executeQuery(
+                qry.getLocalListener(),
+                qry.getRemoteFilter(),
+                qry.getPageSize(),
+                qry.getTimeInterval(),
+                qry.isAutoUnsubscribe(),
+                loc ? ctx.grid().cluster().forLocal() : null);
+
+            final QueryCursor<Cache.Entry<K, V>> cur =
+                qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : 
null;
+
+            return new QueryCursor<Cache.Entry<K, V>>() {
+                @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                    return cur != null ? cur.iterator() : new 
GridEmptyIterator<Cache.Entry<K, V>>();
+                }
+
+                @Override public List<Cache.Entry<K, V>> getAll() {
+                    return cur != null ? cur.getAll() : 
Collections.<Cache.Entry<K, V>>emptyList();
+                }
+
+                @Override public void close() {
+                    if (cur != null)
+                        cur.close();
+
+                    try {
+                        
ctx.kernalContext().continuous().stopRoutine(routineId).get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertException(e);
+                    }
+                }
+            };
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <R> QueryCursor<R> query(Query<R> qry) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        A.notNull(qry, "qry");
+
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.query(qry);
+            validate(qry);
+
+            if (qry instanceof ContinuousQuery)
+                return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, 
V>)qry, qry.isLocal());
+
+            if (qry instanceof SqlQuery) {
+                SqlQuery p = (SqlQuery)qry;
+
+                if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
+                    return (QueryCursor<R>)new 
QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p));
+
+                return 
(QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
+            }
+
+            if (qry instanceof SqlFieldsQuery) {
+                SqlFieldsQuery p = (SqlFieldsQuery)qry;
+
+                if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
+                    return 
(QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p);
+
+                return 
(QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
+            }
+
+            return (QueryCursor<R>)query(qry, projection(qry.isLocal()));
+        }
+        catch (Exception e) {
+            if (e instanceof CacheException)
+                throw e;
+
+            throw new CacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
+    /**
+     * @return {@code true} If this is a replicated cache and we are on a data 
node.
+     */
+    private boolean isReplicatedDataNode() {
+        return ctx.isReplicated() && ctx.affinityNode();
+    }
+
+    /**
+     * Checks query.
+     *
+     * @param qry Query
+     * @throws CacheException If query indexing disabled for sql query.
+     */
+    private void validate(Query qry) {
+        if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof 
ScanQuery) &&
+            !(qry instanceof ContinuousQuery))
+            throw new CacheException("Indexing is disabled for cache: " + 
ctx.cache().name());
+    }
+
     /** {@inheritDoc} */
     @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... 
peekModes) throws CacheException {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.localEntries(peekModes);
+            return delegate.localEntries(peekModes);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public QueryMetrics queryMetrics() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.queryMetrics();
+            return delegate.queries().metrics();
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void localEvict(Collection<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.localEvict(keys);
+            delegate.evictAll(keys);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.localPeek(key, peekModes);
+            return delegate.localPeek(key, peekModes, null);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void localPromote(Set<? extends K> keys) throws 
CacheException {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            lockFreeCache.localPromote(keys);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                delegate.promoteAll(keys);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public int size(CachePeekMode... peekModes) throws 
CacheException {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.size(peekModes);
+            if (isAsync()) {
+                setFuture(delegate.sizeAsync(peekModes));
+
+                return 0;
+            }
+            else
+                return delegate.size(peekModes);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.localSize(peekModes);
+            return delegate.localSize(peekModes);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public V get(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.get(key);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAsync(key));
+
+                    return null;
+                }
+                else
+                    return delegate.get(key);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public Map<K, V> getAll(Set<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.getAll(keys);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAllAsync(keys));
+
+                    return null;
+                }
+                else
+                    return delegate.getAll(keys);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
@@ -369,13 +692,24 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
      * @return Values map.
      */
     public Map<K, V> getAll(Collection<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.getAll(keys);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAllAsync(keys));
+
+                    return null;
+                }
+                else
+                    return delegate.getAll(keys);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
@@ -386,37 +720,49 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
      * @return Entry set.
      */
     public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.entrySetx(filter);
+            return delegate.entrySetx(filter);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean containsKey(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.containsKey(key);
+            if (isAsync()) {
+                setFuture(delegate.containsKeyAsync(key));
+
+                return false;
+            }
+            else
+                return delegate.containsKey(key);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean containsKeys(Set<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.containsKeys(keys);
+            if (isAsync()) {
+                setFuture(delegate.containsKeysAsync(keys));
+
+                return false;
+            }
+            else
+                return delegate.containsKeys(keys);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
@@ -426,243 +772,446 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
         boolean replaceExisting,
         @Nullable final CompletionListener completionLsnr
     ) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.loadAll(keys, replaceExisting, completionLsnr);
+            IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, 
replaceExisting);
+
+            if (completionLsnr != null) {
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override
+                    public void apply(IgniteInternalFuture<?> fut) {
+                        try {
+                            fut.get();
+
+                            completionLsnr.onCompletion();
+                        }
+                        catch (IgniteCheckedException e) {
+                            completionLsnr.onException(cacheException(e));
+                        }
+                    }
+                });
+            }
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void put(K key, V val) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            lockFreeCache.put(key, val);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync())
+                    setFuture(delegate.putAsync(key, val));
+                else
+                    delegate.put(key, val);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public V getAndPut(K key, V val) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.getAndPut(key, val);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAndPutAsync(key, val));
+
+                    return null;
+                }
+                else
+                    return delegate.getAndPut(key, val);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void putAll(Map<? extends K, ? extends V> map) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            lockFreeCache.putAll(map);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync())
+                    setFuture(delegate.putAllAsync(map));
+                else
+                    delegate.putAll(map);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean putIfAbsent(K key, V val) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.putIfAbsent(key, val);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.putIfAbsentAsync(key, val));
+
+                    return false;
+                }
+                else
+                    return delegate.putIfAbsent(key, val);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean remove(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.remove(key);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.removeAsync(key));
+
+                    return false;
+                }
+                else
+                    return delegate.remove(key);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean remove(K key, V oldVal) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.remove(key, oldVal);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.removeAsync(key, oldVal));
+
+                    return false;
+                }
+                else
+                    return delegate.remove(key, oldVal);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public V getAndRemove(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.getAndRemove(key);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAndRemoveAsync(key));
+
+                    return null;
+                }
+                else
+                    return delegate.getAndRemove(key);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V oldVal, V newVal) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.replace(key, oldVal, newVal);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.replaceAsync(key, oldVal, newVal));
+
+                    return false;
+                }
+                else
+                    return delegate.replace(key, oldVal, newVal);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V val) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.replace(key, val);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.replaceAsync(key, val));
+
+                    return false;
+                }
+                else
+                    return delegate.replace(key, val);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public V getAndReplace(K key, V val) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.getAndReplace(key, val);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAndReplaceAsync(key, val));
+
+                    return null;
+                }
+                else
+                    return delegate.getAndReplace(key, val);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void removeAll(Set<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            lockFreeCache.removeAll(keys);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync())
+                    setFuture(delegate.removeAllAsync(keys));
+                else
+                    delegate.removeAll(keys);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void removeAll() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.removeAll();
+            if (isAsync())
+                setFuture(delegate.removeAllAsync());
+            else
+                delegate.removeAll();
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void clear(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.clear(key);
+            if (isAsync())
+                setFuture(delegate.clearAsync(key));
+            else
+                delegate.clear(key);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void clearAll(Set<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.clearAll(keys);
+            if (isAsync())
+                setFuture(delegate.clearAsync(keys));
+            else
+                delegate.clearAll(keys);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void clear() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.clear();
+            if (isAsync())
+                setFuture(delegate.clearAsync());
+            else
+                delegate.clear();
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void localClear(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.localClear(key);
+            delegate.clearLocally(key);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void localClearAll(Set<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.localClearAll(keys);
+            for (K key : keys)
+                delegate.clearLocally(key);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public <T> T invoke(K key, EntryProcessor<K, V, T> 
entryProcessor, Object... args)
         throws EntryProcessorException {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.invoke(key, entryProcessor, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    IgniteInternalFuture<EntryProcessorResult<T>> fut = 
delegate.invokeAsync(key, entryProcessor, args);
+
+                    IgniteInternalFuture<T> fut0 = fut.chain(new 
CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() {
+                        @Override public T 
applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut)
+                            throws IgniteCheckedException {
+                            EntryProcessorResult<T> res = fut.get();
+
+                            return res != null ? res.get() : null;
+                        }
+                    });
+
+                    setFuture(fut0);
+
+                    return null;
+                }
+                else {
+                    EntryProcessorResult<T> res = delegate.invoke(key, 
entryProcessor, args);
+
+                    return res != null ? res.get() : null;
+                }
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> 
entryProcessor, Object... args)
         throws EntryProcessorException {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.invoke(key, entryProcessor, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    IgniteInternalFuture<EntryProcessorResult<T>> fut = 
delegate.invokeAsync(key, entryProcessor, args);
+
+                    IgniteInternalFuture<T> fut0 = fut.chain(new 
CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() {
+                        @Override public T 
applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut)
+                            throws IgniteCheckedException {
+                            EntryProcessorResult<T> res = fut.get();
+
+                            return res != null ? res.get() : null;
+                        }
+                    });
+
+                    setFuture(fut0);
+
+                    return null;
+                }
+                else {
+                    EntryProcessorResult<T> res = delegate.invoke(key, 
entryProcessor, args);
+
+                    return res != null ? res.get() : null;
+                }
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
@@ -670,13 +1219,24 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
                                                                    
EntryProcessor<K, V, T> entryProcessor,
                                                                    Object... 
args) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.invokeAll(keys, entryProcessor, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.invokeAllAsync(keys, entryProcessor, 
args));
+
+                    return null;
+                }
+                else
+                    return delegate.invokeAll(keys, entryProcessor, args);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
@@ -684,13 +1244,24 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
         CacheEntryProcessor<K, V, T> entryProcessor,
         Object... args) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.invokeAll(keys, entryProcessor, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.invokeAllAsync(keys, entryProcessor, 
args));
+
+                    return null;
+                }
+                else
+                    return delegate.invokeAll(keys, entryProcessor, args);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
@@ -698,65 +1269,76 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
         try {
-            return lockFreeCache.invokeAll(map, args);
+            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.invokeAllAsync(map, args));
+
+                    return null;
+                }
+                else
+                    return delegate.invokeAll(map, args);
+            }
+            finally {
+                onLeave(prev);
+            }
         }
-        finally {
-            gate.leave(prev);
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public String getName() {
-        return lockFreeCache.getName();
+        return delegate.name();
     }
 
     /** {@inheritDoc} */
     @Override public CacheManager getCacheManager() {
-        return lockFreeCache.getCacheManager();
+        return cacheMgr;
     }
 
     /**
      * @param cacheMgr Cache manager.
      */
     public void setCacheManager(CacheManager cacheMgr) {
-        lockFreeCache.setCacheManager(cacheMgr);
+        this.cacheMgr = cacheMgr;
     }
 
     /** {@inheritDoc} */
     @Override public void close() {
-        if (!gate.enterIfNotClosed())
+        if (!onEnterIfNoClose())
             return;
 
         IgniteInternalFuture<?> fut;
 
         try {
-            fut = 
context().kernalContext().cache().dynamicStopCache(context().name());
+            fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name());
         }
         finally {
-            gate.leave();
+            onLeave();
         }
 
         try {
             fut.get();
         }
         catch (IgniteCheckedException e) {
-            throw CU.convertToCacheException(e);
+            throw cacheException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override public boolean isClosed() {
-        if (!gate.enterIfNotClosed())
+        if (!onEnterIfNoClose())
             return true;
 
         try {
-            return 
context().kernalContext().cache().context().closed(context());
+            return ctx.kernalContext().cache().context().closed(ctx);
         }
         finally {
-            gate.leave();
+            onLeave();
         }
     }
 
@@ -764,51 +1346,69 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
      *
      */
     public GridCacheProjectionEx delegate() {
-        return lockFreeCache.delegate();
+        return delegate;
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> clazz) {
-        return lockFreeCache.unwrap(clazz);
+        if (clazz.isAssignableFrom(getClass()))
+            return (T)this;
+        else if (clazz.isAssignableFrom(IgniteEx.class))
+            return (T)ctx.grid();
+        else if (clazz.isAssignableFrom(legacyProxy.getClass()))
+            return (T)legacyProxy;
+
+        throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + clazz);
     }
 
     /** {@inheritDoc} */
     @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.registerCacheEntryListener(lsnrCfg);
+            ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            lockFreeCache.deregisterCacheEntryListener(lsnrCfg);
+            ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> iterator() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
-            return lockFreeCache.iterator();
+            return ctx.cache().igniteIterator();
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<K, V> createAsyncInstance() {
+        return new IgniteCacheProxy<>(ctx, delegate, prj, true);
+    }
+
     /**
      * Creates projection that will operate with portable objects. <p> 
Projection returned by this method will force
      * cache not to deserialize portable objects, so keys and values will be 
returned from cache API methods without
@@ -833,24 +1433,24 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
      * @return Projection for portable objects.
      */
     public <K1, V1> IgniteCache<K1, V1> keepPortable() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
             GridCacheProjectionImpl<K1, V1> prj0 = new 
GridCacheProjectionImpl<>(
-                (CacheProjection<K1, V1>)(prj != null ? prj : 
lockFreeCache.delegate()),
-                (GridCacheContext<K1, V1>)context(),
-                prj != null && prj.skipStore(),
+                (CacheProjection<K1, V1>)(prj != null ? prj : delegate),
+                (GridCacheContext<K1, V1>)ctx,
+                prj != null ? prj.skipStore() : false,
                 prj != null ? prj.subjectId() : null,
                 true,
                 prj != null ? prj.expiry() : null);
 
-            return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)context(),
+            return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
                 prj0,
                 prj0,
                 isAsync());
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
@@ -858,7 +1458,7 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
      * @return Cache with skip store enabled.
      */
     public IgniteCache<K, V> skipStore() {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
 
         try {
             boolean skip = prj != null && prj.skipStore();
@@ -867,53 +1467,166 @@ public class IgniteCacheProxy<K, V> implements 
IgniteCache<K, V>, Externalizable
                 return this;
 
             GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>(
-                (prj != null ? prj : lockFreeCache.delegate()),
-                context(),
+                (prj != null ? prj : delegate),
+                ctx,
                 true,
                 prj != null ? prj.subjectId() : null,
                 prj != null && prj.isKeepPortable(),
                 prj != null ? prj.expiry() : null);
 
-            return new IgniteCacheProxy<>(context(),
+            return new IgniteCacheProxy<>(ctx,
                 prj0,
                 prj0,
                 isAsync());
         }
         finally {
-            gate.leave(prev);
+            onLeave(prev);
         }
     }
 
     /**
+     * @param e Checked exception to convert.
+     * @return Result of {@code CU.convertToCacheException(e)}, for the caller to throw.
+     */
+    private RuntimeException cacheException(IgniteCheckedException e) {
+        return CU.convertToCacheException(e);
+    }
+
+    /**
+     * @param fut Internal future; wrapped in an {@code IgniteFutureImpl} and stored in {@code curFut}.
+     */
+    private <R> void setFuture(IgniteInternalFuture<R> fut) {
+        curFut.set(new IgniteFutureImpl<>(fut));
+    }
+
+    /**
      * @return Legacy proxy.
      */
     @NotNull
     public GridCacheProxyImpl<K, V> legacyProxy() {
-        return lockFreeCache.legacyProxy();
+        return legacyProxy;
     }
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(lockFreeCache);
+        out.writeObject(ctx);
+
+        out.writeObject(delegate);
+
+        out.writeObject(prj);
+
+        out.writeBoolean(lock);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        lockFreeCache = (IgniteCacheProxyLockFree<K, V>)in.readObject();
+        ctx = (GridCacheContext<K, V>)in.readObject();
 
-        prj = lockFreeCache.projection();
+        delegate = (GridCacheProjectionEx<K, V>)in.readObject();
 
-        gate = lockFreeCache.context().gate();
+        prj = (GridCacheProjectionImpl<K, V>)in.readObject();
+
+        gate = ctx.gate();
+
+        lock = in.readBoolean();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> rebalance() {
-        return lockFreeCache.rebalance();
+        ctx.preloader().forcePreload();
+
+        return new IgniteFutureImpl<>(ctx.preloader().syncFuture());
+    }
+
+    /**
+     * @param prj Projection passed to {@code gate.enter} if {@code lock} is set, else to {@code gate.enterNoLock}.
+     * @return Previous projection set on this thread.
+     */
+    private GridCacheProjectionImpl<K, V> onEnter(GridCacheProjectionImpl<K, 
V> prj) {
+        if (lock)
+            return gate.enter(prj);
+        else
+            return gate.enterNoLock(prj);
+    }
+
+    /**
+     * Returns {@code gate.enterIfNotClosed()} if {@code lock} is set, else {@code gate.enterIfNotClosedNoLock()}.
+     */
+    private boolean onEnterIfNoClose() {
+        if (lock)
+            return gate.enterIfNotClosed();
+        else
+            return gate.enterIfNotClosedNoLock();
+    }
+
+    /**
+     * @param prj Value returned by the matching {@code onEnter}; passed to {@code gate.leave} or {@code gate.leaveNoLock}.
+     */
+    private void onLeave(GridCacheProjectionImpl<K, V> prj) {
+        if (lock)
+            gate.leave(prj);
+        else
+            gate.leaveNoLock(prj);
+    }
+
+    /**
+     * Calls {@code gate.leave()} if {@code lock} is set, else {@code gate.leaveNoLock()}.
+     */
+    private void onLeave() {
+        if (lock)
+            gate.leave();
+        else
+            gate.leaveNoLock();
+    }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteCacheProxy.class, this);
     }
+
+    /**
+     * Closeable iterator that reads {@code X} items from a query future and converts each one to {@code Y}.
+     */
+    private abstract static class ClIter<X, Y> extends 
GridCloseableIteratorAdapter<Y> {
+        /** Item fetched by {@code onHasNext()} and not yet handed out by {@code onNext()}; {@code null} if none. */
+        private X cur;
+
+        /** Query future the items are read from. */
+        private CacheQueryFuture<X> fut;
+
+        /**
+         * @param fut Future to read items from; it is cancelled in {@code onClose()}.
+         */
+        protected ClIter(CacheQueryFuture<X> fut) {
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Y onNext() throws IgniteCheckedException {
+            if (!onHasNext())
+                throw new NoSuchElementException();
+
+            X e = cur;
+
+            cur = null;
+
+            return convert(e);
+        }
+
+        /**
+         * @param x Item read from the future; the value returned here is what {@code onNext()} returns.
+         */
+        protected abstract Y convert(X x);
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
+            return cur != null || (cur = fut.next()) != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            fut.cancel();
+        }
+    }
 }

Reply via email to