http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index bcf0f0c..eb31100 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -147,11 +147,11 @@ public class GridCacheContext<K, V> implements Externalizable { /** Grid cache. */ private GridCacheAdapter<K, V> cache; - /** No-value filter array. */ - private IgnitePredicate<Cache.Entry<K, V>>[] noValArr; + /** No value filter array. */ + private CacheEntryPredicate[] noValArr0; - /** Has-value filter array. */ - private IgnitePredicate<Cache.Entry<K, V>>[] hasValArr; + /** Has value filter array. */ + private CacheEntryPredicate[] hasValArr0; /** No-peek-value filter array. */ private IgnitePredicate<Cache.Entry<Object, Object>>[] noPeekArr; @@ -159,9 +159,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** Has-peek-value filter array. */ private IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArr; - /** No-op filter array. */ - private IgnitePredicate<Cache.Entry<K, V>>[] trueArr; - /** Cached local rich node. */ private ClusterNode locNode; @@ -290,11 +287,11 @@ public class GridCacheContext<K, V> implements Externalizable { log = ctx.log(getClass()); - noValArr = new IgnitePredicate[]{F.cacheNoGetValue()}; - hasValArr = new IgnitePredicate[]{F.cacheHasGetValue()}; noPeekArr = new IgnitePredicate[]{F.cacheNoPeekValue()}; hasPeekArr = new IgnitePredicate[]{F.cacheHasPeekValue()}; - trueArr = new IgnitePredicate[]{F.alwaysTrue()}; + + noValArr0 = new CacheEntryPredicate[]{new CacheEntryPredicateNoValue()}; + hasValArr0 = new CacheEntryPredicate[]{new CacheEntryPredicateHasValue()}; cacheObjCtx = new CacheObjectContext(ctx); @@ -964,33 +961,34 @@ public class GridCacheContext<K, V> implements Externalizable { public CacheJtaManagerAdapter<K, V> jta() { return jtaMgr; } + /** * @return No get-value filter. */ - public IgnitePredicate<Cache.Entry<K, V>>[] noGetArray() { - return noValArr; + @SuppressWarnings("unchecked") + public <K, V> IgnitePredicate<Cache.Entry<K, V>>[] noPeekArray() { + return (IgnitePredicate<Cache.Entry<K, V>>[])((IgnitePredicate[])noPeekArr); } /** * @return Has get-value filer. */ - public IgnitePredicate<Cache.Entry<K, V>>[] hasGetArray() { - return hasValArr; + public IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArray() { + return hasPeekArr; } /** - * @return No get-value filter. + * @return No value filter. */ - @SuppressWarnings("unchecked") - public <K, V> IgnitePredicate<Cache.Entry<K, V>>[] noPeekArray() { - return (IgnitePredicate<Cache.Entry<K, V>>[])((IgnitePredicate[])noPeekArr); + public CacheEntryPredicate[] noValArray() { + return noValArr0; } /** - * @return Has get-value filer. + * @return Has value filter. */ - public IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArray() { - return hasPeekArr; + public CacheEntryPredicate[] hasValArray() { + return noValArr0; } /** @@ -1005,17 +1003,11 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return Empty filter. - */ - public IgnitePredicate<Cache.Entry<K, V>> truex() { - return F.alwaysTrue(); - } - - /** - * @return No-op array. + * @param val Value to check. + * @return Predicate array that checks for value. */ - public IgnitePredicate<Cache.Entry<K, V>>[] trueArray() { - return trueArr; + public CacheEntryPredicate[] equalsValArray(V val) { + return new CacheEntryPredicate[]{new CacheEntryPredicateContainsValue(toCacheObject(val))}; } /** @@ -1098,6 +1090,29 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @param e Entry. + * @param p Predicates. + * @return {@code True} if predicates passed. + * @throws IgniteCheckedException If failed. + */ + public boolean isAll(GridCacheEntryEx e, CacheEntryPredicate[] p) throws IgniteCheckedException { + if (p == null || p.length == 0) + return true; + + try { + for (CacheEntryPredicate p0 : p) { + if (p0 != null && !p0.apply(e)) + return false; + } + } + catch (RuntimeException ex) { + throw U.cast(ex); + } + + return true; + } + + /** * Forces LOCAL flag. * * @return Previously forced flags.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index f04ef90..b365b45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -240,61 +240,16 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cacheCtx) { GridCacheAdapter<K, V> cache = cacheCtx.cache(); - Set<K> keySet = cache.keySet(cacheCtx.vararg( - new P1<Cache.Entry<K, V>>() { - @Override public boolean apply(Cache.Entry<K, V> e) { - return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) : - undeploy(e, cacheCtx.cache()); - } - - /** - * @param e Entry. - * @param cache Cache. - * @return {@code True} if entry should be undeployed. - */ - private boolean undeploy(Cache.Entry<K, V> e, GridCacheAdapter<K, V> cache) { - // TODO IGNITE-51. - K k = e.getKey(); - - GridCacheEntryEx entry = cache.peekEx(cacheCtx.toCacheKeyObject(e.getKey())); - - if (entry == null) - return false; - - CacheObject v; - - try { - v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); - } - catch (GridCacheEntryRemovedException ignore) { - return false; - } - catch (IgniteException ignore) { - // Peek can throw runtime exception if unmarshalling failed. - return true; - } + Collection<KeyCacheObject> keys = new ArrayList<>(); - assert k != null : "Key cannot be null for cache entry: " + e; + for (GridCacheEntryEx e : cache.entries()) { + boolean undeploy = cacheCtx.isNear() ? + undeploy(ldr, e, cacheCtx.near()) || undeploy(ldr, e, cacheCtx.near().dht()) : + undeploy(ldr, e, cacheCtx.cache()); - ClassLoader keyLdr = U.detectObjectClassLoader(k); - ClassLoader valLdr = U.detectObjectClassLoader(v); - - boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr); - - if (log.isDebugEnabled()) - log.debug("Finished examining entry [entryCls=" + e.getClass() + - ", key=" + k + ", keyCls=" + k.getClass() + - ", valCls=" + (v != null ? v.getClass() : "null") + - ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']'); - - return res; - } - })); - - Collection<K> keys = new ArrayList<>(); - - for (K k : keySet) - keys.add(k); + if (undeploy) + keys.add(e.key()); + } if (log.isDebugEnabled()) log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']'); @@ -333,6 +288,52 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param ldr Class loader. + * @param e Entry. + * @param cache Cache. + * @return {@code True} if need to undeploy. + */ + private boolean undeploy(ClassLoader ldr, GridCacheEntryEx e, GridCacheAdapter cache) { + KeyCacheObject key = e.key(); + + GridCacheEntryEx entry = cache.peekEx(key); + + if (entry == null) + return false; + + CacheObject v; + + try { + v = entry.peek(GridCachePeekMode.GLOBAL, CU.empty0()); + } + catch (GridCacheEntryRemovedException ignore) { + return false; + } + catch (IgniteException ignore) { + // Peek can throw runtime exception if unmarshalling failed. + return true; + } + + assert key != null : "Key cannot be null for cache entry: " + e; + + Object key0 = key.value(cache.context(), false); + Object val0 = CU.value(v, cache.context(), false); + + ClassLoader keyLdr = U.detectObjectClassLoader(key0); + ClassLoader valLdr = U.detectObjectClassLoader(val0); + + boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr); + + if (log.isDebugEnabled()) + log.debug("Finished examining entry [entryCls=" + e.getClass() + + ", key=" + key0 + ", keyCls=" + key0.getClass() + + ", valCls=" + (val0 != null ? val0.getClass() : "null") + + ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']'); + + return res; + } + + /** * @param sndId Sender node ID. * @param ldrId Loader ID. * @param userVer User version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 5196965..5fb1346 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -212,7 +212,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If swap could not be released. * @throws GridCacheEntryRemovedException If entry was removed. */ - public <K, V> boolean invalidate(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + public <K, V> boolean invalidate(@Nullable CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException, IgniteCheckedException; /** @@ -223,7 +223,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If operation failed. * @return {@code true} if entry was not being used and could be removed. */ - public <K, V> boolean compact(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + public <K, V> boolean compact(@Nullable CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException, IgniteCheckedException; /** @@ -234,7 +234,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException In case of error. */ public <K, V> boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException; /** * Evicts entry when batch evict is performed. When called, does not write entry data to swap, but instead @@ -360,7 +360,7 @@ public interface GridCacheEntryEx { boolean evt, boolean metrics, long topVer, - IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + CacheEntryPredicate[] filter, GridDrType drType, long drExpireTime, @Nullable GridCacheVersion explicitVer, @@ -396,7 +396,7 @@ public interface GridCacheEntryEx { boolean evt, boolean metrics, long topVer, - IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + CacheEntryPredicate[] filter, GridDrType drType, @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, @@ -449,7 +449,7 @@ public interface GridCacheEntryEx { boolean metrics, boolean primary, boolean checkVer, - @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + @Nullable CacheEntryPredicate[] filter, GridDrType drType, long conflictTtl, long conflictExpireTime, @@ -490,7 +490,7 @@ public interface GridCacheEntryEx { @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, - @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + @Nullable CacheEntryPredicate[] filter, boolean intercept, @Nullable UUID subjId, String taskName @@ -507,7 +507,7 @@ public interface GridCacheEntryEx { * @return {@code True} if entry was not being used, passed the filter and could be removed. */ public <K, V> boolean clear(GridCacheVersion ver, boolean readers, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException; /** * This locks is called by transaction manager during prepare step @@ -578,7 +578,7 @@ public interface GridCacheEntryEx { * @return Value. * @throws GridCacheEntryRemovedException If entry has been removed. */ - @Nullable public <K, V> CacheObject peek(GridCachePeekMode mode, IgnitePredicate<Cache.Entry<K, V>>... filter) + @Nullable public <K, V> CacheObject peek(GridCachePeekMode mode, CacheEntryPredicate... filter) throws GridCacheEntryRemovedException; /** @@ -609,7 +609,7 @@ public interface GridCacheEntryEx { * @throws GridCacheEntryRemovedException If entry has been removed. */ @Nullable public <K, V> CacheObject peek(Collection<GridCachePeekMode> modes, - IgnitePredicate<Cache.Entry<K, V>>... filter) throws GridCacheEntryRemovedException; + CacheEntryPredicate... filter) throws GridCacheEntryRemovedException; /** * @param failFast Fail-fast flag. @@ -624,7 +624,7 @@ public interface GridCacheEntryEx { @SuppressWarnings({"RedundantTypeArguments"}) @Nullable public <K, V> GridTuple<CacheObject> peek0(boolean failFast, GridCachePeekMode mode, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter, + @Nullable CacheEntryPredicate[] filter, @Nullable IgniteInternalTx tx) throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index e34cb66..5a40751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -654,7 +654,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V GridCacheAdapter<K, V> cache, GridCacheEntryEx entry, GridCacheVersion obsoleteVer, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter, + @Nullable CacheEntryPredicate[] filter, boolean explicit ) throws IgniteCheckedException { assert cache != null; @@ -855,7 +855,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * @throws IgniteCheckedException In case of error. */ public boolean evict(@Nullable GridCacheEntryEx entry, @Nullable GridCacheVersion obsoleteVer, - boolean explicit, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + boolean explicit, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { if (entry == null) return true; @@ -995,7 +995,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * @param filter Filter. * @throws GridCacheEntryRemovedException If entry got removed. */ - private void enqueue(GridCacheEntryEx entry, IgnitePredicate<Cache.Entry<K, V>>[] filter) + private void enqueue(GridCacheEntryEx entry, CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException { Node<EvictionInfo> node = entry.meta(meta); @@ -1240,16 +1240,21 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * @param info Eviction info. * @return Version aware filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] versionFilter(final EvictionInfo info) { + private CacheEntryPredicate[] versionFilter(final EvictionInfo info) { // If version has changed since we started the whole process // then we should not evict entry. - return cctx.vararg(new P1<Cache.Entry<K, V>>() { - @Override public boolean apply(Cache.Entry<K, V> e) { - GridCacheVersion ver = (GridCacheVersion)((CacheVersionedEntryImpl)e).version(); + return new CacheEntryPredicate[]{new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + try { + GridCacheVersion ver = e.version(); - return info.version().equals(ver) && F.isAll(info.filter()); + return info.version().equals(ver) && F.isAll(info.filter()); + } + catch (GridCacheEntryRemovedException err) { + return false; + } } - }); + }}; } /** @@ -1465,7 +1470,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V private GridCacheVersion ver; /** Filter to pass before entry will be evicted. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** * @param entry Entry. @@ -1473,7 +1478,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V * @param filter Filter. */ EvictionInfo(GridCacheEntryEx entry, GridCacheVersion ver, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { assert entry != null; assert ver != null; @@ -1499,7 +1504,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V /** * @return Filter. */ - IgnitePredicate<Cache.Entry<K, V>>[] filter() { + CacheEntryPredicate[] filter() { return filter; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java index df1bc8e..ef6e2cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java @@ -69,9 +69,7 @@ public class GridCacheKeySet<K, V> extends GridSerializableSet<K> { /** {@inheritDoc} */ @Override public void clear() { - ctx.cache().clearLocally0(F.viewReadOnly(map.values(), F.<K, V>cacheEntry2Key(), filter), CU.<K, V>empty()); - - map.clear(); + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 6dd4cae..518df69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -983,7 +983,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean evt, boolean metrics, long topVer, - IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + CacheEntryPredicate[] filter, GridDrType drType, long drExpireTime, @Nullable GridCacheVersion explicitVer, @@ -1133,7 +1133,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean evt, boolean metrics, long topVer, - IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + CacheEntryPredicate[] filter, GridDrType drType, @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, @@ -1324,7 +1324,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, - @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + @Nullable CacheEntryPredicate[] filter, boolean intercept, @Nullable UUID subjId, String taskName @@ -1391,16 +1391,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Apply metrics. if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { // PutIfAbsent methods mustn't update hit/miss statistics - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray()) + if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noValArray()) cctx.cache().metrics0().onRead(old != null); } // Check filter inside of synchronization. if (!F.isEmpty(filter)) { - boolean pass = cctx.isAll(wrapFilterLocked(), filter); + boolean pass = cctx.isAll(this, filter); if (!pass) { - if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked()) + if (expiryPlc != null && !readThrough && filter != cctx.noValArray() && hasValueUnlocked()) updateTtl(expiryPlc); return new T3<>(false, retval ? CU.value(old, cctx, false) : null, null); @@ -1617,7 +1617,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean metrics, boolean primary, boolean verCheck, - @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + @Nullable CacheEntryPredicate[] filter, GridDrType drType, long explicitTtl, long explicitExpireTime, @@ -1834,16 +1834,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Apply metrics. if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { // PutIfAbsent methods mustn't update hit/miss statistics - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray()) + if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noValArray()) cctx.cache().metrics0().onRead(oldVal != null); } // Check filter inside of synchronization. if (!F.isEmptyOrNulls(filter)) { - boolean pass = cctx.isAll(wrapFilterLocked(), filter); + boolean pass = cctx.isAll(this, filter); if (!pass) { - if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray()) + if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noValArray()) updateTtl(expiryPlc); return new GridCacheUpdateAtomicResult(false, @@ -2314,7 +2314,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Override public <K, V> boolean clear(GridCacheVersion ver, boolean readers, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { cctx.denyOnFlag(READ); boolean ret; @@ -2559,7 +2559,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** {@inheritDoc} */ - @Override public <K, V> boolean invalidate(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public <K, V> boolean invalidate(@Nullable CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException, IgniteCheckedException { if (F.isEmptyOrNulls(filter)) { synchronized (this) { @@ -2599,7 +2599,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** {@inheritDoc} */ - @Override public <K, V> boolean compact(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public <K, V> boolean compact(@Nullable CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException, IgniteCheckedException { // For optimistic checking. GridCacheVersion startVer; @@ -2799,7 +2799,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Nullable @Override public <K, V> CacheObject peek(GridCachePeekMode mode, - IgnitePredicate<Cache.Entry<K, V>>[] filter) + CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException { try { GridTuple<CacheObject> peek = peek0(false, mode, filter, cctx.tm().localTxx()); @@ -2851,7 +2851,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Override public <K, V> CacheObject peek(Collection<GridCachePeekMode> modes, - IgnitePredicate<Cache.Entry<K, V>>[] filter) + CacheEntryPredicate[] filter) throws GridCacheEntryRemovedException { assert modes != null; @@ -2887,7 +2887,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { */ @SuppressWarnings({"RedundantTypeArguments"}) @Nullable @Override public <K, V> GridTuple<CacheObject> peek0(boolean failFast, GridCachePeekMode mode, - IgnitePredicate<Cache.Entry<K, V>>[] filter, @Nullable IgniteInternalTx tx) + CacheEntryPredicate[] filter, @Nullable IgniteInternalTx tx) throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException { assert tx == null || tx.local(); @@ -2999,7 +2999,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @throws IgniteCheckedException If unexpected cache failure occurred. */ @Nullable private <K, V> GridTuple<CacheObject> peekTxThenGlobal(boolean failFast, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, IgniteInternalTx tx) throws GridCacheFilterFailedException, GridCacheEntryRemovedException, IgniteCheckedException { @@ -3022,7 +3022,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @throws GridCacheFilterFailedException If filter failed. */ @Nullable private <K, V> GridTuple<CacheObject> peekTx(boolean failFast, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, @Nullable IgniteInternalTx tx) throws GridCacheFilterFailedException { return tx == null ? null : tx.peek(cctx, failFast, key, filter); } @@ -3040,7 +3040,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { @SuppressWarnings({"RedundantTypeArguments"}) @Nullable private <K, V> GridTuple<CacheObject> peekGlobal(boolean failFast, long topVer, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, @Nullable IgniteCacheExpiryPolicy expiryPlc ) throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException { @@ -3070,7 +3070,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { updateTtl(expiryPlc); } - if (!cctx.isAll(this.<K, V>wrap(), filter)) + if (!cctx.isAll(this, filter)) return F.t(CU.<CacheObject>failed(failFast)); if (F.isEmptyOrNulls(filter) || ver.equals(version())) @@ -3095,11 +3095,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { */ @SuppressWarnings({"unchecked"}) @Nullable private <K, V> GridTuple<CacheObject> peekSwap(boolean failFast, - IgnitePredicate<Cache.Entry<K, V>>[] filter) + CacheEntryPredicate[] filter) throws IgniteCheckedException, GridCacheFilterFailedException { - if (!cctx.isAll(this.<K, V>wrap(), filter)) - return F.t((CacheObject)CU.failed(failFast)); + if (!cctx.isAll(this, filter)) + return F.t(CU.failed(failFast)); synchronized (this) { if (checkExpired()) @@ -3119,9 +3119,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @throws GridCacheFilterFailedException If filter failed. */ @SuppressWarnings({"unchecked"}) - @Nullable private <K, V> CacheObject peekDb(boolean failFast, IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Nullable private <K, V> CacheObject peekDb(boolean failFast, CacheEntryPredicate[] filter) throws IgniteCheckedException, GridCacheFilterFailedException { - if (!cctx.isAll(this.<K, V>wrap(), filter)) + if (!cctx.isAll(this, filter)) return CU.failed(failFast); synchronized (this) { @@ -3812,7 +3812,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Override public <K, V> boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { boolean marked = false; try { @@ -3853,7 +3853,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { v = ver; } - if (!cctx.isAll(/*version needed for sync evicts*/this.<K, V>wrapVersioned(), filter)) + if (!cctx.isAll(/*version needed for sync evicts*/this, filter)) return false; synchronized (this) { @@ -3949,10 +3949,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @param filter Entry filter. * @return {@code True} if entry is visitable. */ - public <K, V> boolean visitable(IgnitePredicate<Cache.Entry<K, V>>[] filter) { + public <K, V> boolean visitable(CacheEntryPredicate[] filter) { try { - if (obsoleteOrDeleted() || (filter != CU.<K, V>empty() && - !cctx.isAll(this.<K, V> wrapLazyValue(), filter))) + if (obsoleteOrDeleted() || (filter != CU.empty0() && + !cctx.isAll(this, filter))) return false; } catch (IgniteCheckedException e) { @@ -4416,7 +4416,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return null; try { - return CU.value(e.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()), cctx, false); + return CU.value(e.peek(GridCachePeekMode.GLOBAL, CU.empty0()), cctx, false); } catch (GridCacheEntryRemovedException ignored) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 2810e20..0e72011 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -346,10 +346,16 @@ public abstract class GridCacheMessage implements Message { for (IgniteTxEntry e : txEntries) { e.marshal(ctx, transferExpiry); + if (e.filters() != null) { + GridCacheContext cctx = ctx.cacheContext(e.cacheId()); + + for (CacheEntryPredicate p : e.filters()) + p.prepareMarshal(cctx); + } + if (ctx.deploymentEnabled()) { prepareObject(e.key(), ctx); prepareObject(e.value(), ctx); - prepareFilter(e.filters(), ctx); } } } @@ -376,8 +382,16 @@ public abstract class GridCacheMessage implements Message { assert ctx != null; if (txEntries != null) { - for (IgniteTxEntry e : txEntries) + for (IgniteTxEntry e : txEntries) { e.unmarshal(ctx, near, ldr); + + if (e.filters() != null) { + GridCacheContext cctx = ctx.cacheContext(e.cacheId()); + + for (CacheEntryPredicate p : e.filters()) + p.finishUnmarshal(cctx, ldr); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java index fe72bef..08a3fb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java @@ -49,7 +49,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * * @return Filter on which this projection is based on. */ - @Nullable public IgnitePredicate<Cache.Entry<K, V>> predicate(); + @Nullable public CacheEntryPredicate predicate(); /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -63,7 +63,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If failed. */ @Nullable public V put(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -76,7 +76,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @return Put operation future. */ public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -90,7 +90,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If failed. */ public boolean putx(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -103,7 +103,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @return Putx operation future. */ public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Store DR data. @@ -134,7 +134,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If failed. */ @Nullable public V remove(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -145,7 +145,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @return Put operation future. */ public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Removes DR data. @@ -176,7 +176,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If failed. */ public boolean removex(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -187,7 +187,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @return Putx operation future. */ public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Asynchronously stores given key-value pair in cache only if only if the previous value is equal to the @@ -293,7 +293,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @throws IgniteCheckedException If failed. */ @Nullable public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Gets value from cache. Will go to primary node even if this is a backup. @@ -372,7 +372,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @param filter Filter. * @return Entry set. */ - public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter); + public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter); /** * Gets set of primary entries containing internal entries. @@ -380,7 +380,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> { * @param filter Optional filter. * @return Primary entry set. */ - public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter); + public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter); /** * @return {@link ExpiryPolicy} associated with this projection. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 3e8930b..7a27cff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -51,21 +51,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** */ private static final long serialVersionUID = 0L; - /** Key-value filter taking null values. */ + /** Entry filter. */ @GridToStringExclude - private KeyValueFilter<K, V> withNullKvFilter; - - /** Key-value filter not allowing null values. */ - @GridToStringExclude - private KeyValueFilter<K, V> noNullKvFilter; - - /** Entry filter built with {@link #withNullKvFilter}. */ - @GridToStringExclude - private FullFilter<K, V> withNullEntryFilter; - - /** Entry filter built with {@link #noNullKvFilter}. */ - @GridToStringExclude - private FullFilter<K, V> noNullEntryFilter; + private CacheEntryPredicate filter; /** Base cache. */ private GridCacheAdapter<K, V> cache; @@ -99,7 +87,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** * @param parent Parent projection. * @param cctx Cache context. - * @param kvFilter Key-value filter. * @param entryFilter Entry filter. * @param flags Flags for new projection */ @@ -107,8 +94,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V public GridCacheProjectionImpl( CacheProjection<K, V> parent, GridCacheContext<K, V> cctx, - @Nullable IgniteBiPredicate<K, V> kvFilter, - @Nullable IgnitePredicate<? super Cache.Entry<K, V>> entryFilter, + @Nullable CacheEntryPredicate entryFilter, @Nullable Set<CacheFlag> flags, @Nullable UUID subjId, boolean keepPortable, @@ -127,13 +113,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V this.flags = Collections.unmodifiableSet(f); - withNullKvFilter = new KeyValueFilter<>(kvFilter, false); - - noNullKvFilter = new KeyValueFilter<>(kvFilter, true); - - withNullEntryFilter = new FullFilter<>(withNullKvFilter, entryFilter); - - noNullEntryFilter = new FullFilter<>(noNullKvFilter, entryFilter); + this.filter = entryFilter; this.subjId = subjId; @@ -156,22 +136,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** - * @param noNulls Flag indicating whether filter should accept nulls or not. - * @return Entry filter for the flag. - */ - IgnitePredicate<Cache.Entry<K, V>> entryFilter(boolean noNulls) { - return noNulls ? noNullEntryFilter : withNullEntryFilter; - } - - /** - * @param noNulls Flag indicating whether filter should accept nulls or not. - * @return Key-value filter for the flag. - */ - IgniteBiPredicate<K, V> kvFilter(boolean noNulls) { - return noNulls ? noNullKvFilter : withNullKvFilter; - } - - /** * @return Keep portable flag. */ public boolean isKeepPortable() { @@ -189,102 +153,43 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V * {@code Ands} passed in filter with projection filter. * * @param filter filter to {@code and}. - * @param noNulls Flag indicating whether filter should accept nulls or not. * @return {@code Anded} filter array. */ - IgnitePredicate<Cache.Entry<K, V>> and( - IgnitePredicate<Cache.Entry<K, V>> filter, boolean noNulls) { - IgnitePredicate<Cache.Entry<K, V>> entryFilter = entryFilter(noNulls); + CacheEntryPredicate and(CacheEntryPredicate filter) { + CacheEntryPredicate entryFilter = this.filter; if (filter == null) return entryFilter; - return F0.and(entryFilter, filter); - } - - /** - * {@code Ands} passed in filter with projection filter. - * - * @param filter filter to {@code and}. - * @param noNulls Flag indicating whether filter should accept nulls or not. - * @return {@code Anded} filter array. - */ - @SuppressWarnings({"unchecked"}) - IgniteBiPredicate<K, V> and(final IgniteBiPredicate<K, V> filter, boolean noNulls) { - final IgniteBiPredicate<K, V> kvFilter = kvFilter(noNulls); - - if (filter == null) - return kvFilter; - - return new P2<K, V>() { - @Override public boolean apply(K k, V v) { - return F.isAll2(k, v, kvFilter) && filter.apply(k, v); - } - }; - } - - /** - * {@code Ands} passed in filter with projection filter. - * - * @param filter filter to {@code and}. - * @param noNulls Flag indicating whether filter should accept nulls or not. - * @return {@code Anded} filter array. - */ - @SuppressWarnings({"unchecked"}) - IgniteBiPredicate<K, V> and(final IgniteBiPredicate<K, V>[] filter, boolean noNulls) { - final IgniteBiPredicate<K, V> kvFilter = kvFilter(noNulls); - - if (filter == null) - return kvFilter; - - return new P2<K, V>() { - @Override public boolean apply(K k, V v) { - return F.isAll2(k, v, kvFilter) && F.isAll2(k, v, filter); - } - }; + return F0.and0(entryFilter, filter); } /** * {@code Ands} two passed in filters. * * @param f1 First filter. - * @param nonNulls Flag indicating whether nulls should be included. * @return {@code Anded} filter. */ - private IgnitePredicate<Cache.Entry<K, V>> and(@Nullable final IgnitePredicate<Cache.Entry<K, V>>[] f1, - boolean nonNulls) { - IgnitePredicate<Cache.Entry<K, V>> entryFilter = entryFilter(nonNulls); + private CacheEntryPredicate and(@Nullable final CacheEntryPredicate[] f1) { + CacheEntryPredicate entryFilter = filter; if (F.isEmpty(f1)) return entryFilter; - return F0.and(entryFilter, f1); - } - - /** - * @param e Entry to verify. - * @param noNulls Flag indicating whether filter should accept nulls or not. - * @return {@code True} if filter passed. - */ - boolean isAll(Cache.Entry<K, V> e, boolean noNulls) { - CacheFlag[] f = cctx.forceLocalRead(); - - try { - return F.isAll(e, entryFilter(noNulls)); - } - finally { - cctx.forceFlags(f); - } + return F0.and0(entryFilter, f1); } /** * @param k Key. * @param v Value. - * @param noNulls Flag indicating whether filter should accept nulls or not. * @return {@code True} if filter passed. */ - boolean isAll(K k, V v, boolean noNulls) { - IgniteBiPredicate<K, V> p = kvFilter(noNulls); + boolean isAll(K k, V v) { + if (k == null || v == null) + return false; + + // TODO IGNITE-51. + IgniteBiPredicate<K, V> p = null; if (p != null) { CacheFlag[] f = cctx.forceLocalRead(); @@ -303,10 +208,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** * @param map Map. - * @param noNulls Flag indicating whether filter should accept nulls or not. * @return {@code True} if filter passed. */ - Map<? extends K, ? extends V> isAll(Map<? extends K, ? extends V> map, boolean noNulls) { + Map<? extends K, ? extends V> isAll(Map<? extends K, ? extends V> map) { if (F.isEmpty(map)) return Collections.<K, V>emptyMap(); @@ -317,7 +221,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V K k = e.getKey(); V v = e.getValue(); - if (!isAll(k, v, noNulls)) { + if (!isAll(k, v)) { failed = true; break; @@ -333,44 +237,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V K k = e.getKey(); V v = e.getValue(); - if (isAll(k, v, noNulls)) + if (isAll(k, v)) cp.put(k, v); } return cp; } - /** - * Entry projection-filter-aware visitor. - * - * @param vis Visitor. - * @return Projection-filter-aware visitor. - */ - private IgniteInClosure<Cache.Entry<K, V>> visitor(final IgniteInClosure<Cache.Entry<K, V>> vis) { - return new CI1<Cache.Entry<K, V>>() { - @Override public void apply(Cache.Entry<K, V> e) { - if (isAll(e, true)) - vis.apply(e); - } - }; - } - - /** - * Entry projection-filter-aware visitor. - * - * @param vis Visitor. - * @return Projection-filter-aware visitor. - */ - private IgnitePredicate<Cache.Entry<K, V>> visitor(final IgnitePredicate<Cache.Entry<K, V>> vis) { - return new P1<Cache.Entry<K, V>>() { - @Override public boolean apply(Cache.Entry<K, V> e) { - // If projection filter didn't pass, go to the next element. - // Otherwise, delegate to the visitor. - return !isAll(e, true) || vis.apply(e); - } - }; - } - /** {@inheritDoc} */ @SuppressWarnings( {"unchecked", "RedundantCast"}) @Override public <K1, V1> GridCache<K1, V1> cache() { @@ -388,8 +261,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, - noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, + filter, flags, subjId, keepPortable, @@ -427,8 +299,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>( (CacheProjection<K1, V1>)this, (GridCacheContext<K1, V1>)cctx, - CU.<K1, V1>typeFilter(keyType, valType), - (IgnitePredicate<Cache.Entry>)noNullEntryFilter.entryFilter, + CU.typeFilter0(keyType, valType), flags, subjId, keepPortable, @@ -438,44 +309,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public CacheProjection<K, V> projection(IgniteBiPredicate<K, V> p) { - if (p == null) - return new GridCacheProxyImpl<>(cctx, this, this); - - IgniteBiPredicate<K, V> kvFilter = p; - - if (noNullKvFilter.kvFilter != null) - kvFilter = and(p, true); - - if (cctx.deploymentEnabled()) { - try { - cctx.deploy().registerClasses(p); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, - cctx, - kvFilter, - noNullEntryFilter.entryFilter, - flags, - subjId, - keepPortable, - expiryPlc); - - return new GridCacheProxyImpl<>(cctx, prj, prj); - } - - /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) - @Override public CacheProjection<K, V> projection(IgnitePredicate<Cache.Entry<K, V>> filter) { + @Override public CacheProjection<K, V> projection(CacheEntryPredicate filter) { if (filter == null) return new GridCacheProxyImpl<>(cctx, this, this); - if (noNullEntryFilter.entryFilter != null) - filter = and(filter, true); + if (this.filter != null) + filter = and(filter); if (cctx.deploymentEnabled()) { try { @@ -488,7 +328,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, - noNullKvFilter.kvFilter, filter, flags, subjId, @@ -513,8 +352,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, - noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, + filter, res, subjId, keepPortable, @@ -537,8 +375,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, - noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, + filter, res, subjId, keepPortable, @@ -553,8 +390,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>( (CacheProjection<K1, V1>)this, (GridCacheContext<K1, V1>)cctx, - (IgniteBiPredicate<K1, V1>)noNullKvFilter.kvFilter, - (IgnitePredicate<Cache.Entry>)noNullEntryFilter.entryFilter, + filter, flags, subjId, true, @@ -602,7 +438,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public int nearSize() { return cctx.config().getCacheMode() == PARTITIONED && isNearEnabled(cctx) ? - cctx.near().nearKeySet(entryFilter(true)).size() : 0; + cctx.near().nearKeySet(filter).size() : 0; } /** {@inheritDoc} */ @@ -641,16 +477,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public void forEach(IgniteInClosure<Cache.Entry<K, V>> vis) { - cache.forEach(visitor(vis)); - } - - /** {@inheritDoc} */ - @Override public boolean forAll(IgnitePredicate<Cache.Entry<K, V>> vis) { - return cache.forAll(visitor(vis)); - } - - /** {@inheritDoc} */ @Override public V reload(K key) throws IgniteCheckedException { return cache.reload(key); } @@ -687,8 +513,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { - return cache.get(key, entry, deserializePortable, and(filter, false)); + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { + return cache.get(key, entry, deserializePortable, and(filter)); } /** {@inheritDoc} */ @@ -752,44 +578,44 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public V put(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public V put(K key, V val, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return putAsync(key, val, filter).get(); } /** {@inheritDoc} */ @Override public V put(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return cache.put(key, val, entry, ttl, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putAsync(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { return putAsync(key, val, null, -1, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { A.notNull(key, "key", val, "val"); // Check k-v predicate first. - if (!isAll(key, val, true)) + if (!isAll(key, val)) return new GridFinishedFuture<>(cctx.kernalContext()); - return cache.putAsync(key, val, entry, ttl, and(filter, false)); + return cache.putAsync(key, val, entry, ttl, and(filter)); } /** {@inheritDoc} */ @Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return cache.putx(key, val, entry, ttl, filter); } /** {@inheritDoc} */ @Override public boolean putx(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return putxAsync(key, val, filter).get(); } @@ -847,20 +673,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { return putxAsync(key, val, null, -1, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx entry, - long ttl, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + long ttl, @Nullable CacheEntryPredicate[] filter) { A.notNull(key, "key", val, "val"); // Check k-v predicate first. - if (!isAll(key, val, true)) + if (!isAll(key, val)) return new GridFinishedFuture<>(cctx.kernalContext(), false); - return cache.putxAsync(key, val, entry, ttl, and(filter, false)); + return cache.putxAsync(key, val, entry, ttl, and(filter)); } /** {@inheritDoc} */ @@ -870,7 +696,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putIfAbsentAsync(K key, V val) { - return putAsync(key, val, cctx.<K, V>noPeekArray()); + return putAsync(key, val, cctx.noValArray()); } /** {@inheritDoc} */ @@ -880,7 +706,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> putxIfAbsentAsync(K key, V val) { - return putxAsync(key, val, cctx.<K, V>noPeekArray()); + return putxAsync(key, val, cctx.noValArray()); } /** {@inheritDoc} */ @@ -890,7 +716,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> replaceAsync(K key, V val) { - return putAsync(key, val, cctx.hasPeekArray()); + return putAsync(key, val, cctx.hasValArray()); } /** {@inheritDoc} */ @@ -900,7 +726,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> replacexAsync(K key, V val) { - return putxAsync(key, val, cctx.hasPeekArray()); + return putxAsync(key, val, cctx.hasValArray()); } /** {@inheritDoc} */ @@ -910,66 +736,66 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { - IgnitePredicate<Cache.Entry<K, V>> fltr = and(F.<K, V>cacheContainsPeek(oldVal), false); + CacheEntryPredicate fltr = and(cctx.equalsValArray(oldVal)); return cache.putxAsync(key, newVal, fltr); } /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { putAllAsync(m, filter).get(); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { - m = isAll(m, true); + @Nullable CacheEntryPredicate[] filter) { + m = isAll(m); if (F.isEmpty(m)) return new GridFinishedFuture<>(cctx.kernalContext()); - return cache.putAllAsync(m, and(filter, false)); + return cache.putAllAsync(m, and(filter)); } /** {@inheritDoc} */ @Override public Set<K> keySet() { - return cache.keySet(entryFilter(true)); + return cache.keySet(filter); } /** {@inheritDoc} */ - @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<K> keySet(@Nullable CacheEntryPredicate... filter) { return cache.keySet(filter); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return cache.primaryKeySet(entryFilter(true)); + return cache.primaryKeySet(filter); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return cache.values(entryFilter(true)); + return cache.values(filter); } /** {@inheritDoc} */ @Override public Collection<V> primaryValues() { - return cache.primaryValues(entryFilter(true)); + return cache.primaryValues(filter); } /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return cache.entrySet(entryFilter(true)); + return cache.entrySet(filter); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) { - return cache.entrySetx(F.and(filter, entryFilter(true))); + @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { + return cache.entrySetx(F0.and0(filter, this.filter)); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) { - return cache.primaryEntrySetx(F.and(filter, entryFilter(true))); + @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter) { + return cache.primaryEntrySetx(F0.and0(filter, this.filter)); } /** {@inheritDoc} */ @@ -980,7 +806,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> primaryEntrySet() { - return cache.primaryEntrySet(entryFilter(true)); + return cache.primaryEntrySet(filter); } /** {@inheritDoc} */ @@ -1000,8 +826,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgnitePredicate<Cache.Entry<K, V>> predicate() { - return withNullEntryFilter.hasFilter() ? withNullEntryFilter : null; + @Override public CacheEntryPredicate predicate() { + return filter; } /** {@inheritDoc} */ @@ -1016,7 +842,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public V peek(K key) { - return cache.peek(key, entryFilter(true)); + return cache.peek(key, filter); } /** {@inheritDoc} */ @@ -1037,14 +863,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V @Override public V peek(K key, @Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException { V val = cache.peek(key, modes); - return isAll(key, val, true) ? val : null; + return isAll(key, val) ? val : null; } /** {@inheritDoc} */ @Nullable @Override public Cache.Entry<K, V> entry(K key) { V val = peek(key); - if (!isAll(key, val, false)) + if (!isAll(key, val)) return null; return cache.entry(key); @@ -1053,7 +879,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public boolean evict(K key) { if (predicate() != null) - return cache.evict(key, entryFilter(true)); + return cache.evict(key, filter); else return cache.evict(key); } @@ -1061,7 +887,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public void evictAll(@Nullable Collection<? extends K> keys) { if (predicate() != null) - cache.evictAll(keys, entryFilter(true)); + cache.evictAll(keys, filter); else cache.evictAll(keys); } @@ -1093,12 +919,12 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public boolean clearLocally(K key) { - return cache.clearLocally0(key, entryFilter(true)); + return cache.clearLocally0(key, filter); } /** {@inheritDoc} */ @Override public boolean compact(K key) throws IgniteCheckedException { - return cache.compact(key, entryFilter(false)); + return cache.compact(key, filter); } /** {@inheritDoc} */ @@ -1108,30 +934,30 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public V remove(K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return removeAsync(key, filter).get(); } /** {@inheritDoc} */ @Override public V remove(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return removeAsync(key, entry, filter).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> removeAsync(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Override public IgniteInternalFuture<V> removeAsync(K key, CacheEntryPredicate[] filter) { return removeAsync(key, null, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { - return cache.removeAsync(key, entry, and(filter, false)); + @Nullable CacheEntryPredicate... filter) { + return cache.removeAsync(key, entry, and(filter)); } /** {@inheritDoc} */ @Override public boolean removex(K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return removexAsync(key, filter).get(); } @@ -1147,20 +973,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public boolean removex(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return removexAsync(key, entry, filter).get(); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removexAsync(K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { return removexAsync(key, null, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { - return cache.removexAsync(key, entry, and(filter, false)); + @Nullable CacheEntryPredicate... filter) { + return cache.removexAsync(key, entry, and(filter)); } /** {@inheritDoc} */ @@ -1168,7 +994,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); // Check k-v predicate first. - if (!isAll(key, newVal, true)) + if (!isAll(key, newVal)) return new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<V>(false)); return cache.replacexAsync(key, oldVal, newVal); @@ -1186,7 +1012,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { - return !isAll(key, val, true) ? new GridFinishedFuture<>(cctx.kernalContext(), + return !isAll(key, val) ? new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<V>(false)) : cache.removexAsync(key, val); } @@ -1197,20 +1023,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) { - return !isAll(key, val, true) ? new GridFinishedFuture<>(cctx.kernalContext(), false) : + return !isAll(key, val) ? new GridFinishedFuture<>(cctx.kernalContext(), false) : cache.removeAsync(key, val); } /** {@inheritDoc} */ @Override public void removeAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { - cache.removeAll(keys, and(filter, false)); + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { + cache.removeAll(keys, and(filter)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return cache.removeAllAsync(keys, and(filter, false)); + @Nullable CacheEntryPredicate[] filter) { + return cache.removeAllAsync(keys, and(filter)); } /** {@inheritDoc} */ @@ -1233,37 +1059,37 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public boolean lock(K key, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { - return cache.lock(key, timeout, and(filter, false)); + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { + return cache.lock(key, timeout, and(filter)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return cache.lockAsync(key, timeout, and(filter, false)); + @Nullable CacheEntryPredicate[] filter) { + return cache.lockAsync(key, timeout, and(filter)); } /** {@inheritDoc} */ @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { - return cache.lockAll(keys, timeout, and(filter, false)); + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { + return cache.lockAll(keys, timeout, and(filter)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return cache.lockAllAsync(keys, timeout, and(filter, false)); + @Nullable CacheEntryPredicate[] filter) { + return cache.lockAllAsync(keys, timeout, and(filter)); } /** {@inheritDoc} */ - @Override public void unlock(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { - cache.unlock(key, and(filter, false)); + @Override public void unlock(K key, CacheEntryPredicate[] filter) throws IgniteCheckedException { + cache.unlock(key, and(filter)); } /** {@inheritDoc} */ @Override public void unlockAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { - cache.unlockAll(keys, and(filter, false)); + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { + cache.unlockAll(keys, and(filter)); } /** {@inheritDoc} */ @@ -1319,7 +1145,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - return cache.entrySet(entryFilter(true)).iterator(); + return cache.entrySet(filter).iterator(); } /** {@inheritDoc} */ @@ -1343,8 +1169,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V return new GridCacheProjectionImpl<>( this, cctx, - noNullKvFilter.kvFilter, - noNullEntryFilter.entryFilter, + filter, flags, subjId, true, @@ -1355,11 +1180,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(cctx); - out.writeObject(noNullEntryFilter); - out.writeObject(withNullEntryFilter); - - out.writeObject(noNullKvFilter); - out.writeObject(withNullKvFilter); + out.writeObject(filter); U.writeCollection(out, flags); @@ -1371,11 +1192,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cctx = (GridCacheContext<K, V>)in.readObject(); - noNullEntryFilter = (FullFilter<K, V>)in.readObject(); - withNullEntryFilter = (FullFilter<K, V>)in.readObject(); - - noNullKvFilter = (KeyValueFilter<K, V>)in.readObject(); - withNullKvFilter = (KeyValueFilter<K, V>)in.readObject(); + filter = (CacheEntryPredicate)in.readObject(); flags = U.readSet(in); @@ -1390,106 +1207,4 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V @Override public String toString() { return S.toString(GridCacheProjectionImpl.class, this); } - - /** - * @param <K> Key type. - * @param <V> Value type. - */ - public static class FullFilter<K, V> implements IgnitePredicate<Cache.Entry<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Key filter. */ - private KeyValueFilter<K, V> kvFilter; - - /** Entry filter. */ - private IgnitePredicate<? super Cache.Entry<K, V>> entryFilter; - - /** - * @param kvFilter Key-value filter. - * @param entryFilter Entry filter. - */ - private FullFilter(KeyValueFilter<K, V> kvFilter, IgnitePredicate<? super Cache.Entry<K, V>> entryFilter) { - this.kvFilter = kvFilter; - this.entryFilter = entryFilter; - } - - /** - * @return {@code True} if has non-null key value or entry filter. - */ - boolean hasFilter() { - return (kvFilter != null && kvFilter.filter() != null) || entryFilter != null; - } - - /** - * @return Key-value filter. - */ - public KeyValueFilter<K, V> keyValueFilter() { - return kvFilter; - } - - /** - * @return Entry filter. - */ - public IgnitePredicate<? super Cache.Entry<K, V>> entryFilter() { - return entryFilter; - } - - /** {@inheritDoc} */ - @Override public boolean apply(Cache.Entry<K, V> e) { - if (kvFilter != null) { - if (!kvFilter.apply(e.getKey(), e.getValue())) - return false; - } - - return F.isAll(e, entryFilter); - } - } - - /** - * @param <K> Key type. - * @param <V> Value type. - */ - public static class KeyValueFilter<K, V> implements IgniteBiPredicate<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Key filter. */ - private IgniteBiPredicate<K, V> kvFilter; - - /** No nulls flag. */ - private boolean noNulls; - - /** - * @param kvFilter Key-value filter. - * @param noNulls Filter without null-values. - */ - private KeyValueFilter(IgniteBiPredicate<K, V> kvFilter, boolean noNulls) { - this.kvFilter = kvFilter; - this.noNulls = noNulls; - } - - /** - * @return Key-value filter. - */ - public IgniteBiPredicate<K, V> filter() { - return kvFilter; - } - - /** {@inheritDoc} */ - @Override public boolean apply(K k, V v) { - if (k == null) // Should never happen, but just in case. - return false; - - if (v == null) - return !noNulls; - - if (kvFilter != null) { - if (!kvFilter.apply(k, v)) - return false; - } - - return true; - } - } }