Ignite-52 - Get rid of filters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a03fee32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a03fee32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a03fee32 Branch: refs/heads/sprint-1 Commit: a03fee32a2f970a2231bae01dc8240d303b9e7ca Parents: 0acd2d4 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Jan 26 16:57:27 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Jan 26 16:57:27 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 398 +++++++------------ .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 161 +++----- .../cache/GridCacheProjectionImpl.java | 20 +- .../GridDistributedTxRemoteAdapter.java | 4 +- .../distributed/dht/GridDhtCacheAdapter.java | 22 +- .../cache/distributed/dht/GridDhtGetFuture.java | 21 +- .../dht/GridDhtTransactionalCacheAdapter.java | 1 - .../dht/GridPartitionedGetFuture.java | 10 - .../dht/atomic/GridDhtAtomicCache.java | 16 +- .../dht/colocated/GridDhtColocatedCache.java | 10 +- .../distributed/near/GridNearAtomicCache.java | 4 +- .../distributed/near/GridNearCacheAdapter.java | 39 +- .../distributed/near/GridNearCacheEntry.java | 4 +- .../distributed/near/GridNearGetFuture.java | 13 +- .../distributed/near/GridNearGetRequest.java | 114 +----- .../near/GridNearTransactionalCache.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 2 - .../local/atomic/GridLocalAtomicCache.java | 23 +- .../cache/transactions/IgniteTxAdapter.java | 1 - .../transactions/IgniteTxLocalAdapter.java | 202 ++++------ .../cache/transactions/IgniteTxLocalEx.java | 4 +- .../cache/transactions/IgniteTxProxyImpl.java | 7 + .../processors/cache/GridCacheTestEntryEx.java | 3 +- .../IgniteCacheContainsKeyAbstractSelfTest.java | 125 ++++++ ...GridCacheDhtEvictionNearReadersSelfTest.java | 2 +- ...IgniteCacheContainsKeyColocatedSelfTest.java | 28 ++ 27 files changed, 519 insertions(+), 728 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 4e44e14..6448234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1371,7 +1371,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync(F.asList(key), !ctx.config().isReadFromBackup(), /*skip tx*/false, entry, null, taskName, - deserializePortable, filter).get().get(key); + deserializePortable).get().get(key); } /** {@inheritDoc} */ @@ -1413,44 +1413,28 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Nullable @Override public V reload(K key) throws IgniteCheckedException { - return reload(key, (IgnitePredicate<CacheEntry<K, V>>[])null); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<V> reloadAsync(K key) { - return reloadAsync(key, (IgnitePredicate<CacheEntry<K, V>>[])null); - } - - /** {@inheritDoc} */ @Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { - reloadAll(keys, (IgnitePredicate<CacheEntry<K, V>>[])null); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) { - return reloadAllAsync(keys, (IgnitePredicate<CacheEntry<K, V>>[])null); + reloadAll(keys, false); } /** {@inheritDoc} */ @Override public void reloadAll() throws IgniteCheckedException { ctx.denyOnFlags(F.asList(LOCAL, READ)); - reloadAll(keySet(), (IgnitePredicate<CacheEntry<K, V>>[])null); + reloadAll(keySet()); } /** {@inheritDoc} */ @Override public IgniteFuture<?> reloadAllAsync() { ctx.denyOnFlags(F.asList(LOCAL, READ)); - return reloadAllAsync(keySet(), (IgnitePredicate<CacheEntry<K, V>>[])null); + return reloadAllAsync(keySet()); } /** * @param keys Keys. * @param reload Reload flag. * @param tx Transaction. - * @param filter Filter. * @param subjId Subject ID. * @param taskName Task name. * @param vis Visitor. @@ -1459,7 +1443,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys, boolean reload, @Nullable final IgniteTxEx<K, V> tx, - IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName, final IgniteBiInClosure<K, V> vis) { @@ -1480,176 +1463,165 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param keys Keys. * @param ret Return flag. - * @param filter Optional filter. * @return Non-{@code null} map if return flag is {@code true}. * @throws IgniteCheckedException If failed. */ - @Nullable public Map<K, V> reloadAll(@Nullable Collection<? extends K> keys, boolean ret, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { + @Nullable public Map<K, V> reloadAll(@Nullable Collection<? extends K> keys, boolean ret) throws IgniteCheckedException { UUID subjId = ctx.subjectIdPerCall(null); String taskName = ctx.kernalContext().job().currentTaskName(); - return reloadAllAsync(keys, ret, subjId, taskName, filter).get(); + return reloadAllAsync(keys, ret, subjId, taskName).get(); } /** * @param keys Keys. * @param ret Return flag. - * @param filter Filter. * @return Future. */ public IgniteFuture<Map<K, V>> reloadAllAsync(@Nullable Collection<? extends K> keys, boolean ret, - @Nullable UUID subjId, String taskName, @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) { + @Nullable UUID subjId, String taskName) { ctx.denyOnFlag(READ); final long topVer = ctx.affinity().affinityTopologyVersion(); if (!F.isEmpty(keys)) { - try { - final String uid = CU.uuid(); // Get meta UUID for this thread. + final String uid = CU.uuid(); // Get meta UUID for this thread. - assert keys != null; + assert keys != null; - if (keyCheck) - validateCacheKeys(keys); + if (keyCheck) + validateCacheKeys(keys); - for (K key : keys) { - if (key == null) - continue; + for (K key : keys) { + if (key == null) + continue; - // Skip primary or backup entries for near cache. - if (ctx.isNear() && ctx.affinity().localNode(key, topVer)) - continue; + // Skip primary or backup entries for near cache. + if (ctx.isNear() && ctx.affinity().localNode(key, topVer)) + continue; - while (true) { - try { - GridCacheEntryEx<K, V> entry = entryExSafe(key, topVer); + while (true) { + try { + GridCacheEntryEx<K, V> entry = entryExSafe(key, topVer); - if (entry == null) - break; + if (entry == null) + break; - // Get version before checking filer. - GridCacheVersion ver = entry.version(); + // Get version before checking filer. + GridCacheVersion ver = entry.version(); - if (ctx.isAll(entry, filter)) - // Tag entry with current version. - entry.addMeta(uid, ver); - else - ctx.evicts().touch(entry, topVer); + // Tag entry with current version. + entry.addMeta(uid, ver); - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry for reload (will retry): " + key); - } - catch (GridDhtInvalidPartitionException ignore) { - if (log.isDebugEnabled()) - log.debug("Got invalid partition for key (will skip): " + key); + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry for reload (will retry): " + key); + } + catch (GridDhtInvalidPartitionException ignore) { + if (log.isDebugEnabled()) + log.debug("Got invalid partition for key (will skip): " + key); - break; - } + break; } } + } - final Map<K, V> map = ret ? new HashMap<K, V>(keys.size(), 1.0f) : null; - - final Collection<? extends K> absentKeys = F.view(keys, CU.keyHasMeta(ctx, uid)); + final Map<K, V> map = ret ? new HashMap<K, V>(keys.size(), 1.0f) : null; - final Collection<K> loadedKeys = new GridConcurrentHashSet<>(); + final Collection<? extends K> absentKeys = F.view(keys, CU.keyHasMeta(ctx, uid)); - IgniteFuture<Object> readFut = - readThroughAllAsync(absentKeys, true, null, filter, subjId, taskName, new CI2<K, V>() { - /** Version for all loaded entries. */ - private GridCacheVersion nextVer = ctx.versions().next(); + final Collection<K> loadedKeys = new GridConcurrentHashSet<>(); - /** {@inheritDoc} */ - @Override public void apply(K key, V val) { - loadedKeys.add(key); + IgniteFuture<Object> readFut = + readThroughAllAsync(absentKeys, true, null, subjId, taskName, new CI2<K, V>() { + /** Version for all loaded entries. */ + private GridCacheVersion nextVer = ctx.versions().next(); - GridCacheEntryEx<K, V> entry = peekEx(key); + /** {@inheritDoc} */ + @Override public void apply(K key, V val) { + loadedKeys.add(key); - if (entry != null) { - try { - GridCacheVersion curVer = entry.removeMeta(uid); + GridCacheEntryEx<K, V> entry = peekEx(key); - // If entry passed the filter. - if (curVer != null) { - boolean wasNew = entry.isNewLocked(); + if (entry != null) { + try { + GridCacheVersion curVer = entry.removeMeta(uid); - entry.unswap(); + // If entry passed the filter. + if (curVer != null) { + boolean wasNew = entry.isNewLocked(); - boolean set = entry.versionedValue(val, curVer, nextVer); + entry.unswap(); - ctx.evicts().touch(entry, topVer); + boolean set = entry.versionedValue(val, curVer, nextVer); - if (map != null) { - if (set || wasNew) - map.put(key, val); - else { - try { - GridTuple<V> v = peek0(false, key, GLOBAL, filter); + ctx.evicts().touch(entry, topVer); - if (v != null) - map.put(key, val); - } - catch (GridCacheFilterFailedException ex) { - ex.printStackTrace(); + if (map != null) { + if (set || wasNew) + map.put(key, val); + else { + try { + GridTuple<V> v = peek0(false, key, GLOBAL); - assert false; - } + if (v != null) + map.put(key, val); } - } + catch (GridCacheFilterFailedException ex) { + ex.printStackTrace(); - if (log.isDebugEnabled()) { - log.debug("Set value loaded from store into entry [set=" + set + ", " + - "curVer=" + - curVer + ", newVer=" + nextVer + ", entry=" + entry + ']'); + assert false; + } } } - else { - if (log.isDebugEnabled()) { - log.debug("Current version was not found (either entry was removed or " + - "validation was not passed: " + entry); - } + + if (log.isDebugEnabled()) { + log.debug("Set value loaded from store into entry [set=" + set + ", " + + "curVer=" + + curVer + ", newVer=" + nextVer + ", entry=" + entry + ']'); } } - catch (GridCacheEntryRemovedException ignore) { + else { if (log.isDebugEnabled()) { - log.debug("Got removed entry for reload (will not store reloaded entry) " + - "[entry=" + entry + ']'); + log.debug("Current version was not found (either entry was removed or " + + "validation was not passed: " + entry); } } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) { + log.debug("Got removed entry for reload (will not store reloaded entry) " + + "[entry=" + entry + ']'); } } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } - }); + } + }); - return readFut.chain(new CX1<IgniteFuture<Object>, Map<K, V>>() { - @Override public Map<K, V> applyx(IgniteFuture<Object> e) throws IgniteCheckedException { - // Touch all not loaded keys. - for (K key : absentKeys) { - if (!loadedKeys.contains(key)) { - GridCacheEntryEx<K, V> entry = peekEx(key); + return readFut.chain(new CX1<IgniteFuture<Object>, Map<K, V>>() { + @Override public Map<K, V> applyx(IgniteFuture<Object> e) throws IgniteCheckedException { + // Touch all not loaded keys. + for (K key : absentKeys) { + if (!loadedKeys.contains(key)) { + GridCacheEntryEx<K, V> entry = peekEx(key); - if (entry != null) - ctx.evicts().touch(entry, topVer); - } + if (entry != null) + ctx.evicts().touch(entry, topVer); } + } - // Make sure there were no exceptions. - e.get(); + // Make sure there were no exceptions. + e.get(); - return map; - } - }); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx.kernalContext(), e); - } + return map; + } + }); } return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -1680,7 +1652,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Nullable @Override public V get(K key) throws IgniteCheckedException { - V val = get(key, true, null); + V val = get(key, true); if (ctx.config().getInterceptor() != null) val = (V)ctx.config().getInterceptor().onGet(key, val); @@ -1690,7 +1662,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteFuture<V> getAsync(final K key) { - IgniteFuture<V> fut = getAsync(key, true, null); + IgniteFuture<V> fut = getAsync(key, true); if (ctx.config().getInterceptor() != null) return fut.chain(new CX1<IgniteFuture<V>, V>() { @@ -1704,7 +1676,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { - Map<K, V> map = getAll(keys, true, null); + Map<K, V> map = getAll(keys, true); if (ctx.config().getInterceptor() != null) map = interceptGet(keys, map); @@ -1714,7 +1686,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) { - IgniteFuture<Map<K, V>> fut = getAllAsync(keys, true, null); + IgniteFuture<Map<K, V>> fut = getAllAsync(keys, true); if (ctx.config().getInterceptor() != null) return fut.chain(new CX1<IgniteFuture<Map<K, V>>, Map<K, V>>() { @@ -1775,8 +1747,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Nullable GridCacheEntryEx<K, V> entry, @Nullable UUID subjId, String taskName, - boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter + boolean deserializePortable ) { GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); @@ -1790,8 +1761,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, taskName, deserializePortable, forcePrimary, - accessExpiryPolicy(prj != null ? prj.expiry() : null), - filter); + accessExpiryPolicy(prj != null ? prj.expiry() : null)); } /** {@inheritDoc} */ @@ -1803,8 +1773,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final String taskName, final boolean deserializePortable, final boolean forcePrimary, - @Nullable IgniteCacheExpiryPolicy expiry, - @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter + @Nullable IgniteCacheExpiryPolicy expiry ) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -1871,7 +1840,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, subjId, null, taskName, - filter, expiry); GridCacheVersion ver = entry.version(); @@ -1961,22 +1929,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ", curVer=" + ver + ", newVer=" + nextVer + ", " + "entry=" + entry + ']'); - boolean touch = true; - // Don't put key-value pair into result map if value is null. - if (val != null) { - if (set || F.isEmptyOrNulls(filter)) - map.put(key, ctx.cloneOnFlag(val)); - else { - touch = false; - - // Try again, so we can return consistent values. - redos.add(key); - } - } - - if (touch && (tx0 == null || (!tx0.implicit() && - tx0.isolation() == READ_COMMITTED))) + if (val != null) + map.put(key, ctx.cloneOnFlag(val)); + + if (tx0 == null || (!tx0.implicit() && + tx0.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); break; @@ -2034,7 +1992,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (!redos.isEmpty()) // Future recursion. return getAllAsync(redos, forcePrimary, /*skip tx*/false, - /*entry*/null, subjId, taskName, deserializePortable, filter); + /*entry*/null, subjId, taskName, deserializePortable); // There were no misses. return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, @@ -2074,7 +2032,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { @Override public IgniteFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { - return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, cached0, deserializePortable, filter)); + return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, cached0, deserializePortable)); } }); } @@ -2280,11 +2238,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { - return entryProcessor; - } - }); + Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, + new C1<K, EntryProcessor<K, V, Object>>() { + @Override public EntryProcessor apply(K k) { + return entryProcessor; + } + }); IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut = tx.invokeAsync(ctx, invokeMap, args); @@ -2410,16 +2369,18 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 = (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut; - return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, Map<K, EntryProcessorResult<T>>>() { - @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut) - throws IgniteCheckedException { - GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get(); + return fut0.chain( + new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, Map<K, EntryProcessorResult<T>>>() { + @Override public Map<K, EntryProcessorResult<T>> applyx( + IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut) + throws IgniteCheckedException { + GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get(); - assert ret != null; + assert ret != null; - return ret.value() != null ? ret.value() : Collections.<K, EntryProcessorResult<T>>emptyMap(); - } - }); + return ret.value() != null ? ret.value() : Collections.<K, EntryProcessorResult<T>>emptyMap(); + } + }); } /** {@inheritDoc} */ @@ -3538,7 +3499,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize()); ctx.store().loadAllFromStore(null, keys, new CIX2<K, V>() { - @Override public void applyx(K key, V val) throws IgniteCheckedException { + @Override public void applyx(K key, V val) { if (ctx.portableEnabled()) { key = (K)ctx.marshalToPortable(key); val = (V)ctx.marshalToPortable(val); @@ -3681,7 +3642,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prj); try { - GridCacheAdapter.this.removex(item.getKey()); + removex(item.getKey()); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -4595,22 +4556,19 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param key Key. - * @param filter Filter to evaluate. * @return Cached value. * @throws IgniteCheckedException If failed. */ - @Nullable public V get(K key, boolean deserializePortable, @Nullable IgnitePredicate<CacheEntry<K, V>> filter) + @Nullable public V get(K key, boolean deserializePortable) throws IgniteCheckedException { - return getAllAsync(F.asList(key), deserializePortable, filter).get().get(key); + return getAllAsync(F.asList(key), deserializePortable).get().get(key); } /** * @param key Key. - * @param filter Filter to evaluate. * @return Read operation future. */ - public final IgniteFuture<V> getAsync(final K key, boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>> filter) { + public final IgniteFuture<V> getAsync(final K key, boolean deserializePortable) { ctx.denyOnFlag(LOCAL); try { @@ -4620,37 +4578,34 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext(), e); } - return getAllAsync(Collections.singletonList(key), deserializePortable, filter).chain(new CX1<IgniteFuture<Map<K, V>>, V>() { - @Override - public V applyx(IgniteFuture<Map<K, V>> e) throws IgniteCheckedException { - return e.get().get(key); - } - }); + return getAllAsync(Collections.singletonList(key), deserializePortable).chain( + new CX1<IgniteFuture<Map<K, V>>, V>() { + @Override + public V applyx(IgniteFuture<Map<K, V>> e) throws IgniteCheckedException { + return e.get().get(key); + } + }); } /** * @param keys Keys. - * @param filter Filter to evaluate. * @return Map of cached values. * @throws IgniteCheckedException If read failed. */ - public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializePortable, - IgnitePredicate<CacheEntry<K, V>> filter) throws IgniteCheckedException { + public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializePortable) throws IgniteCheckedException { ctx.denyOnFlag(LOCAL); checkJta(); - return getAllAsync(keys, deserializePortable, filter).get(); + return getAllAsync(keys, deserializePortable).get(); } /** * @param key Key. - * @param filter Filter to evaluate. * @return Reloaded value. * @throws IgniteCheckedException If failed. */ - @Nullable public V reload(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) - throws IgniteCheckedException { + @Override @Nullable public V reload(K key) throws IgniteCheckedException { ctx.denyOnFlags(F.asList(LOCAL, READ)); A.notNull(key, "key"); @@ -4669,7 +4624,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (ctx.isNear() && ctx.affinity().localNode(key, topVer)) return null; - return ctx.cloneOnFlag(entryEx(key).innerReload(filter)); + return ctx.cloneOnFlag(entryEx(key).innerReload()); } catch (GridCacheEntryRemovedException ignored) { if (log.isDebugEnabled()) @@ -4680,70 +4635,26 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param keys Keys. - * @param filter Filter to evaluate. - * @throws IgniteCheckedException If failed. - */ - public void reloadAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { - reloadAll(keys, false, filter); - } - - /** - * @param keys Keys. - * @param filter Filter to evaluate. * @return Reload future. */ - public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) { UUID subjId = ctx.subjectIdPerCall(null); String taskName = ctx.kernalContext().job().currentTaskName(); - return reloadAllAsync(keys, false, subjId, taskName, filter); + return reloadAllAsync(keys, false, subjId, taskName); } /** * @param key Key. - * @param filter Filter to evaluate. * @return Reload future. */ - public IgniteFuture<V> reloadAsync(final K key, - @Nullable final IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteFuture<V> reloadAsync(final K key) { ctx.denyOnFlags(F.asList(LOCAL, READ)); return ctx.closures().callLocalSafe(ctx.projectSafe(new Callable<V>() { @Nullable @Override public V call() throws IgniteCheckedException { - return reload(key, filter); - } - }), true); - } - - /** - * @param filter Filter to evaluate. - * @throws IgniteCheckedException If reload failed. - */ - public final void reloadAll(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { - ctx.denyOnFlag(READ); - - Set<K> keys = keySet(); - - // Don't reload empty cache. - if (!keys.isEmpty()) - reloadAll(keys, filter); - } - - /** - * @param filter Filter to evaluate. - * @return Reload future. - */ - public IgniteFuture<?> reloadAllAsync(@Nullable final IgnitePredicate<CacheEntry<K, V>> filter) { - ctx.denyOnFlag(READ); - - return ctx.closures().callLocalSafe(ctx.projectSafe(new GPC() { - @Nullable @Override public Object call() throws IgniteCheckedException { - reloadAll(filter); - - return null; + return reload(key); } }), true); } @@ -4751,12 +4662,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param keys Keys. * @param deserializePortable Deserialize portable flag. - * @param filter Filter to evaluate. * @return Read future. */ public IgniteFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys, - boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>> filter) { + boolean deserializePortable) { String taskName = ctx.kernalContext().job().currentTaskName(); if (ctx.portableEnabled() && !F.isEmpty(keys)) { @@ -4773,8 +4682,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, null, null, taskName, - deserializePortable, - filter); + deserializePortable); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 021167f..02aa601 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -282,7 +282,6 @@ public interface GridCacheEntryEx<K, V> { * @param subjId Subject ID initiated this read. * @param transformClo Transform closure to record event. * @param taskName Task name. - * @param filter Filter to check prior to getting the value. Note that filter check * together with getting the value is an atomic operation. * @param expiryPlc Expiry policy. * @return Cached value. @@ -301,7 +300,6 @@ public interface GridCacheEntryEx<K, V> { UUID subjId, Object transformClo, String taskName, - IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; @@ -313,7 +311,7 @@ public interface GridCacheEntryEx<K, V> { * @throws IgniteCheckedException If reload failed. * @throws GridCacheEntryRemovedException If entry has been removed. */ - @Nullable public V innerReload(IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException, + @Nullable public V innerReload() throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5972041..9a2061e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -610,15 +610,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> * @param tx Transaction. * @param key Key. * @param reload flag. - * @param filter Filter. * @param subjId Subject ID. * @param taskName Task name. * @return Read value. * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable protected V readThrough(@Nullable IgniteTxEx<K, V> tx, K key, boolean reload, - IgnitePredicate<CacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException { + @Nullable protected V readThrough(@Nullable IgniteTxEx<K, V> tx, K key, boolean reload, UUID subjId, + String taskName) throws IgniteCheckedException { return cctx.store().loadFromStore(tx, key); } @@ -634,7 +633,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> UUID subjId, Object transformClo, String taskName, - IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { cctx.denyOnFlag(LOCAL); @@ -643,14 +641,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> readSwap, readThrough, evt, - failFast, unmarshal, updateMetrics, tmp, subjId, transformClo, taskName, - filter, expirePlc); } @@ -660,16 +656,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> boolean readSwap, boolean readThrough, boolean evt, - boolean failFast, boolean unmarshal, boolean updateMetrics, boolean tmp, UUID subjId, Object transformClo, String taskName, - IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expiryPlc) - throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { + throws IgniteCheckedException, GridCacheEntryRemovedException { // Disable read-through if there is no store. if (readThrough && !cctx.readThrough()) readThrough = false; @@ -679,10 +673,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> V old; V ret = null; - if (!F.isEmptyOrNulls(filter) && !cctx.isAll( - (new GridCacheFilterEvaluationEntry<>(key, rawGetOrUnmarshal(true), this, true)), filter)) - return CU.<V>failed(failFast); - GridCacheVersion startVer; boolean expired = false; @@ -824,30 +814,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - // Check before load. - if (!cctx.isAll(this, filter)) - return CU.<V>failed(failFast, ret); - - if (ret != null) { + if (ret != null) // If return value is consistent, then done. - if (F.isEmptyOrNulls(filter) || version().equals(startVer)) - return ret; - - // Try again (recursion). - return innerGet0(tx, - readSwap, - readThrough, - false, - failFast, - unmarshal, - updateMetrics, - tmp, - subjId, - transformClo, - taskName, - filter, - expiryPlc); - } + return ret; boolean loadedFromStore = false; @@ -864,20 +833,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - ret = readThrough(tx0, key, false, filter, subjId, taskName); + ret = readThrough(tx0, key, false, subjId, taskName); loadedFromStore = true; } - boolean match = false; - synchronized (this) { long ttl = ttlExtras(); // If version matched, set value. if (startVer.equals(ver)) { - match = true; - if (ret != null) { // Detach value before index update. if (cctx.portableEnabled()) @@ -912,28 +877,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } - if (F.isEmptyOrNulls(filter) || match) - return ret; - - // Try again (recursion). - return innerGet0(tx, - readSwap, - readThrough, - false, - failFast, - unmarshal, - updateMetrics, - tmp, - subjId, - transformClo, - taskName, - filter, - expiryPlc); + return ret; } /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "TooBroadScope"}) - @Nullable @Override public final V innerReload(IgnitePredicate<CacheEntry<K, V>>[] filter) + @Nullable @Override public final V innerReload() throws IgniteCheckedException, GridCacheEntryRemovedException { cctx.denyOnFlag(READ); @@ -955,75 +904,65 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> String taskName = cctx.kernalContext().job().currentTaskName(); // Check before load. - if (cctx.isAll(this, filter)) { - V ret = readThrough(null, key, true, filter, cctx.localNodeId(), taskName); - - boolean touch = false; - - try { - synchronized (this) { - long ttl = ttlExtras(); - - // Generate new version. - GridCacheVersion nextVer = cctx.versions().nextForLoad(ver); + V ret = readThrough(null, key, true, cctx.localNodeId(), taskName); - // If entry was loaded during read step. - if (wasNew && !isNew()) - // Map size was updated on entry creation. - return ret; + boolean touch = false; - // If version matched, set value. - if (startVer.equals(ver)) { - releaseSwap(); + try { + synchronized (this) { + long ttl = ttlExtras(); - V old = rawGetOrUnmarshalUnlocked(false); + // Generate new version. + GridCacheVersion nextVer = cctx.versions().nextForLoad(ver); - long expTime = toExpireTime(ttl); + // If entry was loaded during read step. + if (wasNew && !isNew()) + // Map size was updated on entry creation. + return ret; - // Detach value before index update. - if (cctx.portableEnabled()) - ret = (V)cctx.kernalContext().portable().detachPortable(ret); + // If version matched, set value. + if (startVer.equals(ver)) { + releaseSwap(); - // Update indexes. - if (ret != null) { - updateIndex(ret, null, expTime, nextVer, old); + V old = rawGetOrUnmarshalUnlocked(false); - if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked()) - deletedUnlocked(false); - } - else { - clearIndex(old); + long expTime = toExpireTime(ttl); - if (cctx.deferredDelete() && !isInternal() && !detached() && !deletedUnlocked()) - deletedUnlocked(true); - } + // Detach value before index update. + if (cctx.portableEnabled()) + ret = (V)cctx.kernalContext().portable().detachPortable(ret); - update(ret, null, expTime, ttl, nextVer); + // Update indexes. + if (ret != null) { + updateIndex(ret, null, expTime, nextVer, old); - touch = true; + if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked()) + deletedUnlocked(false); + } + else { + clearIndex(old); - // If value was set - return, otherwise try again. - return ret; + if (cctx.deferredDelete() && !isInternal() && !detached() && !deletedUnlocked()) + deletedUnlocked(true); } - } - if (F.isEmptyOrNulls(filter)) { + update(ret, null, expTime, ttl, nextVer); + touch = true; + // If value was set - return, otherwise try again. return ret; } } - finally { - if (touch) - cctx.evicts().touch(this, cctx.affinity().affinityTopologyVersion()); - } - // Recursion. - return innerReload(filter); - } + touch = true; - // If filter didn't pass. - return null; + return ret; + } + finally { + if (touch) + cctx.evicts().touch(this, cctx.affinity().affinityTopologyVersion()); + } } /** @@ -1425,7 +1364,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheValueBytes oldBytes = valueBytesUnlocked(); if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); + old = readThrough(null, key, false, subjId, taskName); // Detach value before index update. if (cctx.portableEnabled()) @@ -1765,7 +1704,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheValueBytes oldBytes = valueBytesUnlocked(); if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old = readThrough(null, key, false, CU.<K, V>empty(), subjId, taskName); + old = readThrough(null, key, false, subjId, taskName); // Detach value before index update. if (cctx.portableEnabled()) @@ -1796,7 +1735,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> expiryPlc.ttlUpdated(key, getOrMarshalKeyBytes(), version(), - hasReaders() ? ((GridDhtCacheEntry) this).readers() : null); + hasReaders() ? ((GridDhtCacheEntry<K, V>)this).readers() : null); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index b1e564a..fa21788 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -634,37 +634,37 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public V reload(K key) throws IgniteCheckedException { - return cache.reload(key, entryFilter(false)); + return cache.reload(key); } /** {@inheritDoc} */ @Override public IgniteFuture<V> reloadAsync(K key) { - return cache.reloadAsync(key, entryFilter(false)); + return cache.reloadAsync(key); } /** {@inheritDoc} */ @Override public void reloadAll() throws IgniteCheckedException { - cache.reloadAll(entryFilter(false)); + cache.reloadAll(); } /** {@inheritDoc} */ @Override public IgniteFuture<?> reloadAllAsync() { - return cache.reloadAllAsync(entryFilter(false)); + return cache.reloadAllAsync(); } /** {@inheritDoc} */ @Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { - cache.reloadAll(keys, entryFilter(false)); + cache.reloadAll(keys); } /** {@inheritDoc} */ @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) { - return cache.reloadAllAsync(keys, entryFilter(false)); + return cache.reloadAllAsync(keys); } /** {@inheritDoc} */ @Override public V get(K key) throws IgniteCheckedException { - return cache.get(key, deserializePortables(), entryFilter(false)); + return cache.get(key, deserializePortables()); } /** {@inheritDoc} */ @@ -675,7 +675,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteFuture<V> getAsync(K key) { - return cache.getAsync(key, deserializePortables(), entryFilter(false)); + return cache.getAsync(key, deserializePortables()); } /** {@inheritDoc} */ @@ -725,12 +725,12 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { - return cache.getAll(keys, deserializePortables(), entryFilter(false)); + return cache.getAll(keys, deserializePortables()); } /** {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys) { - return cache.getAllAsync(keys, deserializePortables(), entryFilter(false)); + return cache.getAllAsync(keys, deserializePortables()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 9d07c52..5e4effa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -577,10 +577,10 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> nearCached.updateOrEvict(xidVer, null, null, 0, 0, nodeId); } else if (op == RELOAD) { - V reloaded = cached.innerReload(CU.<K, V>empty()); + V reloaded = cached.innerReload(); if (nearCached != null) { - nearCached.innerReload(CU.<K, V>empty()); + nearCached.innerReload(); nearCached.updateOrEvict(cached.version(), reloaded, null, cached.expireTime(), cached.ttl(), nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 6418e68..876ce25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -477,13 +477,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgnitePredicate[], IgniteCacheExpiryPolicy)} + * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgniteCacheExpiryPolicy)} * method instead to retrieve DHT value. * * @param keys {@inheritDoc} * @param forcePrimary {@inheritDoc} * @param skipTx {@inheritDoc} - * @param filter {@inheritDoc} * @return {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllAsync( @@ -493,8 +492,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable GridCacheEntryEx<K, V> entry, @Nullable UUID subjId, String taskName, - boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter + boolean deserializePortable ) { return getAllAsync(keys, true, @@ -504,15 +502,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap taskName, deserializePortable, forcePrimary, - null, - filter); + null); } /** {@inheritDoc} */ - @Override public V reload(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) + @Override public V reload(K key) throws IgniteCheckedException { try { - return super.reload(key, filter); + return super.reload(key); } catch (GridDhtInvalidPartitionException ignored) { return null; @@ -525,7 +522,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param subjId Subject ID. * @param taskName Task name. * @param deserializePortable Deserialize portable flag. - * @param filter Optional filter. * @param expiry Expiry policy. * @return Get future. */ @@ -534,7 +530,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expiry ) { return getAllAsync(keys, @@ -545,8 +540,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap taskName, deserializePortable, false, - expiry, - filter); + expiry); } /** @@ -559,7 +553,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param deserializePortable Deserialize portable flag. - * @param filter Optional filter. * @param expiry Expiry policy. * @return DHT future. */ @@ -572,7 +565,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, - IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expiry) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, @@ -582,7 +574,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap reload, /*tx*/null, topVer, - filter, subjId, taskNameHash, deserializePortable, @@ -614,7 +605,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap req.subjectId(), req.taskNameHash(), false, - req.filter(), expiryPlc); fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 170f85d..1673412 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -77,9 +76,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** Transaction. */ private IgniteTxLocalEx<K, V> tx; - /** Filters. */ - private IgnitePredicate<CacheEntry<K, V>>[] filters; - /** Logger. */ private IgniteLogger log; @@ -114,7 +110,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param reload Reload flag. * @param tx Transaction. * @param topVer Topology version. - * @param filters Filters. * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param deserializePortable Deserialize portable flag. @@ -129,7 +124,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col boolean reload, @Nullable IgniteTxLocalEx<K, V> tx, long topVer, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filters, @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, @@ -145,7 +139,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col this.keys = keys; this.readThrough = readThrough; this.reload = reload; - this.filters = filters; this.tx = tx; this.topVer = topVer; this.subjId = subjId; @@ -353,8 +346,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col fut = cache().reloadAllAsync(keys.keySet(), true, subjId, - taskName, - filters); + taskName); } else { if (tx == null) { @@ -363,15 +355,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col subjId, taskName, deserializePortable, - filters, expiryPlc); } else { fut = tx.getAllAsync(cctx, keys.keySet(), null, - deserializePortable, - filters); + deserializePortable); } } } @@ -390,8 +380,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col return cache().reloadAllAsync(keys.keySet(), true, subjId, - taskName, - filters); + taskName); } else { if (tx == null) { @@ -400,15 +389,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col subjId, taskName, deserializePortable, - filters, expiryPlc); } else { return tx.getAllAsync(cctx, keys.keySet(), null, - deserializePortable, - filters); + deserializePortable); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index f6e69eb..7d16b92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1008,7 +1008,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach CU.subjectId(tx, ctx.shared()), null, tx != null ? tx.resolveTaskName() : null, - CU.<K, V>empty(), null); assert e.lockedBy(mappedVer) || http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index b16d913..80684f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; @@ -78,9 +77,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Version. */ private GridCacheVersion ver; - /** Filters. */ - private IgnitePredicate<CacheEntry<K, V>>[] filters; - /** Logger. */ private IgniteLogger log; @@ -117,7 +113,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param reload Reload flag. * @param forcePrimary If {@code true} then will force network trip to primary node even * if called on backup node. - * @param filters Filters. * @param subjId Subject ID. * @param taskName Task name. * @param deserializePortable Deserialize portable flag. @@ -130,7 +125,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M boolean readThrough, boolean reload, boolean forcePrimary, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filters, @Nullable UUID subjId, String taskName, boolean deserializePortable, @@ -146,7 +140,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M this.readThrough = readThrough; this.reload = reload; this.forcePrimary = forcePrimary; - this.filters = filters; this.subjId = subjId; this.deserializePortable = deserializePortable; this.taskName = taskName; @@ -325,7 +318,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M subjId, taskName == null ? 0 : taskName.hashCode(), deserializePortable, - filters, expiryPlc); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -376,7 +368,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M readThrough, reload, topVer, - filters, subjId, taskName == null ? 0 : taskName.hashCode(), expiryPlc != null ? expiryPlc.forAccess() : -1L); @@ -439,7 +430,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M subjId, null, taskName, - filters, expiryPlc); colocated.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 683b7b9..882dd2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -268,8 +268,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final GridCacheEntryEx<K, V> entry, @Nullable UUID subjId, final String taskName, - final boolean deserializePortable, - @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter + final boolean deserializePortable ) { GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); @@ -284,7 +283,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return getAllAsync0(keys, false, forcePrimary, - filter, subjId0, taskName, deserializePortable, @@ -688,6 +686,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { @@ -838,7 +837,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param keys Keys to remove. * @param reload Reload flag. * @param forcePrimary Force primary flag. - * @param filter Filter. * @param subjId Subject ID. * @param taskName Task name. * @param deserializePortable Deserialize portable flag. @@ -848,7 +846,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload, boolean forcePrimary, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, UUID subjId, String taskName, boolean deserializePortable, @@ -894,7 +891,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, null, taskName, - filter, expiry); // Entry was not in memory or in swap, so we remove it from cache. @@ -962,7 +958,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, reload, forcePrimary, - filter, subjId, taskName, deserializePortable, @@ -1071,8 +1066,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut = createDhtFuture(ver, req, res, completionCb, false); - GridCacheReturn<Object> retVal = null; - boolean replicate = ctx.isDrEnabled(); ExpiryPolicy plc = req.expiry() != null ? req.expiry() : ctx.expiry(); @@ -1080,6 +1073,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (plc != null) expiry = new UpdateExpiryPolicy(plc); + GridCacheReturn<Object> retVal = null; + if (writeThrough() && keys.size() > 1 && !ctx.dr().receiveEnabled()) { // This method can only be used when there are no replicated entries in the batch. UpdateBatchResult<K, V> updRes = updateWithBatch(node, @@ -1291,7 +1286,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), entryProcessor, taskName, - CU.<K, V>empty(), null); CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old); @@ -1422,7 +1416,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), null, taskName, - CU.<K, V>empty(), null); updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated); @@ -1456,7 +1449,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), null, taskName, - CU.<K, V>empty(), null); IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 5873a3f..e09686a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -158,8 +158,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable final GridCacheEntryEx<K, V> entry, @Nullable UUID subjId, String taskName, - final boolean deserializePortable, - @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter + final boolean deserializePortable ) { ctx.denyOnFlag(LOCAL); ctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -172,7 +171,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { @Override public IgniteFuture<Map<K, V>> op(IgniteTxLocalAdapter<K, V> tx) { - return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable, filter)); + return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, entry, deserializePortable)); } }); } @@ -191,7 +190,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, taskName, deserializePortable, - filter, accessExpiryPolicy(prj != null ? prj.expiry() : null)); } @@ -233,7 +231,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param subjId Subject ID. * @param taskName Task name. * @param deserializePortable Deserialize portable flag. - * @param filter Filter. * @param expiryPlc Expiry policy. * @return Loaded values. */ @@ -245,7 +242,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable IgniteCacheExpiryPolicy expiryPlc) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -285,7 +281,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, null, taskName, - filter, expiryPlc); // Entry was not in memory or in swap, so we remove it from cache. @@ -351,7 +346,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte readThrough, reload, forcePrimary, - filter, subjId, taskName, deserializePortable, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index d25225f..19686a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -368,8 +368,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Nullable GridCacheEntryEx<K, V> entry, @Nullable UUID subjId, String taskName, - boolean deserializePortable, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter + boolean deserializePortable ) { ctx.denyOnFlag(LOCAL); ctx.checkSecurity(GridSecurityPermission.CACHE_READ); @@ -385,7 +384,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { keys, false, forcePrimary, - filter, subjId, taskName, deserializePortable, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 689f965..f332c12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -172,13 +172,12 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "RedundantCast"}) @Override public IgniteFuture<Object> readThroughAllAsync(Collection<? extends K> keys, boolean reload, - IgniteTxEx<K, V> tx, IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName, + IgniteTxEx<K, V> tx, @Nullable UUID subjId, String taskName, IgniteBiInClosure<K, V> vis) { return (IgniteFuture)loadAsync(tx, keys, reload, false, - filter, subjId, taskName, true, @@ -186,21 +185,19 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public void reloadAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException { - dht().reloadAll(keys, filter); + @Override public void reloadAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { + dht().reloadAll(keys); - super.reloadAll(keys, filter); + super.reloadAll(keys); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) { + @Override public IgniteFuture<?> reloadAllAsync(@Nullable Collection<? extends K> keys) { GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); - fut.add(super.reloadAllAsync(keys, filter)); - fut.add(dht().reloadAllAsync(keys, filter)); + fut.add(super.reloadAllAsync(keys)); + fut.add(dht().reloadAllAsync(keys)); fut.markInitialized(); @@ -209,18 +206,18 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public V reload(K key, @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) + @Override public V reload(K key) throws IgniteCheckedException { V val; try { - val = dht().reload(key, filter); + val = dht().reload(key); } catch (GridDhtInvalidPartitionException ignored) { return null; } - V nearVal = super.reload(key, filter); + V nearVal = super.reload(key); return val == null ? nearVal : val; } @@ -245,25 +242,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda return fut; } - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public IgniteFuture<?> reloadAllAsync(@Nullable IgnitePredicate<CacheEntry<K, V>> filter) { - GridCompoundFuture fut = new GridCompoundFuture(ctx.kernalContext()); - - fut.add(super.reloadAllAsync()); - fut.add(dht().reloadAllAsync(filter)); - - fut.markInitialized(); - - return fut; - } - /** * @param tx Transaction. * @param keys Keys to load. * @param reload Reload flag. * @param forcePrimary Force primary flag. - * @param filter Filter. * @param subjId Subject ID. * @param taskName Task name. * @param deserializePortable Deserialize portable flag. @@ -274,7 +257,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Nullable Collection<? extends K> keys, boolean reload, boolean forcePrimary, - @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName, boolean deserializePortable, @@ -295,7 +277,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda reload, forcePrimary, txx, - filter, subjId, taskName, deserializePortable, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a03fee32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 8628028..4d4281f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -22,7 +22,6 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.lang.*; @@ -315,12 +314,11 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { /** {@inheritDoc} */ @Override protected V readThrough(IgniteTxEx<K, V> tx, K key, boolean reload, - IgnitePredicate<CacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException { + UUID subjId, String taskName) throws IgniteCheckedException { return cctx.near().loadAsync(tx, F.asList(key), reload, /*force primary*/false, - filter, subjId, taskName, true,