#ignite-743: Revert all.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d0b5d850 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d0b5d850 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d0b5d850 Branch: refs/heads/ignite-737 Commit: d0b5d8502ec7a07a041a31d50e15fc85e68ac679 Parents: ecf963e Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed Apr 15 17:11:04 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed Apr 15 17:11:04 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheGateway.java | 128 +- .../processors/cache/IgniteCacheProxy.java | 1217 ++++++++++++++---- .../cache/IgniteCacheProxyLockFree.java | 1153 ----------------- .../datastreamer/DataStreamerImpl.java | 9 +- .../datastreamer/DataStreamerUpdateJob.java | 4 +- 5 files changed, 1059 insertions(+), 1452 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b5d850/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 4868b3f..97fada9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -69,8 +69,7 @@ public class GridCacheGateway<K, V> { * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotClosed() { - if (ctx.deploymentEnabled()) - ctx.deploy().onEnter(); + enterIfNotClosedNoLock(); // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. @@ -86,16 +85,35 @@ public class GridCacheGateway<K, V> { } /** + * Enter a cache call without lock. + * + * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped. + */ + public boolean enterIfNotClosedNoLock() { + if (ctx.deploymentEnabled()) + ctx.deploy().onEnter(); + + return !stopped; + } + + /** + * Leave a cache call entered by {@link #enter()} method. + */ + public void leaveNoLock() { + ctx.tm().resetContext(); + ctx.mvcc().contextReset(); + + // Unwind eviction notifications. + if (!ctx.shared().closed(ctx)) + CU.unwindEvicts(ctx); + } + + /** * Leave a cache call entered by {@link #enter()} method. */ public void leave() { try { - ctx.tm().resetContext(); - ctx.mvcc().contextReset(); - - // Unwind eviction notifications. - if (!ctx.shared().closed(ctx)) - CU.unwindEvicts(ctx); + leaveNoLock(); } finally { rwLock.readUnlock(); @@ -107,6 +125,45 @@ public class GridCacheGateway<K, V> { * @return Previous projection set on this thread. */ @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable GridCacheProjectionImpl<K, V> prj) { + onEnter(); + + rwLock.readLock(); + + if (stopped) { + rwLock.readUnlock(); + + throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + } + + // Must unlock in case of unexpected errors to avoid + // deadlocks during kernal stop. + try { + return setProjectionPerCall(prj); + } + catch (RuntimeException e) { + rwLock.readUnlock(); + + throw e; + } + } + + /** + * @param prj Projection to guard. + * @return Previous projection set on this thread. + */ + @Nullable public GridCacheProjectionImpl<K, V> enterNoLock(@Nullable GridCacheProjectionImpl<K, V> prj) { + onEnter(); + + if (stopped) + throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + + return setProjectionPerCall(prj); + } + + /** + * On enter. + */ + private void onEnter() { try { ctx.itHolder().checkWeakQueue(); @@ -127,50 +184,47 @@ public class GridCacheGateway<K, V> { if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); + } - rwLock.readLock(); + /** + * Set thread local projection per call. + * + * @param prj Projection to guard. + * @return Previous projection set on this thread. + */ + private GridCacheProjectionImpl<K, V> setProjectionPerCall(@Nullable GridCacheProjectionImpl<K, V> prj) { + GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall(); - if (stopped) { - rwLock.readUnlock(); + if (prev != null || prj != null) + ctx.projectionPerCall(prj); - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); - } + return prev; + } - // Must unlock in case of unexpected errors to avoid - // deadlocks during kernal stop. + /** + * @param prev Previous. + */ + public void leave(GridCacheProjectionImpl<K, V> prev) { try { - // Set thread local projection per call. - GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall(); - - if (prev != null || prj != null) - ctx.projectionPerCall(prj); - - return prev; + leaveNoLock(prev); } - catch (RuntimeException e) { + finally { rwLock.readUnlock(); - - throw e; } } /** * @param prev Previous. */ - public void leave(GridCacheProjectionImpl<K, V> prev) { - try { - ctx.tm().resetContext(); - ctx.mvcc().contextReset(); + public void leaveNoLock(GridCacheProjectionImpl<K, V> prev) { + ctx.tm().resetContext(); + ctx.mvcc().contextReset(); - // Unwind eviction notifications. - CU.unwindEvicts(ctx); + // Unwind eviction notifications. + CU.unwindEvicts(ctx); - // Return back previous thread local projection per call. - ctx.projectionPerCall(prev); - } - finally { - rwLock.readUnlock(); - } + // Return back previous thread local projection per call. + ctx.projectionPerCall(prev); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b5d850/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 2f2d70c..d7ef8ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -22,11 +22,18 @@ import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -41,19 +48,42 @@ import java.util.concurrent.locks.*; /** * Cache proxy. */ -public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable { +public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>> + implements IgniteCache<K, V>, Externalizable { /** */ private static final long serialVersionUID = 0L; + /** */ + private static final IgniteBiPredicate ACCEPT_ALL = new IgniteBiPredicate() { + @Override public boolean apply(Object k, Object v) { + return true; + } + }; + + /** Context. */ + private GridCacheContext<K, V> ctx; + + /** Gateway. */ + private GridCacheGateway<K, V> gate; + /** Delegate. */ @GridToStringInclude - private IgniteCacheProxyLockFree<K, V> lockFreeCache; + private GridCacheProjectionEx<K, V> delegate; /** Projection. */ private GridCacheProjectionImpl<K, V> prj; - /** Gateway. */ - private GridCacheGateway<K, V> gate; + /** */ + @GridToStringExclude + private GridCacheProxyImpl<K, V> legacyProxy; + + /** */ + @GridToStringExclude + private CacheManager cacheMgr; + + /** */ + @GridToStringExclude + private boolean lock; /** * Empty constructor required for {@link Externalizable}. @@ -74,27 +104,49 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable @Nullable GridCacheProjectionImpl<K, V> prj, boolean async ) { + this(ctx, delegate, prj, async, false); + } + + /** + * @param ctx Context. + * @param delegate Delegate. + * @param prj Projection. + * @param async Async support flag. + */ + public IgniteCacheProxy( + GridCacheContext<K, V> ctx, + GridCacheProjectionEx<K, V> delegate, + @Nullable GridCacheProjectionImpl<K, V> prj, + boolean async, boolean lock + ) { + super(async); + assert ctx != null; + assert delegate != null; + + this.ctx = ctx; + this.delegate = delegate; + this.prj = prj; gate = ctx.gate(); - this.prj = prj; + legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj); - lockFreeCache = new IgniteCacheProxyLockFree<>(ctx, delegate, prj, async); + this.lock = lock; } /** - * @return Context. + * @return Ignite cache proxy with simple gate. */ - public GridCacheContext<K, V> context() { - return lockFreeCache.context(); + public IgniteCacheProxy<K, V> cacheNoGate() { + return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), true); } /** - * @return Lock free instance. + * @return Context. */ - public IgniteCacheProxyLockFree<K, V> lockFree() { - return lockFreeCache; + public GridCacheContext<K, V> context() { + return ctx; } /** @@ -106,84 +158,86 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public CacheMetrics metrics() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.metrics(); + return ctx.cache().metrics(); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public CacheMetrics metrics(ClusterGroup grp) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.metrics(grp); + List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); + + for (ClusterNode node : grp.nodes()) { + Map<Integer, CacheMetrics> nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics(); + + if (nodeCacheMetrics != null) { + CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); + + if (e != null) + metrics.add(e); + } + } + + return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.mxBean(); + return ctx.cache().mxBean(); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ - @Override public IgniteCache<K, V> withAsync() { - if (isAsync()) - return this; - - return new IgniteCacheProxy<>(context(), delegate(), prj, true); - } - - /** {@inheritDoc} */ - @Override public boolean isAsync() { - return lockFreeCache.isAsync(); - } + @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + CacheConfiguration cfg = ctx.config(); - /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { - return lockFreeCache.future(); - } + if (!clazz.isAssignableFrom(cfg.getClass())) + throw new IllegalArgumentException(); - /** {@inheritDoc} */ - @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { - return lockFreeCache.getConfiguration(clazz); + return clazz.cast(cfg); } /** {@inheritDoc} */ @Nullable @Override public Entry<K, V> randomEntry() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.randomEntry(); + return ctx.cache().randomEntry(); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.withExpiryPolicy(plc); + GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); + + return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync()); } finally { - gate.leave(prev); + onLeave(prev); } } @@ -194,173 +248,442 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - lockFreeCache.loadCache(p, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) + setFuture(ctx.cache().globalLoadCacheAsync(p, args)); + else + ctx.cache().globalLoadCache(p, args); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - lockFreeCache.localLoadCache(p, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) + setFuture(delegate.localLoadCacheAsync(p, args)); + else + delegate.localLoadCache(p, args); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.getAndPutIfAbsent(key, val); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAndPutIfAbsentAsync(key, val)); + + return null; + } + else + return delegate.getAndPutIfAbsent(key, val); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public Lock lock(K key) throws CacheException { - return lockFreeCache.lock(key); + return lockAll(Collections.singleton(key)); } /** {@inheritDoc} */ @Override public Lock lockAll(final Collection<? extends K> keys) { - return lockFreeCache.lockAll(keys); + return new CacheLockImpl<>(gate, delegate, prj, keys); } /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { - return lockFreeCache.isLocalLocked(key, byCurrThread); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); + } + finally { + onLeave(prev); + } + } + + /** + * @param filter Filter. + * @param grp Optional cluster group. + * @return Cursor. + */ + @SuppressWarnings("unchecked") + private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { + final CacheQuery<Map.Entry<K,V>> qry; + final CacheQueryFuture<Map.Entry<K,V>> fut; + + if (filter instanceof ScanQuery) { + IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); + + qry = delegate.queries().createScanQuery(p != null ? p : ACCEPT_ALL); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(); + } + else if (filter instanceof TextQuery) { + TextQuery p = (TextQuery)filter; + + qry = delegate.queries().createFullTextQuery(p.getType(), p.getText()); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(); + } + else if (filter instanceof SpiQuery) { + qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery(); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(((SpiQuery)filter).getArgs()); + } + else { + if (filter instanceof SqlFieldsQuery) + throw new CacheException("Use methods 'queryFields' and 'localQueryFields' for " + + SqlFieldsQuery.class.getSimpleName() + "."); + + throw new CacheException("Unsupported query type: " + filter); + } + + return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { + /** */ + private Map.Entry<K,V> cur; + + @Override protected Entry<K,V> onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + Map.Entry<K,V> e = cur; + + cur = null; + + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return cur != null || (cur = fut.next()) != null; + } + + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + }); + } + + /** + * @param local Enforce local. + * @return Local node cluster group. + */ + private ClusterGroup projection(boolean local) { + if (local || ctx.isLocal() || isReplicatedDataNode()) + return ctx.kernalContext().grid().cluster().forLocal(); + + if (ctx.isReplicated()) + return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom(); + + return null; + } + + /** + * Executes continuous query. + * + * @param qry Query. + * @param loc Local flag. + * @return Initial iteration cursor. + */ + @SuppressWarnings("unchecked") + private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc) { + if (qry.getInitialQuery() instanceof ContinuousQuery) + throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + + "continuous query. Use SCAN or SQL query for initial iteration."); + + if (qry.getLocalListener() == null) + throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + + try { + final UUID routineId = ctx.continuousQueries().executeQuery( + qry.getLocalListener(), + qry.getRemoteFilter(), + qry.getPageSize(), + qry.getTimeInterval(), + qry.isAutoUnsubscribe(), + loc ? ctx.grid().cluster().forLocal() : null); + + final QueryCursor<Cache.Entry<K, V>> cur = + qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; + + return new QueryCursor<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>(); + } + + @Override public List<Cache.Entry<K, V>> getAll() { + return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList(); + } + + @Override public void close() { + if (cur != null) + cur.close(); + + try { + ctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + }; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <R> QueryCursor<R> query(Query<R> qry) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + A.notNull(qry, "qry"); + + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.query(qry); + validate(qry); + + if (qry instanceof ContinuousQuery) + return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal()); + + if (qry instanceof SqlQuery) { + SqlQuery p = (SqlQuery)qry; + + if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) + return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p)); + + return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); + } + + if (qry instanceof SqlFieldsQuery) { + SqlFieldsQuery p = (SqlFieldsQuery)qry; + + if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) + return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p); + + return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); + } + + return (QueryCursor<R>)query(qry, projection(qry.isLocal())); + } + catch (Exception e) { + if (e instanceof CacheException) + throw e; + + throw new CacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } + /** + * @return {@code true} If this is a replicated cache and we are on a data node. + */ + private boolean isReplicatedDataNode() { + return ctx.isReplicated() && ctx.affinityNode(); + } + + /** + * Checks query. + * + * @param qry Query + * @throws CacheException If query indexing disabled for sql query. + */ + private void validate(Query qry) { + if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && + !(qry instanceof ContinuousQuery)) + throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name()); + } + /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.localEntries(peekModes); + return delegate.localEntries(peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public QueryMetrics queryMetrics() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.queryMetrics(); + return delegate.queries().metrics(); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.localEvict(keys); + delegate.evictAll(keys); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.localPeek(key, peekModes); + return delegate.localPeek(key, peekModes, null); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - lockFreeCache.localPromote(keys); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + delegate.promoteAll(keys); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.size(peekModes); + if (isAsync()) { + setFuture(delegate.sizeAsync(peekModes)); + + return 0; + } + else + return delegate.size(peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.localSize(peekModes); + return delegate.localSize(peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public V get(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.get(key); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAsync(key)); + + return null; + } + else + return delegate.get(key); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.getAll(keys); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } @@ -369,13 +692,24 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable * @return Values map. */ public Map<K, V> getAll(Collection<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.getAll(keys); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } @@ -386,37 +720,49 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable * @return Entry set. */ public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.entrySetx(filter); + return delegate.entrySetx(filter); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public boolean containsKey(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.containsKey(key); + if (isAsync()) { + setFuture(delegate.containsKeyAsync(key)); + + return false; + } + else + return delegate.containsKey(key); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public boolean containsKeys(Set<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.containsKeys(keys); + if (isAsync()) { + setFuture(delegate.containsKeysAsync(keys)); + + return false; + } + else + return delegate.containsKeys(keys); } finally { - gate.leave(prev); + onLeave(prev); } } @@ -426,243 +772,446 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable boolean replaceExisting, @Nullable final CompletionListener completionLsnr ) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.loadAll(keys, replaceExisting, completionLsnr); + IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); + + if (completionLsnr != null) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override + public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + completionLsnr.onCompletion(); + } + catch (IgniteCheckedException e) { + completionLsnr.onException(cacheException(e)); + } + } + }); + } } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void put(K key, V val) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - lockFreeCache.put(key, val); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) + setFuture(delegate.putAsync(key, val)); + else + delegate.put(key, val); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.getAndPut(key, val); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAndPutAsync(key, val)); + + return null; + } + else + return delegate.getAndPut(key, val); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - lockFreeCache.putAll(map); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) + setFuture(delegate.putAllAsync(map)); + else + delegate.putAll(map); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.putIfAbsent(key, val); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.putIfAbsentAsync(key, val)); + + return false; + } + else + return delegate.putIfAbsent(key, val); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public boolean remove(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.remove(key); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.removeAsync(key)); + + return false; + } + else + return delegate.remove(key); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.remove(key, oldVal); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.removeAsync(key, oldVal)); + + return false; + } + else + return delegate.remove(key, oldVal); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public V getAndRemove(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.getAndRemove(key); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAndRemoveAsync(key)); + + return null; + } + else + return delegate.getAndRemove(key); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.replace(key, oldVal, newVal); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.replaceAsync(key, oldVal, newVal)); + + return false; + } + else + return delegate.replace(key, oldVal, newVal); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.replace(key, val); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.replaceAsync(key, val)); + + return false; + } + else + return delegate.replace(key, val); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.getAndReplace(key, val); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.getAndReplaceAsync(key, val)); + + return null; + } + else + return delegate.getAndReplace(key, val); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - lockFreeCache.removeAll(keys); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) + setFuture(delegate.removeAllAsync(keys)); + else + delegate.removeAll(keys); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public void removeAll() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.removeAll(); + if (isAsync()) + setFuture(delegate.removeAllAsync()); + else + delegate.removeAll(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void clear(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.clear(key); + if (isAsync()) + setFuture(delegate.clearAsync(key)); + else + delegate.clear(key); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.clearAll(keys); + if (isAsync()) + setFuture(delegate.clearAsync(keys)); + else + delegate.clearAll(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void clear() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.clear(); + if (isAsync()) + setFuture(delegate.clearAsync()); + else + delegate.clear(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void localClear(K key) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.localClear(key); + delegate.clearLocally(key); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void localClearAll(Set<? extends K> keys) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.localClearAll(keys); + for (K key : keys) + delegate.clearLocally(key); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.invoke(key, entryProcessor, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) + throws IgniteCheckedException { + EntryProcessorResult<T> res = fut.get(); + + return res != null ? res.get() : null; + } + }); + + setFuture(fut0); + + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.invoke(key, entryProcessor, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) + throws IgniteCheckedException { + EntryProcessorResult<T> res = fut.get(); + + return res != null ? res.get() : null; + } + }); + + setFuture(fut0); + + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } @@ -670,13 +1219,24 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.invokeAll(keys, entryProcessor, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } @@ -684,13 +1244,24 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.invokeAll(keys, entryProcessor, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } @@ -698,65 +1269,76 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - return lockFreeCache.invokeAll(map, args); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); + + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(map, args)); + + return null; + } + else + return delegate.invokeAll(map, args); + } + finally { + onLeave(prev); + } } - finally { - gate.leave(prev); + catch (IgniteCheckedException e) { + throw cacheException(e); } } /** {@inheritDoc} */ @Override public String getName() { - return lockFreeCache.getName(); + return delegate.name(); } /** {@inheritDoc} */ @Override public CacheManager getCacheManager() { - return lockFreeCache.getCacheManager(); + return cacheMgr; } /** * @param cacheMgr Cache manager. */ public void setCacheManager(CacheManager cacheMgr) { - lockFreeCache.setCacheManager(cacheMgr); + this.cacheMgr = cacheMgr; } /** {@inheritDoc} */ @Override public void close() { - if (!gate.enterIfNotClosed()) + if (!onEnterIfNoClose()) return; IgniteInternalFuture<?> fut; try { - fut = context().kernalContext().cache().dynamicStopCache(context().name()); + fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name()); } finally { - gate.leave(); + onLeave(); } try { fut.get(); } catch (IgniteCheckedException e) { - throw CU.convertToCacheException(e); + throw cacheException(e); } } /** {@inheritDoc} */ @Override public boolean isClosed() { - if (!gate.enterIfNotClosed()) + if (!onEnterIfNoClose()) return true; try { - return context().kernalContext().cache().context().closed(context()); + return ctx.kernalContext().cache().context().closed(ctx); } finally { - gate.leave(); + onLeave(); } } @@ -764,51 +1346,69 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable * */ public GridCacheProjectionEx delegate() { - return lockFreeCache.delegate(); + return delegate; } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> clazz) { - return lockFreeCache.unwrap(clazz); + if (clazz.isAssignableFrom(getClass())) + return (T)this; + else if (clazz.isAssignableFrom(IgniteEx.class)) + return (T)ctx.grid(); + else if (clazz.isAssignableFrom(legacyProxy.getClass())) + return (T)legacyProxy; + + throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); } /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.registerCacheEntryListener(lsnrCfg); + ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - lockFreeCache.deregisterCacheEntryListener(lsnrCfg); + ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); + } + catch (IgniteCheckedException e) { + throw cacheException(e); } finally { - gate.leave(prev); + onLeave(prev); } } /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { - return lockFreeCache.iterator(); + return ctx.cache().igniteIterator(); } finally { - gate.leave(prev); + onLeave(prev); } } + /** {@inheritDoc} */ + @Override protected IgniteCache<K, V> createAsyncInstance() { + return new IgniteCacheProxy<>(ctx, delegate, prj, true); + } + /** * Creates projection that will operate with portable objects. <p> Projection returned by this method will force * cache not to deserialize portable objects, so keys and values will be returned from cache API methods without @@ -833,24 +1433,24 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable * @return Projection for portable objects. */ public <K1, V1> IgniteCache<K1, V1> keepPortable() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { GridCacheProjectionImpl<K1, V1> prj0 = new GridCacheProjectionImpl<>( - (CacheProjection<K1, V1>)(prj != null ? prj : lockFreeCache.delegate()), - (GridCacheContext<K1, V1>)context(), - prj != null && prj.skipStore(), + (CacheProjection<K1, V1>)(prj != null ? prj : delegate), + (GridCacheContext<K1, V1>)ctx, + prj != null ? prj.skipStore() : false, prj != null ? prj.subjectId() : null, true, prj != null ? prj.expiry() : null); - return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)context(), + return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx, prj0, prj0, isAsync()); } finally { - gate.leave(prev); + onLeave(prev); } } @@ -858,7 +1458,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable * @return Cache with skip store enabled. */ public IgniteCache<K, V> skipStore() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = onEnter(prj); try { boolean skip = prj != null && prj.skipStore(); @@ -867,53 +1467,166 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable return this; GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>( - (prj != null ? prj : lockFreeCache.delegate()), - context(), + (prj != null ? prj : delegate), + ctx, true, prj != null ? prj.subjectId() : null, prj != null && prj.isKeepPortable(), prj != null ? prj.expiry() : null); - return new IgniteCacheProxy<>(context(), + return new IgniteCacheProxy<>(ctx, prj0, prj0, isAsync()); } finally { - gate.leave(prev); + onLeave(prev); } } /** + * @param e Checked exception. + * @return Cache exception. + */ + private RuntimeException cacheException(IgniteCheckedException e) { + return CU.convertToCacheException(e); + } + + /** + * @param fut Future for async operation. + */ + private <R> void setFuture(IgniteInternalFuture<R> fut) { + curFut.set(new IgniteFutureImpl<>(fut)); + } + + /** * @return Legacy proxy. */ @NotNull public GridCacheProxyImpl<K, V> legacyProxy() { - return lockFreeCache.legacyProxy(); + return legacyProxy; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(lockFreeCache); + out.writeObject(ctx); + + out.writeObject(delegate); + + out.writeObject(prj); + + out.writeBoolean(lock); } /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - lockFreeCache = (IgniteCacheProxyLockFree<K, V>)in.readObject(); + ctx = (GridCacheContext<K, V>)in.readObject(); - prj = lockFreeCache.projection(); + delegate = (GridCacheProjectionEx<K, V>)in.readObject(); - gate = lockFreeCache.context().gate(); + prj = (GridCacheProjectionImpl<K, V>)in.readObject(); + + gate = ctx.gate(); + + lock = in.readBoolean(); } /** {@inheritDoc} */ @Override public IgniteFuture<?> rebalance() { - return lockFreeCache.rebalance(); + ctx.preloader().forcePreload(); + + return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); + } + + /** + * @param prj Projection to guard. + * @return Previous projection set on this thread. + */ + private GridCacheProjectionImpl<K, V> onEnter(GridCacheProjectionImpl<K, V> prj) { + if (lock) + return gate.enter(prj); + else + return gate.enterNoLock(prj); + } + + /** + * On enter. + */ + private boolean onEnterIfNoClose() { + if (lock) + return gate.enterIfNotClosed(); + else + return gate.enterIfNotClosedNoLock(); + } + + /** + * @param prj Projection to guard.. + */ + private void onLeave(GridCacheProjectionImpl<K, V> prj) { + if (lock) + gate.leave(prj); + else + gate.leaveNoLock(prj); + } + + /** + * On leave. + */ + private void onLeave() { + if (lock) + gate.leave(); + else + gate.leaveNoLock(); } /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); } + + /** + * Closeable iterator. + */ + private abstract static class ClIter<X, Y> extends GridCloseableIteratorAdapter<Y> { + /** */ + private X cur; + + /** */ + private CacheQueryFuture<X> fut; + + /** + * @param fut Future. + */ + protected ClIter(CacheQueryFuture<X> fut) { + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override protected Y onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + X e = cur; + + cur = null; + + return convert(e); + } + + /** + * @param x X. + */ + protected abstract Y convert(X x); + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + return cur != null || (cur = fut.next()) != null; + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + } }