#ignite-743: Implement IgniteCacheProxyLockFree.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56ef2695 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56ef2695 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56ef2695 Branch: refs/heads/ignite-737 Commit: 56ef2695a429709388ffb13288524993f4f28d26 Parents: 196337a Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue Apr 14 16:17:24 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue Apr 14 16:17:24 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 995 +++------------- .../cache/IgniteCacheProxyLockFree.java | 1127 +++++++++++++++++- .../datastreamer/DataStreamerUpdateJob.java | 6 +- 3 files changed, 1304 insertions(+), 824 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ef2695/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 44a5dbe..dcdf0be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -22,18 +22,11 @@ import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.internal.processors.query.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; -import org.apache.ignite.spi.discovery.tcp.internal.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -53,33 +46,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** */ private static final long serialVersionUID = 0L; - /** */ - private static final IgniteBiPredicate ACCEPT_ALL = new IgniteBiPredicate() { - @Override public boolean apply(Object k, Object v) { - return true; - } - }; - - /** Context. */ - private GridCacheContext<K, V> ctx; - - /** Gateway. */ - private GridCacheGateway<K, V> gate; - /** Delegate. */ @GridToStringInclude - private GridCacheProjectionEx<K, V> delegate; + private IgniteCacheProxyLockFree<K, V> delegate; /** Projection. */ private GridCacheProjectionImpl<K, V> prj; - /** */ - @GridToStringExclude - private GridCacheProxyImpl<K, V> legacyProxy; - - /** */ - @GridToStringExclude - private CacheManager cacheMgr; + /** Gateway. */ + private GridCacheGateway<K, V> gate; /** * Empty constructor required for {@link Externalizable}. @@ -103,22 +78,26 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V super(async); assert ctx != null; - assert delegate != null; - - this.ctx = ctx; - this.delegate = delegate; - this.prj = prj; gate = ctx.gate(); - legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj); + this.prj = prj; + + this.delegate = new IgniteCacheProxyLockFree<>(ctx, delegate, prj, async); } /** * @return Context. */ public GridCacheContext<K, V> context() { - return ctx; + return delegate.context(); + } + + /** + * @return Lock free instance. + */ + public IgniteCacheProxyLockFree<K,V> lockFree() { + return delegate; } /** @@ -133,7 +112,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return ctx.cache().metrics(); + return delegate.metrics(); } finally { gate.leave(prev); @@ -145,20 +124,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); - - for (ClusterNode node : grp.nodes()) { - Map<Integer, CacheMetrics> nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics(); - - if (nodeCacheMetrics != null) { - CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); - - if (e != null) - metrics.add(e); - } - } - - return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); + return delegate.metrics(grp); } finally { gate.leave(prev); @@ -170,7 +136,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return ctx.cache().mxBean(); + return delegate.mxBean(); } finally { gate.leave(prev); @@ -179,12 +145,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { - CacheConfiguration cfg = ctx.config(); - - if (!clazz.isAssignableFrom(cfg.getClass())) - throw new IllegalArgumentException(); - - return clazz.cast(cfg); + return delegate.getConfiguration(clazz); } /** {@inheritDoc} */ @@ -192,7 +153,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return ctx.cache().randomEntry(); + return delegate.randomEntry(); } finally { gate.leave(prev); @@ -204,9 +165,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); - - return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync()); + return delegate.withExpiryPolicy(plc); } finally { gate.leave(prev); @@ -220,292 +179,68 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) - setFuture(ctx.cache().globalLoadCacheAsync(p, args)); - else - ctx.cache().globalLoadCache(p, args); - } - finally { - gate.leave(prev); - } + delegate.loadCache(p, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) - setFuture(delegate.localLoadCacheAsync(p, args)); - else - delegate.localLoadCache(p, args); - } - finally { - gate.leave(prev); - } + delegate.localLoadCache(p, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAndPutIfAbsentAsync(key, val)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.getAndPutIfAbsent(key, val); - } - finally { - gate.leave(prev); - } + try { + return delegate.getAndPutIfAbsent(key, val); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public Lock lock(K key) throws CacheException { - return lockAll(Collections.singleton(key)); + return delegate.lock(key); } /** {@inheritDoc} */ @Override public Lock lockAll(final Collection<? extends K> keys) { - return new CacheLockImpl<>(gate, delegate, prj, keys); + return delegate.lockAll(keys); } /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); - } - finally { - gate.leave(prev); - } - } - - /** - * @param filter Filter. - * @param grp Optional cluster group. - * @return Cursor. - */ - @SuppressWarnings("unchecked") - private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { - final CacheQuery<Map.Entry<K,V>> qry; - final CacheQueryFuture<Map.Entry<K,V>> fut; - - if (filter instanceof ScanQuery) { - IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); - - qry = delegate.queries().createScanQuery(p != null ? p : ACCEPT_ALL); - - if (grp != null) - qry.projection(grp); - - fut = qry.execute(); - } - else if (filter instanceof TextQuery) { - TextQuery p = (TextQuery)filter; - - qry = delegate.queries().createFullTextQuery(p.getType(), p.getText()); - - if (grp != null) - qry.projection(grp); - - fut = qry.execute(); - } - else if (filter instanceof SpiQuery) { - qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery(); - - if (grp != null) - qry.projection(grp); - - fut = qry.execute(((SpiQuery)filter).getArgs()); - } - else { - if (filter instanceof SqlFieldsQuery) - throw new CacheException("Use methods 'queryFields' and 'localQueryFields' for " + - SqlFieldsQuery.class.getSimpleName() + "."); - - throw new CacheException("Unsupported query type: " + filter); - } - - return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { - /** */ - private Map.Entry<K,V> cur; - - @Override protected Entry<K,V> onNext() throws IgniteCheckedException { - if (!onHasNext()) - throw new NoSuchElementException(); - - Map.Entry<K,V> e = cur; - - cur = null; - - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } - - @Override protected boolean onHasNext() throws IgniteCheckedException { - return cur != null || (cur = fut.next()) != null; - } - - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - } - }); - } - - /** - * @param local Enforce local. - * @return Local node cluster group. - */ - private ClusterGroup projection(boolean local) { - if (local || ctx.isLocal() || isReplicatedDataNode()) - return ctx.kernalContext().grid().cluster().forLocal(); - - if (ctx.isReplicated()) - return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom(); - - return null; - } - - /** - * Executes continuous query. - * - * @param qry Query. - * @param loc Local flag. - * @return Initial iteration cursor. - */ - @SuppressWarnings("unchecked") - private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc) { - if (qry.getInitialQuery() instanceof ContinuousQuery) - throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + - "continuous query. Use SCAN or SQL query for initial iteration."); - - if (qry.getLocalListener() == null) - throw new IgniteException("Mandatory local listener is not set for the query: " + qry); - - try { - final UUID routineId = ctx.continuousQueries().executeQuery( - qry.getLocalListener(), - qry.getRemoteFilter(), - qry.getPageSize(), - qry.getTimeInterval(), - qry.isAutoUnsubscribe(), - loc ? ctx.grid().cluster().forLocal() : null); - - final QueryCursor<Cache.Entry<K, V>> cur = - qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; - - return new QueryCursor<Cache.Entry<K, V>>() { - @Override public Iterator<Cache.Entry<K, V>> iterator() { - return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>(); - } - - @Override public List<Cache.Entry<K, V>> getAll() { - return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList(); - } - - @Override public void close() { - if (cur != null) - cur.close(); - - try { - ctx.kernalContext().continuous().stopRoutine(routineId).get(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - }; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } + return delegate.isLocalLocked(key, byCurrThread); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <R> QueryCursor<R> query(Query<R> qry) { - A.notNull(qry, "qry"); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - validate(qry); - - if (qry instanceof ContinuousQuery) - return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal()); - - if (qry instanceof SqlQuery) { - SqlQuery p = (SqlQuery)qry; - - if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) - return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p)); - - return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); - } - - if (qry instanceof SqlFieldsQuery) { - SqlFieldsQuery p = (SqlFieldsQuery)qry; - - if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) - return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p); - - return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); - } - - return (QueryCursor<R>)query(qry, projection(qry.isLocal())); - } - catch (Exception e) { - if (e instanceof CacheException) - throw e; - - throw new CacheException(e); + return delegate.query(qry); } finally { gate.leave(prev); } } - /** - * @return {@code true} If this is a replicated cache and we are on a data node. - */ - private boolean isReplicatedDataNode() { - return ctx.isReplicated() && ctx.affinityNode(); - } - - /** - * Checks query. - * - * @param qry Query - * @throws CacheException If query indexing disabled for sql query. - */ - private void validate(Query qry) { - if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && - !(qry instanceof ContinuousQuery)) - throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name()); - } - /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -513,9 +248,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { return delegate.localEntries(peekModes); } - catch (IgniteCheckedException e) { - throw cacheException(e); - } finally { gate.leave(prev); } @@ -526,7 +258,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.queries().metrics(); + return delegate.queryMetrics(); } finally { gate.leave(prev); @@ -538,7 +270,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.evictAll(keys); + delegate.localEvict(keys); } finally { gate.leave(prev); @@ -550,10 +282,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.localPeek(key, peekModes, null); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + return delegate.localPeek(key, peekModes); } finally { gate.leave(prev); @@ -562,18 +291,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - try { - delegate.promoteAll(keys); - } - finally { - gate.leave(prev); - } + try { + delegate.localPromote(keys); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -582,16 +306,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) { - setFuture(delegate.sizeAsync(peekModes)); - - return 0; - } - else - return delegate.size(peekModes); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + return delegate.size(peekModes); } finally { gate.leave(prev); @@ -605,9 +320,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { return delegate.localSize(peekModes); } - catch (IgniteCheckedException e) { - throw cacheException(e); - } finally { gate.leave(prev); } @@ -615,47 +327,25 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public V get(K key) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAsync(key)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.get(key); - } - finally { - gate.leave(prev); - } + try { + return delegate.get(key); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAllAsync(keys)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.getAll(keys); - } - finally { - gate.leave(prev); - } + try { + return delegate.getAll(keys); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -664,24 +354,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Values map. */ public Map<K, V> getAll(Collection<? extends K> keys) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAllAsync(keys)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.getAll(keys); - } - finally { - gate.leave(prev); - } + try { + return delegate.getAll(keys); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -707,13 +386,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) { - setFuture(delegate.containsKeyAsync(key)); - - return false; - } - else - return delegate.containsKey(key); + return delegate.containsKey(key); } finally { gate.leave(prev); @@ -725,13 +398,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) { - setFuture(delegate.containsKeysAsync(keys)); - - return false; - } - else - return delegate.containsKeys(keys); + return delegate.containsKeys(keys); } finally { gate.leave(prev); @@ -747,22 +414,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); - - if (completionLsnr != null) { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - try { - fut.get(); - - completionLsnr.onCompletion(); - } - catch (IgniteCheckedException e) { - completionLsnr.onException(cacheException(e)); - } - } - }); - } + delegate.loadAll(keys, replaceExisting, completionLsnr); } finally { gate.leave(prev); @@ -771,245 +423,133 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public void put(K key, V val) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) - setFuture(delegate.putAsync(key, val)); - else - delegate.put(key, val); - } - finally { - gate.leave(prev); - } + delegate.put(key, val); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAndPutAsync(key, val)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.getAndPut(key, val); - } - finally { - gate.leave(prev); - } + try { + return delegate.getAndPut(key, val); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) - setFuture(delegate.putAllAsync(map)); - else - delegate.putAll(map); - } - finally { - gate.leave(prev); - } + delegate.putAll(map); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.putIfAbsentAsync(key, val)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return false; - } - else - return delegate.putIfAbsent(key, val); - } - finally { - gate.leave(prev); - } + try { + return delegate.putIfAbsent(key, val); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public boolean remove(K key) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.removeAsync(key)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return false; - } - else - return delegate.remove(key); - } - finally { - gate.leave(prev); - } + try { + return delegate.remove(key); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.removeAsync(key, oldVal)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return false; - } - else - return delegate.remove(key, oldVal); - } - finally { - gate.leave(prev); - } + try { + return delegate.remove(key, oldVal); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public V getAndRemove(K key) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAndRemoveAsync(key)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.getAndRemove(key); - } - finally { - gate.leave(prev); - } + try { + return delegate.getAndRemove(key); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.replaceAsync(key, oldVal, newVal)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return false; - } - else - return delegate.replace(key, oldVal, newVal); - } - finally { - gate.leave(prev); - } + try { + return delegate.replace(key, oldVal, newVal); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.replaceAsync(key, val)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return false; - } - else - return delegate.replace(key, val); - } - finally { - gate.leave(prev); - } + try { + return delegate.replace(key, val); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.getAndReplaceAsync(key, val)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.getAndReplace(key, val); - } - finally { - gate.leave(prev); - } + try { + return delegate.getAndReplace(key, val); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) - setFuture(delegate.removeAllAsync(keys)); - else - delegate.removeAll(keys); - } - finally { - gate.leave(prev); - } + delegate.removeAll(keys); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -1018,13 +558,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) - setFuture(delegate.removeAllAsync()); - else - delegate.removeAll(); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + delegate.removeAll(); } finally { gate.leave(prev); @@ -1036,13 +570,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) - setFuture(delegate.clearAsync(key)); - else - delegate.clear(key); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + delegate.clear(key); } finally { gate.leave(prev); @@ -1054,13 +582,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) - setFuture(delegate.clearAsync(keys)); - else - delegate.clearAll(keys); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + delegate.clearAll(keys); } finally { gate.leave(prev); @@ -1072,13 +594,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - if (isAsync()) - setFuture(delegate.clearAsync()); - else - delegate.clear(); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + delegate.clear(); } finally { gate.leave(prev); @@ -1090,7 +606,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - delegate.clearLocally(key); + delegate.localClear(key); } finally { gate.leave(prev); @@ -1102,8 +618,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - for (K key : keys) - delegate.clearLocally(key); + delegate.localClearAll(keys); } finally { gate.leave(prev); @@ -1113,76 +628,26 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); - - IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { - @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) - throws IgniteCheckedException { - EntryProcessorResult<T> res = fut.get(); - - return res != null ? res.get() : null; - } - }); - - setFuture(fut0); - - return null; - } - else { - EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return res != null ? res.get() : null; - } - } - finally { - gate.leave(prev); - } + try { + return delegate.invoke(key, entryProcessor, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); - - IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { - @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) - throws IgniteCheckedException { - EntryProcessorResult<T> res = fut.get(); - - return res != null ? res.get() : null; - } - }); - - setFuture(fut0); - - return null; - } - else { - EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return res != null ? res.get() : null; - } - } - finally { - gate.leave(prev); - } + try { + return delegate.invoke(key, entryProcessor, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -1190,24 +655,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.invokeAll(keys, entryProcessor, args); - } - finally { - gate.leave(prev); - } + try { + return delegate.invokeAll(keys, entryProcessor, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -1215,24 +669,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.invokeAll(keys, entryProcessor, args); - } - finally { - gate.leave(prev); - } + try { + return delegate.invokeAll(keys, entryProcessor, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } @@ -1240,42 +683,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { - try { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - if (isAsync()) { - setFuture(delegate.invokeAllAsync(map, args)); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return null; - } - else - return delegate.invokeAll(map, args); - } - finally { - gate.leave(prev); - } + try { + return delegate.invokeAll(map, args); } - catch (IgniteCheckedException e) { - throw cacheException(e); + finally { + gate.leave(prev); } } /** {@inheritDoc} */ @Override public String getName() { - return delegate.name(); + return delegate.getName(); } /** {@inheritDoc} */ @Override public CacheManager getCacheManager() { - return cacheMgr; + return delegate.getCacheManager(); } /** * @param cacheMgr Cache manager. */ public void setCacheManager(CacheManager cacheMgr) { - this.cacheMgr = cacheMgr; + delegate.setCacheManager(cacheMgr); } /** {@inheritDoc} */ @@ -1286,7 +718,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V IgniteInternalFuture<?> fut; try { - fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name()); + fut = context().kernalContext().cache().dynamicStopCache(context().name()); } finally { gate.leave(); @@ -1296,7 +728,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V fut.get(); } catch (IgniteCheckedException e) { - throw cacheException(e); + throw CU.convertToCacheException(e); } } @@ -1306,7 +738,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return true; try { - return ctx.kernalContext().cache().context().closed(ctx); + return context().kernalContext().cache().context().closed(context()); } finally { gate.leave(); @@ -1317,20 +749,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * */ public GridCacheProjectionEx delegate() { - return delegate; + return delegate.delegate(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> clazz) { - if (clazz.isAssignableFrom(getClass())) - return (T)this; - else if (clazz.isAssignableFrom(IgniteEx.class)) - return (T)ctx.grid(); - else if (clazz.isAssignableFrom(legacyProxy.getClass())) - return (T)legacyProxy; - - throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); + return delegate.unwrap(clazz); } /** {@inheritDoc} */ @@ -1338,10 +763,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + delegate.registerCacheEntryListener(lsnrCfg); } finally { gate.leave(prev); @@ -1353,10 +775,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); - } - catch (IgniteCheckedException e) { - throw cacheException(e); + delegate.deregisterCacheEntryListener(lsnrCfg); } finally { gate.leave(prev); @@ -1368,7 +787,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return ctx.cache().igniteIterator(); + return delegate.iterator(); } finally { gate.leave(prev); @@ -1377,7 +796,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override protected IgniteCache<K, V> createAsyncInstance() { - return new IgniteCacheProxy<>(ctx, delegate, prj, true); + return delegate.createAsyncInstance(); } /** @@ -1409,13 +828,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { GridCacheProjectionImpl<K1, V1> prj0 = new GridCacheProjectionImpl<>( (CacheProjection<K1, V1>)(prj != null ? prj : delegate), - (GridCacheContext<K1, V1>)ctx, - prj != null ? prj.skipStore() : false, + (GridCacheContext<K1, V1>)context(), + prj != null && prj.skipStore(), prj != null ? prj.subjectId() : null, true, prj != null ? prj.expiry() : null); - return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx, + return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)context(), prj0, prj0, isAsync()); @@ -1438,14 +857,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return this; GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>( - (prj != null ? prj : delegate), - ctx, + (prj != null ? prj : delegate.delegate()), + context(), true, prj != null ? prj.subjectId() : null, prj != null && prj.isKeepPortable(), prj != null ? prj.expiry() : null); - return new IgniteCacheProxy<>(ctx, + return new IgniteCacheProxy<>(context(), prj0, prj0, isAsync()); @@ -1456,32 +875,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * @param e Checked exception. - * @return Cache exception. - */ - private RuntimeException cacheException(IgniteCheckedException e) { - return CU.convertToCacheException(e); - } - - /** - * @param fut Future for async operation. - */ - private <R> void setFuture(IgniteInternalFuture<R> fut) { - curFut.set(new IgniteFutureImpl<>(fut)); - } - - /** * @return Legacy proxy. */ @NotNull public GridCacheProxyImpl<K, V> legacyProxy() { - return legacyProxy; + return delegate.legacyProxy(); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ctx); - out.writeObject(delegate); out.writeObject(prj); @@ -1490,69 +892,20 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ctx = (GridCacheContext<K, V>)in.readObject(); - - delegate = (GridCacheProjectionEx<K, V>)in.readObject(); + delegate = (IgniteCacheProxyLockFree<K, V>)in.readObject(); prj = (GridCacheProjectionImpl<K, V>)in.readObject(); - gate = ctx.gate(); + gate = delegate.context().gate(); } /** {@inheritDoc} */ @Override public IgniteFuture<?> rebalance() { - ctx.preloader().forcePreload(); - - return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); + return delegate.rebalance(); } /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); } - - /** - * Closeable iterator. - */ - private abstract static class ClIter<X, Y> extends GridCloseableIteratorAdapter<Y> { - /** */ - private X cur; - - /** */ - private CacheQueryFuture<X> fut; - - /** - * @param fut Future. - */ - protected ClIter(CacheQueryFuture<X> fut) { - this.fut = fut; - } - - /** {@inheritDoc} */ - @Override protected Y onNext() throws IgniteCheckedException { - if (!onHasNext()) - throw new NoSuchElementException(); - - X e = cur; - - cur = null; - - return convert(e); - } - - /** - * @param x X. - */ - protected abstract Y convert(X x); - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - return cur != null || (cur = fut.next()) != null; - } - - /** {@inheritDoc} */ - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ef2695/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyLockFree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyLockFree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyLockFree.java index 81e1f24..976eeac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyLockFree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyLockFree.java @@ -17,8 +17,1133 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.CacheManager; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.mxbean.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.integration.*; +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.locks.*; + /** * Cache proxy lock free. */ -public class IgniteCacheProxyLockFree { +public class IgniteCacheProxyLockFree <K, V> extends AsyncSupportAdapter<IgniteCache<K, V>> + implements IgniteCache<K, V>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final IgniteBiPredicate ACCEPT_ALL = new IgniteBiPredicate() { + @Override public boolean apply(Object k, Object v) { + return true; + } + }; + + /** Context. */ + private GridCacheContext<K, V> ctx; + + /** Delegate. */ + @GridToStringInclude + private GridCacheProjectionEx<K, V> delegate; + + /** Projection. */ + private GridCacheProjectionImpl<K, V> prj; + + /** */ + @GridToStringExclude + private GridCacheProxyImpl<K, V> legacyProxy; + + /** */ + @GridToStringExclude + private CacheManager cacheMgr; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public IgniteCacheProxyLockFree() { + // No-op. + } + + /** + * @param ctx Context. + * @param delegate Delegate. + * @param prj Projection. + * @param async Async support flag. + */ + public IgniteCacheProxyLockFree( + GridCacheContext<K, V> ctx, + GridCacheProjectionEx<K, V> delegate, + @Nullable GridCacheProjectionImpl<K, V> prj, + boolean async + ) { + super(async); + + assert ctx != null; + assert delegate != null; + + this.ctx = ctx; + this.delegate = delegate; + this.prj = prj; + + legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj); + } + + /** + * @return Context. + */ + public GridCacheContext<K, V> context() { + return ctx; + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics() { + return ctx.cache().metrics(); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics(ClusterGroup grp) { + List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); + + for (ClusterNode node : grp.nodes()) { + Map<Integer, CacheMetrics> nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics(); + + if (nodeCacheMetrics != null) { + CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); + + if (e != null) + metrics.add(e); + } + } + + return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); + } + + /** {@inheritDoc} */ + @Override public CacheMetricsMXBean mxBean() { + return ctx.cache().mxBean(); + } + + /** {@inheritDoc} */ + @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + CacheConfiguration cfg = ctx.config(); + + if (!clazz.isAssignableFrom(cfg.getClass())) + throw new IllegalArgumentException(); + + return clazz.cast(cfg); + } + + /** {@inheritDoc} */ + @Nullable @Override public Entry<K, V> randomEntry() { + return ctx.cache().randomEntry(); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { + GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); + + return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync()); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withSkipStore() { + return skipStore(); + } + + /** {@inheritDoc} */ + @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + try { + if (isAsync()) + setFuture(ctx.cache().globalLoadCacheAsync(p, args)); + else + ctx.cache().globalLoadCache(p, args); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + try { + if (isAsync()) + setFuture(delegate.localLoadCacheAsync(p, args)); + else + delegate.localLoadCache(p, args); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { + try { + if (isAsync()) { + setFuture(delegate.getAndPutIfAbsentAsync(key, val)); + + return null; + } + else + return delegate.getAndPutIfAbsent(key, val); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public Lock lock(K key) throws CacheException { + return lockAll(Collections.singleton(key)); + } + + /** {@inheritDoc} */ + @Override public Lock lockAll(final Collection<? extends K> keys) { + return new CacheLockImpl<>(ctx.gate(), delegate, prj, keys); + } + + /** {@inheritDoc} */ + @Override public boolean isLocalLocked(K key, boolean byCurrThread) { + return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); + } + + /** + * @param filter Filter. + * @param grp Optional cluster group. + * @return Cursor. + */ + @SuppressWarnings("unchecked") + private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { + final CacheQuery<Map.Entry<K,V>> qry; + final CacheQueryFuture<Map.Entry<K,V>> fut; + + if (filter instanceof ScanQuery) { + IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); + + qry = delegate.queries().createScanQuery(p != null ? p : ACCEPT_ALL); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(); + } + else if (filter instanceof TextQuery) { + TextQuery p = (TextQuery)filter; + + qry = delegate.queries().createFullTextQuery(p.getType(), p.getText()); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(); + } + else if (filter instanceof SpiQuery) { + qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery(); + + if (grp != null) + qry.projection(grp); + + fut = qry.execute(((SpiQuery)filter).getArgs()); + } + else { + if (filter instanceof SqlFieldsQuery) + throw new CacheException("Use methods 'queryFields' and 'localQueryFields' for " + + SqlFieldsQuery.class.getSimpleName() + "."); + + throw new CacheException("Unsupported query type: " + filter); + } + + return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { + /** */ + private Map.Entry<K,V> cur; + + @Override protected Entry<K,V> onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + Map.Entry<K,V> e = cur; + + cur = null; + + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return cur != null || (cur = fut.next()) != null; + } + + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + }); + } + + /** + * @param local Enforce local. + * @return Local node cluster group. + */ + private ClusterGroup projection(boolean local) { + if (local || ctx.isLocal() || isReplicatedDataNode()) + return ctx.kernalContext().grid().cluster().forLocal(); + + if (ctx.isReplicated()) + return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom(); + + return null; + } + + /** + * Executes continuous query. + * + * @param qry Query. + * @param loc Local flag. + * @return Initial iteration cursor. + */ + @SuppressWarnings("unchecked") + private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc) { + if (qry.getInitialQuery() instanceof ContinuousQuery) + throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + + "continuous query. Use SCAN or SQL query for initial iteration."); + + if (qry.getLocalListener() == null) + throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + + try { + final UUID routineId = ctx.continuousQueries().executeQuery( + qry.getLocalListener(), + qry.getRemoteFilter(), + qry.getPageSize(), + qry.getTimeInterval(), + qry.isAutoUnsubscribe(), + loc ? ctx.grid().cluster().forLocal() : null); + + final QueryCursor<Cache.Entry<K, V>> cur = + qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; + + return new QueryCursor<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>(); + } + + @Override public List<Cache.Entry<K, V>> getAll() { + return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList(); + } + + @Override public void close() { + if (cur != null) + cur.close(); + + try { + ctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + }; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <R> QueryCursor<R> query(Query<R> qry) { + A.notNull(qry, "qry"); + + try { + validate(qry); + + if (qry instanceof ContinuousQuery) + return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal()); + + if (qry instanceof SqlQuery) { + SqlQuery p = (SqlQuery)qry; + + if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) + return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p)); + + return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); + } + + if (qry instanceof SqlFieldsQuery) { + SqlFieldsQuery p = (SqlFieldsQuery)qry; + + if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) + return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p); + + return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); + } + + return (QueryCursor<R>)query(qry, projection(qry.isLocal())); + } + catch (Exception e) { + if (e instanceof CacheException) + throw e; + + throw new CacheException(e); + } + } + + /** + * @return {@code true} If this is a replicated cache and we are on a data node. + */ + private boolean isReplicatedDataNode() { + return ctx.isReplicated() && ctx.affinityNode(); + } + + /** + * Checks query. + * + * @param qry Query + * @throws CacheException If query indexing disabled for sql query. + */ + private void validate(Query qry) { + if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && + !(qry instanceof ContinuousQuery)) + throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name()); + } + + /** {@inheritDoc} */ + @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { + try { + return delegate.localEntries(peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public QueryMetrics queryMetrics() { + return delegate.queries().metrics(); + } + + /** {@inheritDoc} */ + @Override public void localEvict(Collection<? extends K> keys) { + delegate.evictAll(keys); + } + + /** {@inheritDoc} */ + @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { + try { + return delegate.localPeek(key, peekModes, null); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void localPromote(Set<? extends K> keys) throws CacheException { + try { + delegate.promoteAll(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public int size(CachePeekMode... peekModes) throws CacheException { + try { + if (isAsync()) { + setFuture(delegate.sizeAsync(peekModes)); + + return 0; + } + else + return delegate.size(peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public int localSize(CachePeekMode... peekModes) { + try { + return delegate.localSize(peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public V get(K key) { + try { + if (isAsync()) { + setFuture(delegate.getAsync(key)); + + return null; + } + else + return delegate.get(key); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public Map<K, V> getAll(Set<? extends K> keys) { + try { + if (isAsync()) { + setFuture(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** + * @param keys Keys. + * @return Values map. + */ + public Map<K, V> getAll(Collection<? extends K> keys) { + try { + if (isAsync()) { + setFuture(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** + * Gets entry set containing internal entries. + * + * @param filter Filter. + * @return Entry set. + */ + public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { + return delegate.entrySetx(filter); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(K key) { + if (isAsync()) { + setFuture(delegate.containsKeyAsync(key)); + + return false; + } + else + return delegate.containsKey(key); + + } + + /** {@inheritDoc} */ + @Override public boolean containsKeys(Set<? extends K> keys) { + if (isAsync()) { + setFuture(delegate.containsKeysAsync(keys)); + + return false; + } + else + return delegate.containsKeys(keys); + } + + /** {@inheritDoc} */ + @Override public void loadAll( + Set<? extends K> keys, + boolean replaceExisting, + @Nullable final CompletionListener completionLsnr + ) { + IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); + + if (completionLsnr != null) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + completionLsnr.onCompletion(); + } + catch (IgniteCheckedException e) { + completionLsnr.onException(cacheException(e)); + } + } + }); + } + } + + /** {@inheritDoc} */ + @Override public void put(K key, V val) { + try { + if (isAsync()) + setFuture(delegate.putAsync(key, val)); + else + delegate.put(key, val); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public V getAndPut(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.getAndPutAsync(key, val)); + + return null; + } + else + return delegate.getAndPut(key, val); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void putAll(Map<? extends K, ? extends V> map) { + try { + if (isAsync()) + setFuture(delegate.putAllAsync(map)); + else + delegate.putAll(map); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean putIfAbsent(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.putIfAbsentAsync(key, val)); + + return false; + } + else + return delegate.putIfAbsent(key, val); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(K key) { + try { + if (isAsync()) { + setFuture(delegate.removeAsync(key)); + + return false; + } + else + return delegate.remove(key); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(K key, V oldVal) { + try { + if (isAsync()) { + setFuture(delegate.removeAsync(key, oldVal)); + + return false; + } + else + return delegate.remove(key, oldVal); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public V getAndRemove(K key) { + try { + if (isAsync()) { + setFuture(delegate.getAndRemoveAsync(key)); + + return null; + } + else + return delegate.getAndRemove(key); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean replace(K key, V oldVal, V newVal) { + try { + if (isAsync()) { + setFuture(delegate.replaceAsync(key, oldVal, newVal)); + + return false; + } + else + return delegate.replace(key, oldVal, newVal); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean replace(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.replaceAsync(key, val)); + + return false; + } + else + return delegate.replace(key, val); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public V getAndReplace(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.getAndReplaceAsync(key, val)); + + return null; + } + else + return delegate.getAndReplace(key, val); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void removeAll(Set<? extends K> keys) { + try { + if (isAsync()) + setFuture(delegate.removeAllAsync(keys)); + else + delegate.removeAll(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void removeAll() { + try { + if (isAsync()) + setFuture(delegate.removeAllAsync()); + else + delegate.removeAll(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void clear(K key) { + try { + if (isAsync()) + setFuture(delegate.clearAsync(key)); + else + delegate.clear(key); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void clearAll(Set<? extends K> keys) { + try { + if (isAsync()) + setFuture(delegate.clearAsync(keys)); + else + delegate.clearAll(keys); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void clear() { + try { + if (isAsync()) + setFuture(delegate.clearAsync()); + else + delegate.clear(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void localClear(K key) { + delegate.clearLocally(key); + } + + /** {@inheritDoc} */ + @Override public void localClearAll(Set<? extends K> keys) { + for (K key : keys) + delegate.clearLocally(key); + } + + /** {@inheritDoc} */ + @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) + throws EntryProcessorException { + try { + if (isAsync()) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) + throws IgniteCheckedException { + EntryProcessorResult<T> res = fut.get(); + + return res != null ? res.get() : null; + } + }); + + setFuture(fut0); + + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) + throws EntryProcessorException { + try { + if (isAsync()) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + IgniteInternalFuture<T> fut0 = fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut) + throws IgniteCheckedException { + EntryProcessorResult<T> res = fut.get(); + + return res != null ? res.get() : null; + } + }); + + setFuture(fut0); + + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + CacheEntryProcessor<K, V, T> entryProcessor, + Object... args) { + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(map, args)); + + return null; + } + else + return delegate.invokeAll(map, args); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public String getName() { + return delegate.name(); + } + + /** {@inheritDoc} */ + @Override public CacheManager getCacheManager() { + return cacheMgr; + } + + /** + * @param cacheMgr Cache manager. + */ + public void setCacheManager(CacheManager cacheMgr) { + this.cacheMgr = cacheMgr; + } + + /** {@inheritDoc} */ + @Override public void close() { + IgniteInternalFuture<?> fut; + + fut = ctx.kernalContext().cache().dynamicStopCache(ctx.name()); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() { + return ctx.kernalContext().cache().context().closed(ctx); + } + + /** + * + */ + public GridCacheProjectionEx delegate() { + return delegate; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> clazz) { + if (clazz.isAssignableFrom(getClass())) + return (T)this; + else if (clazz.isAssignableFrom(IgniteEx.class)) + return (T)ctx.grid(); + else if (clazz.isAssignableFrom(legacyProxy.getClass())) + return (T)legacyProxy; + + throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); + } + + /** {@inheritDoc} */ + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { + try { + ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { + try { + ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return ctx.cache().igniteIterator(); + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<K, V> createAsyncInstance() { + return new IgniteCacheProxyLockFree<>(ctx, delegate, prj, true); + } + + /** + * Creates projection that will operate with portable objects. <p> Projection returned by this method will force + * cache not to deserialize portable objects, so keys and values will be returned from cache API methods without + * changes. Therefore, signature of the projection can contain only following types: <ul> <li>{@code PortableObject} + * for portable classes</li> <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li> + * <li>Arrays of primitives (byte[], int[], ...)</li> <li>{@link String} and array of {@link String}s</li> + * <li>{@link UUID} and array of {@link UUID}s</li> <li>{@link Date} and array of {@link Date}s</li> <li>{@link + * java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> <li>Enums and array of enums</li> <li> Maps, + * collections and array of objects (but objects inside them will still be converted if they are portable) </li> + * </ul> <p> For example, if you use {@link Integer} as a key and {@code Value} class as a value (which will be + * stored in portable format), you should acquire following projection to avoid deserialization: + * <pre> + * CacheProjection<Integer, GridPortableObject> prj = cache.keepPortable(); + * + * // Value is not deserialized and returned in portable format. + * GridPortableObject po = prj.get(1); + * </pre> + * <p> Note that this method makes sense only if cache is working in portable mode ({@code + * CacheConfiguration#isPortableEnabled()} returns {@code true}. If not, this method is no-op and will return + * current projection. + * + * @return Projection for portable objects. + */ + public <K1, V1> IgniteCache<K1, V1> keepPortable() { + GridCacheProjectionImpl<K1, V1> prj0 = new GridCacheProjectionImpl<>( + (CacheProjection<K1, V1>)(prj != null ? prj : delegate), + (GridCacheContext<K1, V1>)ctx, + prj != null && prj.skipStore(), + prj != null ? prj.subjectId() : null, + true, + prj != null ? prj.expiry() : null); + + return new IgniteCacheProxyLockFree<>((GridCacheContext<K1, V1>)ctx, + prj0, + prj0, + isAsync()); + } + + /** + * @return Cache with skip store enabled. + */ + public IgniteCache<K, V> skipStore() { + boolean skip = prj != null && prj.skipStore(); + + if (skip) + return this; + + GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>( + (prj != null ? prj : delegate), + ctx, + true, + prj != null ? prj.subjectId() : null, + prj != null && prj.isKeepPortable(), + prj != null ? prj.expiry() : null); + + return new IgniteCacheProxyLockFree<>(ctx, + prj0, + prj0, + isAsync()); + } + + /** + * @param e Checked exception. + * @return Cache exception. + */ + private RuntimeException cacheException(IgniteCheckedException e) { + return CU.convertToCacheException(e); + } + + /** + * @param fut Future for async operation. + */ + private <R> void setFuture(IgniteInternalFuture<R> fut) { + curFut.set(new IgniteFutureImpl<>(fut)); + } + + /** + * @return Legacy proxy. + */ + @NotNull + public GridCacheProxyImpl<K, V> legacyProxy() { + return legacyProxy; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); + + out.writeObject(delegate); + + out.writeObject(prj); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (GridCacheContext<K, V>)in.readObject(); + + delegate = (GridCacheProjectionEx<K, V>)in.readObject(); + + prj = (GridCacheProjectionImpl<K, V>)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> rebalance() { + ctx.preloader().forcePreload(); + + return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteCacheProxyLockFree.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ef2695/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index e06946f..323c8ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -100,10 +100,12 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { // if (ignoreDepOwnership) // cache.context().deploy().ignoreOwnership(true); - IgniteCacheProxy cache = ctx.cache().jcache(cacheName); + IgniteCacheProxyLockFree cache = ctx.cache().jcache(cacheName).lockFree(); + + cache.context().awaitStarted(); if (skipStore) - cache = (IgniteCacheProxy<?, ?>)cache.withSkipStore(); + cache = (IgniteCacheProxyLockFree<?, ?>)cache.withSkipStore(); if (ignoreDepOwnership) cache.context().deploy().ignoreOwnership(true);