http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index bc317de..88200bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -285,7 +285,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgnitePredicate<Cache.Entry<K, V>> predicate() { + @Override public CacheEntryPredicate predicate() { return delegate.predicate(); } @@ -303,13 +303,8 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public CacheProjection<K, V> projection(@Nullable IgniteBiPredicate<K, V> p) { - return delegate.projection(p); - } - - /** {@inheritDoc} */ @Override public CacheProjection<K, V> projection( - @Nullable IgnitePredicate<Cache.Entry<K, V>> filter) { + @Nullable CacheEntryPredicate filter) { return delegate.projection(filter); } @@ -394,30 +389,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void forEach(IgniteInClosure<Cache.Entry<K, V>> vis) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - delegate.forEach(vis); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public boolean forAll(IgnitePredicate<Cache.Entry<K, V>> vis) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.forAll(vis); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Nullable @Override public V reload(K key) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -504,7 +475,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -660,7 +631,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Nullable @Override public V put(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Nullable @Override public V put(K key, V val, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -674,7 +645,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public V put(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -687,7 +658,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putAsync(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -700,7 +671,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -713,7 +684,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -725,7 +696,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public boolean putx(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public boolean putx(K key, V val, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -849,7 +820,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -865,7 +836,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -998,7 +969,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public void putAll(@Nullable Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1011,7 +982,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllAsync(@Nullable Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1035,7 +1006,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<K> keySet(@Nullable CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1107,7 +1078,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1119,7 +1090,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1390,7 +1361,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Nullable @Override public V remove(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Nullable @Override public V remove(K key, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1404,7 +1375,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public V remove(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1416,7 +1387,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> removeAsync(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Override public IgniteInternalFuture<V> removeAsync(K key, CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1429,7 +1400,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1441,7 +1412,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public boolean removex(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public boolean removex(K key, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1479,7 +1450,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public boolean removex(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1492,7 +1463,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removexAsync(K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1505,7 +1476,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1590,7 +1561,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public void removeAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1603,7 +1574,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1652,7 +1623,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public boolean lock(K key, long timeout, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public boolean lock(K key, long timeout, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1666,7 +1637,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1679,7 +1650,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1692,7 +1663,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1704,7 +1675,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public void unlock(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Override public void unlock(K key, CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1717,7 +1688,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public void unlockAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index a60feb6..01ccddd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -129,6 +129,9 @@ public class GridCacheUtils { /** Empty predicate array. */ private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0]; + /** Empty predicate array. */ + private static final CacheEntryPredicate[] EMPTY_FILTER0 = new CacheEntryPredicate[0]; + /** Always false predicat array. */ private static final IgnitePredicate[] ALWAYS_FALSE = new IgnitePredicate[] { new P1() { @@ -138,6 +141,24 @@ public class GridCacheUtils { } }; + /** */ + private static final CacheEntryPredicate[] ALWAYS_FALSE0 = new CacheEntryPredicate[] { + new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + return false; + } + } + }; + + /** */ + private static final CacheEntryPredicate[] ALWAYS_TRUE0 = new CacheEntryPredicate[] { + new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + return true; + } + } + }; + /** Read filter. */ private static final IgnitePredicate READ_FILTER = new P1<Object>() { @Override public boolean apply(Object e) { @@ -749,6 +770,14 @@ public class GridCacheUtils { } /** + * @return Empty filter. + */ + @SuppressWarnings({"unchecked"}) + public static CacheEntryPredicate[] empty0() { + return EMPTY_FILTER0; + } + + /** * @return Always false filter. */ @SuppressWarnings({"unchecked"}) @@ -757,6 +786,13 @@ public class GridCacheUtils { } /** + * @return Always false filter. + */ + public static CacheEntryPredicate[] alwaysFalse0() { + return ALWAYS_FALSE0; + } + + /** * @return Closure that converts tx entry to key. */ @SuppressWarnings({"unchecked"}) @@ -842,6 +878,28 @@ public class GridCacheUtils { } /** + * @param keyType Key type. + * @param valType Value type. + * @return Type filter. + */ + public static CacheEntryPredicate typeFilter0(final Class<?> keyType, final Class<?> valType) { + return new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + try { + Object val = CU.value(e.rawGetOrUnmarshal(true), e.context(), false); + + return val != null && + valType.isAssignableFrom(val.getClass()) && + keyType.isAssignableFrom(e.key().value(e.context(), false).getClass()); + } + catch (IgniteCheckedException err) { + throw new IgniteException(err); + } + } + }); + } + + /** * @return Boolean reducer. */ public static IgniteReducer<Boolean, Boolean> boolReducer() { @@ -1724,17 +1782,17 @@ public class GridCacheUtils { * @throws ClassNotFoundException If class not found. */ @SuppressWarnings("unchecked") - @Nullable public static <K, V> IgnitePredicate<Cache.Entry<K, V>>[] readEntryFilterArray(ObjectInput in) + @Nullable public static <K, V> CacheEntryPredicate[] readEntryFilterArray(ObjectInput in) throws IOException, ClassNotFoundException { int len = in.readInt(); - IgnitePredicate<Cache.Entry<K, V>>[] arr = null; + CacheEntryPredicate[] arr = null; if (len > 0) { - arr = new IgnitePredicate[len]; + arr = new CacheEntryPredicate[len]; for (int i = 0; i < len; i++) - arr[i] = (IgnitePredicate<Cache.Entry<K, V>>)in.readObject(); + arr[i] = (CacheEntryPredicate)in.readObject(); } return arr; @@ -1745,7 +1803,23 @@ public class GridCacheUtils { * @param n Node. * @return Predicate that evaulates to {@code true} if entry is primary for node. */ - public static <K, V> IgnitePredicate<Cache.Entry<K, V>> cachePrimary( + public static CacheEntryPredicate cachePrimary( + final CacheAffinity aff, + final ClusterNode n + ) { + return new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + return aff.isPrimary(n, e.key()); + } + }; + } + + /** + * @param aff Affinity. + * @param n Node. + * @return Predicate that evaulates to {@code true} if entry is primary for node. + */ + public static <K, V> IgnitePredicate<Cache.Entry<K, V>> cachePrimary0( final CacheAffinity<K> aff, final ClusterNode n ) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java index 9f9dcce..ddf4918 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java @@ -118,9 +118,7 @@ public class GridCacheValueCollection<K, V> extends GridSerializableCollection<V /** {@inheritDoc} */ @Override public void clear() { - ctx.cache().clearLocally0(F.viewReadOnly(map.values(), F.<K, V>cacheEntry2Key(), filter), CU.<K, V>empty()); - - map.clear(); + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b77b8db..af23dd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -702,7 +702,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param filter Filter. * @return Entry set. */ - public Set<Entry<K, V>> entrySetx(IgnitePredicate<Entry<K, V>>... filter) { + public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1347,7 +1347,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V (CacheProjection<K1, V1>)(prj != null ? prj : delegate), (GridCacheContext<K1, V1>)ctx, null, - null, prj != null ? prj.flags() : null, prj != null ? prj.subjectId() : null, true, @@ -1390,7 +1389,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V (prj != null ? prj : delegate), ctx, null, - null, res, prj != null ? prj.subjectId() : null, true, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 42c3fea..c9f72f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -82,7 +82,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter TransactionIsolation isolation, boolean isInvalidate, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter + CacheEntryPredicate[] filter ) { assert tx != null; @@ -91,7 +91,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - IgnitePredicate<Cache.Entry<K, V>>... filter) { + CacheEntryPredicate... filter) { IgniteTxLocalEx tx = ctx.tm().userTxx(); // Return value flag is true because we choose to bring values for explicit locks. @@ -126,7 +126,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter boolean retval, @Nullable TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter); + CacheEntryPredicate[] filter); /** * @param key Key to remove. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index c787261..a867c2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -205,7 +205,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx, boolean failFast, KeyCacheObject key, - IgnitePredicate<Cache.Entry<K, V>>[] filter) + CacheEntryPredicate[] filter) throws GridCacheFilterFailedException { assert false : "Method peek can only be called on user transaction: " + this; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index bb7d308..dfdda3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -803,7 +803,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public void unlockAll(Collection<? extends K> keys, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { assert false; } @@ -1059,7 +1059,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap while (partIt.hasNext()) { GridDhtCacheEntry next = partIt.next(); - if (next.isInternal() || !next.visitable(CU.<K, V>empty())) + if (next.isInternal() || !next.visitable(CU.empty0())) continue; entry = next.wrapLazyValue(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 6fcc7f6..2d049cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -107,7 +107,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo private IgniteLogger log; /** Filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Transaction. */ private GridDhtTxLocalAdapter tx; @@ -161,7 +161,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo GridDhtTxLocalAdapter tx, long threadId, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { super(cctx.kernalContext(), CU.boolReducer()); assert nearNodeId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 449702b..b394d49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -556,7 +556,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean retval, TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { return lockAllAsyncInternal( keys, timeout, @@ -591,7 +591,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach boolean retval, TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { if (keys == null || keys.isEmpty()) return new GridDhtFinishedFuture<>(ctx.kernalContext(), true); @@ -665,7 +665,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach final GridCacheContext<K, V> cacheCtx, final ClusterNode nearNode, final GridNearLockRequest req, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter0) { + @Nullable final CacheEntryPredicate[] filter0) { final List<KeyCacheObject> keys = req.keys(); IgniteInternalFuture<Object> keyFut = null; @@ -691,7 +691,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (exx != null) return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx); - IgnitePredicate<Cache.Entry<K, V>>[] filter = filter0; + CacheEntryPredicate[] filter = filter0; // Set message into thread context. GridDhtTxLocal tx = null; @@ -710,7 +710,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Unmarshal filter first. if (filter == null) - filter = (IgnitePredicate[])req.filter(); + filter = req.filter(); GridDhtLockFuture<K, V> fut = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 060b02c..bd11c0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -553,7 +553,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { null, cached, null, - CU.empty(), + CU.empty0(), false, -1L, -1L, @@ -613,7 +613,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { final boolean read, final Set<KeyCacheObject> skipped, final long accessTtl, - @Nullable final IgnitePredicate<Cache.Entry<Object, Object>>[] filter) { + @Nullable final CacheEntryPredicate[] filter) { if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + skipped + ']'); @@ -631,7 +631,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /*retval*/false, isolation, accessTtl, - (IgnitePredicate[])CU.empty()); + CU.empty0()); return new GridEmbeddedFuture<>( fut, @@ -648,7 +648,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /*retval*/false, /*read*/read, accessTtl, - filter == null ? CU.empty() : filter, + filter == null ? CU.empty0() : filter, /**computeInvoke*/false); return ret; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e4212de..16cc223 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -308,26 +308,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public V put(K key, V val, @Nullable GridCacheEntryEx cached, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return putAsync(key, val, cached, ttl, filter).get(); } /** {@inheritDoc} */ @Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx cached, - long ttl, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + long ttl, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return putxAsync(key, val, cached, ttl, filter).get(); } /** {@inheritDoc} */ @Override public boolean putx(K key, V val, - IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + CacheEntryPredicate[] filter) throws IgniteCheckedException { return putxAsync(key, val, filter).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, - long ttl, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + long ttl, @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); return updateAllAsync0(F0.asMap(key, val), @@ -344,7 +344,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); return updateAllAsync0(F0.asMap(key, val), @@ -367,7 +367,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<V> putIfAbsentAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return putAsync(key, val, ctx.<K, V>noPeekArray()); + return putAsync(key, val, ctx.noValArray()); } /** {@inheritDoc} */ @@ -379,7 +379,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> putxIfAbsentAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return putxAsync(key, val, ctx.<K, V>noPeekArray()); + return putxAsync(key, val, ctx.noValArray()); } /** {@inheritDoc} */ @@ -391,7 +391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<V> replaceAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return putAsync(key, val, ctx.hasPeekArray()); + return putAsync(key, val, ctx.hasValArray()); } /** {@inheritDoc} */ @@ -403,7 +403,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> replacexAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return putxAsync(key, val, ctx.hasPeekArray()); + return putxAsync(key, val, ctx.hasValArray()); } /** {@inheritDoc} */ @@ -415,10 +415,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); - if (ctx.portableEnabled()) - oldVal = (V)ctx.marshalToPortable(oldVal); - return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal)); + return putxAsync(key, newVal, ctx.equalsValArray(oldVal)); } /** {@inheritDoc} */ @@ -436,18 +434,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); - if (ctx.portableEnabled()) - val = (V)ctx.marshalToPortable(val); - - return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val)); + return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsValArray(val)); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { - if (ctx.portableEnabled()) - oldVal = (V)ctx.marshalToPortable(oldVal); - return updateAllAsync0(F.asMap(key, newVal), null, null, @@ -456,18 +448,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, true, null, - ctx.equalsPeekArray(oldVal)); + ctx.equalsValArray(oldVal)); } /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> m, - IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + CacheEntryPredicate[] filter) throws IgniteCheckedException { putAllAsync(m, filter).get(); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { return updateAllAsync0(m, null, null, @@ -502,14 +494,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public V remove(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return removeAsync(key, entry, filter).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter); @@ -517,13 +509,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void removeAll(Collection<? extends K> keys, - IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + CacheEntryPredicate... filter) throws IgniteCheckedException { removeAllAsync(keys, filter).get(); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { A.notNull(keys, "keys"); return removeAllAsync0(keys, null, null, false, false, filter); @@ -531,14 +523,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public boolean removex(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return removexAsync(key, entry, filter).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter); @@ -553,10 +545,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) { A.notNull(key, "key", val, "val"); - if (ctx.portableEnabled()) - val = (V)ctx.marshalToPortable(val); - - return removexAsync(key, ctx.equalsPeekArray(val)); + return removexAsync(key, ctx.equalsValArray(val)); } /** {@inheritDoc} */ @@ -565,7 +554,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> localRemoveAll(IgnitePredicate<Cache.Entry<K, V>> filter) { + @Override public IgniteInternalFuture<?> localRemoveAll(CacheEntryPredicate filter) { return removeAllAsync(keySet(filter), null); } @@ -640,7 +629,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean retval, @Nullable TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " + "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); } @@ -785,7 +774,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx cached, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter + @Nullable final CacheEntryPredicate[] filter ) { if (map != null && keyCheck) validateCacheKeys(map.keySet()); @@ -812,7 +801,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retval, rawRetval, prj != null ? prj.expiry() : null, - (IgnitePredicate[])filter, + filter, subjId, taskNameHash); @@ -842,7 +831,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable GridCacheEntryEx cached, final boolean retval, boolean rawRetval, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter + @Nullable final CacheEntryPredicate[] filter ) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); @@ -874,7 +863,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retval, rawRetval, (filter != null && prj != null) ? prj.expiry() : null, - (IgnitePredicate[])filter, + filter, subjId, taskNameHash); @@ -2241,7 +2230,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { try { - return ctx.isAll(entry.wrapFilterLocked(), req.filter()); + return ctx.isAll(entry, req.filter()); } catch (IgniteCheckedException e) { res.addFailedKey(entry.key(), e); @@ -2465,7 +2454,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), - CU.empty(), + null, replicate ? DR_BACKUP : DR_NONE, ttl, expireTime, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 567bf67..9756544 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -112,7 +112,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem private long topVer; /** Optional filter. */ - private final IgnitePredicate<Cache.Entry<Object, Object>>[] filter; + private final CacheEntryPredicate[] filter; /** Write synchronization mode. */ private final CacheWriteSynchronizationMode syncMode; @@ -192,7 +192,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem final boolean retval, final boolean rawRetval, @Nullable ExpiryPolicy expiryPlc, - final IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + final CacheEntryPredicate[] filter, UUID subjId, int taskNameHash ) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index b0d356c..1ab0fa0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -116,11 +116,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri private byte[] expiryPlcBytes; /** Filter. */ - @GridDirectTransient - private IgnitePredicate<Cache.Entry<Object, Object>>[] filter; - - /** Filter bytes. */ - private byte[][] filterBytes; + private CacheEntryPredicate[] filter; /** Flag indicating whether request contains primary keys. */ private boolean hasPrimary; @@ -173,7 +169,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri boolean forceTransformBackups, @Nullable ExpiryPolicy expiryPlc, @Nullable Object[] invokeArgs, - @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter, + @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash ) { @@ -282,7 +278,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** * @return Filter. */ - @Nullable public IgnitePredicate<Cache.Entry<Object, Object>>[] filter() { + @Nullable public CacheEntryPredicate[] filter() { return filter; } @@ -511,7 +507,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri else prepareMarshalCacheObjects(vals, cctx); - filterBytes = marshalFilter(filter, ctx); + if (filter != null) { + for (CacheEntryPredicate p : filter) + p.prepareMarshal(cctx); + } + invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx); if (expiryPlc != null) @@ -531,7 +531,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri else finishUnmarshalCacheObjects(vals, cctx, ldr); - filter = unmarshalFilter(filterBytes, ctx, ldr); + if (filter != null) { + for (CacheEntryPredicate p : filter) + p.finishUnmarshal(cctx, ldr); + } + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); if (expiryPlcBytes != null) @@ -590,7 +594,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 9: - if (!writer.writeObjectArray("filterBytes", filterBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -626,7 +630,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 15: - if (!writer.writeByte("op", op != null ? (byte) op.ordinal() : -1)) + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); @@ -644,7 +648,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte) syncMode.ordinal() : -1)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); @@ -738,7 +742,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 9: - filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index d0dc2e0..87c0591 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -378,7 +378,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean retval, @Nullable TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter + CacheEntryPredicate[] filter ) { assert tx == null || tx instanceof GridNearTxLocal; @@ -413,7 +413,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte /** {@inheritDoc} */ @Override public void unlockAll(Collection<? extends K> keys, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { if (keys.isEmpty()) return; @@ -431,9 +431,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte GridDistributedCacheEntry entry = peekExx(cacheKey); - Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.<K, V>wrapLazyValue(); - - if (!ctx.isAll(Entry, filter)) + if (entry == null || !ctx.isAll(entry, filter)) break; // While. GridCacheMvccCandidate lock = @@ -617,7 +615,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final boolean txRead, final long timeout, final long accessTtl, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter + @Nullable final CacheEntryPredicate[] filter ) { assert keys != null; @@ -690,7 +688,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte final boolean txRead, final long timeout, final long accessTtl, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable final CacheEntryPredicate[] filter) { int cnt = keys.size(); if (tx == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 441c2fd..3c1b2fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -95,7 +95,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity private IgniteLogger log; /** Filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Transaction. */ @GridToStringExclude @@ -139,7 +139,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity boolean retval, long timeout, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { super(cctx.kernalContext(), CU.boolReducer()); assert keys != null; @@ -609,7 +609,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity final ClusterNode node = map.node(); if (filter != null && filter.length != 0) - req.filter((IgnitePredicate[])filter, cctx); + req.filter(filter, cctx); if (node.isLocal()) lockLocally(mappedKeys, req.topologyVersion(), mappings); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 2358013..9aa407f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -227,7 +227,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/true, - CU.empty(), + CU.empty0(), DR_NONE, ttl, expireTime, @@ -323,7 +323,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), - CU.empty(), + CU.empty0(), DR_NONE, ttl, expireTime, @@ -391,7 +391,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { V val, @Nullable GridCacheEntryEx cached, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter + @Nullable CacheEntryPredicate[] filter ) throws IgniteCheckedException { return dht.put(key, val, cached, ttl, filter); } @@ -401,14 +401,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { V val, @Nullable GridCacheEntryEx cached, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return dht.putx(key, val, cached, ttl, filter); } /** {@inheritDoc} */ @Override public boolean putx(K key, V val, - IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + CacheEntryPredicate[] filter) throws IgniteCheckedException { return dht.putx(key, val, filter); } @@ -418,7 +418,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { return dht.putAsync(key, val, entry, ttl, filter); } @@ -428,7 +428,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { V val, @Nullable GridCacheEntryEx entry, long ttl, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { return dht.putxAsync(key, val, entry, ttl, filter); } @@ -505,14 +505,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public void putAll(Map<? extends K, ? extends V> m, IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Override public void putAll(Map<? extends K, ? extends V> m, CacheEntryPredicate[] filter) throws IgniteCheckedException { dht.putAll(m, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { return dht.putAllAsync(m, filter); } @@ -571,7 +571,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public V remove(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return dht.remove(key, entry, filter); } @@ -579,26 +579,26 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { return dht.removeAsync(key, entry, filter); } /** {@inheritDoc} */ - @Override public void removeAll(Collection<? extends K> keys, IgnitePredicate<Cache.Entry<K, V>>... filter) + @Override public void removeAll(Collection<? extends K> keys, CacheEntryPredicate... filter) throws IgniteCheckedException { dht.removeAll(keys, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { return dht.removeAllAsync(keys, filter); } /** {@inheritDoc} */ @Override public boolean removex(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return dht.removex(key, entry, filter); } @@ -606,7 +606,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { return dht.removexAsync(key, entry, filter); } @@ -636,7 +636,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> localRemoveAll(IgnitePredicate<Cache.Entry<K, V>> filter) { + @Override public IgniteInternalFuture<?> localRemoveAll(CacheEntryPredicate filter) { return dht.localRemoveAll(filter); } @@ -659,13 +659,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { boolean retval, @Nullable TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { return dht.lockAllAsync(null, timeout, filter); } /** {@inheritDoc} */ @Override public void unlockAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { dht.unlockAll(keys, filter); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 3f1ae46..4d79f18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -359,12 +359,12 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @return Near entries. */ public Set<Cache.Entry<K, V>> nearEntries() { - return super.entrySet(CU.<K, V>empty()); + return super.entrySet(CU.empty0()); } /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet( - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { return new EntrySet(super.entrySet(filter), dht().entrySet(filter)); } @@ -374,41 +374,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> primaryEntrySet( - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { - final long topVer = ctx.affinity().affinityTopologyVersion(); - - Collection<Cache.Entry<K, V>> entries = - F.flatCollections( - F.viewReadOnly( - dht().topology().currentLocalPartitions(), - new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() { - @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) { - return F.viewReadOnly( - p.entries(), - new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() { - @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) { - return e.wrapLazyValue(); - } - }, - new P1<GridDhtCacheEntry>() { - @Override public boolean apply(GridDhtCacheEntry e) { - return !e.obsoleteOrDeleted(); - } - }); - } - }, - new P1<GridDhtLocalPartition>() { - @Override public boolean apply(GridDhtLocalPartition p) { - return p.primary(topVer); - } - })); - - return new GridCacheEntrySet<>(ctx, entries, filter); - } - - /** {@inheritDoc} */ - @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Override public Set<K> keySet(@Nullable CacheEntryPredicate[] filter) { return new GridCacheKeySet<>(ctx, entrySet(filter), null); } @@ -416,31 +382,31 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param filter Entry filter. * @return Keys for near cache only. */ - public Set<K> nearKeySet(@Nullable IgnitePredicate<Cache.Entry<K, V>> filter) { + public Set<K> nearKeySet(@Nullable CacheEntryPredicate filter) { return super.keySet(filter); } /** {@inheritDoc} */ - @Override public Set<K> primaryKeySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<K> primaryKeySet(@Nullable CacheEntryPredicate... filter) { return new GridCacheKeySet<>(ctx, primaryEntrySet(filter), null); } /** {@inheritDoc} */ - @Override public Collection<V> values(IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Collection<V> values(CacheEntryPredicate... filter) { return new GridCacheValueCollection<>(ctx, entrySet(filter), ctx.vararg(F.<K, V>cacheHasPeekValue())); } /** {@inheritDoc} */ - @Override public Collection<V> primaryValues(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Collection<V> primaryValues(@Nullable CacheEntryPredicate... filter) { return new GridCacheValueCollection<>( ctx, entrySet(filter), ctx.vararg( - CU.<K, V>cachePrimary(ctx.grid().<K>affinity(ctx.name()), ctx.localNode()))); + CU.<K, V>cachePrimary0(ctx.grid().<K>affinity(ctx.name()), ctx.localNode()))); } /** {@inheritDoc} */ - @Override public boolean evict(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Override public boolean evict(K key, @Nullable CacheEntryPredicate[] filter) { // Use unary 'and' to make sure that both sides execute. return super.evict(key, filter) & dht().evict(key, filter); } @@ -450,13 +416,13 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param filter Optional filter. * @return {@code True} if evicted. */ - public boolean evictNearOnly(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + public boolean evictNearOnly(K key, @Nullable CacheEntryPredicate[] filter) { return super.evict(key, filter); } /** {@inheritDoc} */ @Override public void evictAll(Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { super.evictAll(keys, filter); dht().evictAll(keys, filter); @@ -464,7 +430,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public boolean compact(K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { return super.compact(key, filter) | dht().compact(key, filter); } @@ -483,7 +449,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda */ @Nullable public V peekNearOnly(K key) { try { - GridTuple<V> peek = peek0(true, key, SMART, CU.<K, V>empty()); + GridTuple<V> peek = peek0(true, key, SMART, CU.empty0()); return peek != null ? peek.get() : null; } @@ -496,7 +462,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public V peek(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>> filter) { + @Override public V peek(K key, @Nullable CacheEntryPredicate filter) { try { GridTuple<V> res = peek0(false, key, SMART, filter); @@ -535,13 +501,13 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public boolean clearLocally0(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Override public boolean clearLocally0(K key, @Nullable CacheEntryPredicate[] filter) { return super.clearLocally0(key, filter) | dht().clearLocally0(key, filter); } /** {@inheritDoc} */ @Override public void clearLocally0(Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { super.clearLocally0(keys, filter); dht().clearLocally0(keys, filter); @@ -759,7 +725,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda currIter.remove(); try { - GridNearCacheAdapter.this.remove(currEntry.getKey(), CU.<K, V>empty()); + GridNearCacheAdapter.this.remove(currEntry.getKey(), CU.empty0()); } catch (IgniteCheckedException e) { throw new IgniteException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index c855b47..67001aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -97,7 +97,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B private IgniteLogger log; /** Filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Transaction. */ @GridToStringExclude @@ -149,7 +149,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B boolean retval, long timeout, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { super(cctx.kernalContext(), CU.boolReducer()); assert keys != null; @@ -795,7 +795,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B try { entry = cctx.near().entryExx(key, topVer); - if (!cctx.isAll(entry.<K, V>wrapLazyValue(), filter)) { + if (!cctx.isAll(entry, filter)) { if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); @@ -953,7 +953,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B final ClusterNode node = map.node(); if (filter != null && filter.length != 0) - req.filter((IgnitePredicate[])filter, cctx); + req.filter(filter, cctx); if (node.isLocal()) { req.miniId(IgniteUuid.randomUuid()); @@ -1035,7 +1035,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty() : CU.alwaysFalse()); + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0()); } if (record) { @@ -1390,7 +1390,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (inTx() && implicitTx() && tx.onePhaseCommit()) { boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty() : CU.alwaysFalse()); + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0()); } entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index e521efb..d34f5a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -49,11 +49,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest { private IgniteUuid miniId; /** Filter. */ - private byte[][] filterBytes; - - /** Filter. */ @GridDirectTransient - private IgnitePredicate<Cache.Entry<Object, Object>>[] filter; + private CacheEntryPredicate[] filter; /** Implicit flag. */ private boolean implicitTx; @@ -225,7 +222,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** * @return Filter. */ - public IgnitePredicate<Cache.Entry<Object, Object>>[] filter() { + public CacheEntryPredicate[] filter() { return filter; } @@ -234,7 +231,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param ctx Context. * @throws IgniteCheckedException If failed. */ - public void filter(IgnitePredicate<Cache.Entry<Object, Object>>[] filter, GridCacheContext ctx) + public void filter(CacheEntryPredicate[] filter, GridCacheContext ctx) throws IgniteCheckedException { this.filter = filter; } @@ -307,16 +304,24 @@ public class GridNearLockRequest extends GridDistributedLockRequest { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (filterBytes == null) - filterBytes = marshalFilter(filter, ctx); + if (filter != null) { + GridCacheContext cctx = ctx.cacheContext(cacheId); + + for (CacheEntryPredicate p : filter) + p.prepareMarshal(cctx); + } } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (filter == null && filterBytes != null) - filter = unmarshalFilter(filterBytes, ctx, ldr); + if (filter != null) { + GridCacheContext cctx = ctx.cacheContext(cacheId); + + for (CacheEntryPredicate p : filter) + p.finishUnmarshal(cctx, ldr); + } } /** {@inheritDoc} */ @@ -346,12 +351,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); - case 24: - if (!writer.writeObjectArray("filterBytes", filterBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - case 25: if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; @@ -438,14 +437,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 24: - filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - case 25: hasTransforms = reader.readBoolean("hasTransforms"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 9b15b40..ee80a00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -416,7 +416,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> boolean retval, TransactionIsolation isolation, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter + CacheEntryPredicate[] filter ) { GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, keys, @@ -467,7 +467,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> } /** {@inheritDoc} */ - @Override public void unlockAll(Collection<? extends K> keys, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Override public void unlockAll(Collection<? extends K> keys, CacheEntryPredicate[] filter) { if (keys.isEmpty()) return; @@ -486,7 +486,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> GridDistributedCacheEntry entry = peekExx(cacheKey); - if (entry == null || !ctx.isAll(entry.<K, V>wrapLazyValue(), filter)) + if (entry == null || !ctx.isAll(entry, filter)) break; // While. try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ef2899a..6b16aa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1096,7 +1096,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /*retval*/false, isolation, accessTtl, - CU.empty()); + CU.empty0()); return new GridEmbeddedFuture<>( fut, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index ccd4c8c..097590a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -292,7 +292,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { cached.unswap(); try { - if (cached.peek(GLOBAL, CU.empty()) == null && cached.evictInternal(false, xidVer, null)) { + if (cached.peek(GLOBAL, CU.empty0()) == null && cached.evictInternal(false, xidVer, null)) { evicted.add(entry.txKey()); return false; @@ -346,7 +346,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { else { cached.unswap(); - if (cached.peek(GLOBAL, CU.empty()) == null && cached.evictInternal(false, xidVer, null)) { + if (cached.peek(GLOBAL, CU.empty0()) == null && cached.evictInternal(false, xidVer, null)) { cached.context().cache().removeIfObsolete(key.key()); evicted.add(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 4c59437..5eb4284 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -112,13 +112,13 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { TransactionIsolation isolation, boolean invalidate, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { return lockAllAsync(keys, timeout, tx, filter); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { IgniteTxLocalEx tx = ctx.tm().localTx(); return lockAllAsync(ctx.cacheKeysView(keys), timeout, tx, filter); @@ -134,7 +134,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, long timeout, @Nullable IgniteTxLocalEx tx, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), true); @@ -186,7 +186,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void unlockAll(Collection<? extends K> keys, - IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + CacheEntryPredicate[] filter) throws IgniteCheckedException { long topVer = ctx.affinity().affinityTopologyVersion(); for (K key : keys) {