# ignite-51-filters
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6febd89a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6febd89a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6febd89a Branch: refs/heads/ignite-51-filters Commit: 6febd89af5768c7977ab26a3bea3b277a66b74b9 Parents: 90172c7 Author: sboikov <sboi...@gridgain.com> Authored: Tue Mar 3 18:53:16 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Mar 3 18:53:16 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheEntryPredicate.java | 42 ++ .../cache/CacheEntryPredicateAdapter.java | 66 +++ .../cache/CacheEntryPredicateContainsValue.java | 99 ++++ .../cache/CacheEntryPredicateHasValue.java | 28 ++ .../cache/CacheEntryPredicateNoValue.java | 28 ++ .../cache/CacheEntrySerializablePredicate.java | 100 ++++ .../processors/cache/CacheProjection.java | 80 +-- .../processors/cache/EvictableEntryImpl.java | 2 +- .../processors/cache/GridCacheAdapter.java | 275 ++++------- .../cache/GridCacheClearAllRunnable.java | 2 +- .../cache/GridCacheConcurrentMap.java | 64 +-- .../processors/cache/GridCacheContext.java | 77 +-- .../cache/GridCacheDeploymentManager.java | 107 ++-- .../processors/cache/GridCacheEntryEx.java | 22 +- .../cache/GridCacheEvictionManager.java | 29 +- .../processors/cache/GridCacheKeySet.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 62 +-- .../processors/cache/GridCacheMessage.java | 18 +- .../processors/cache/GridCacheProjectionEx.java | 24 +- .../cache/GridCacheProjectionImpl.java | 493 ++++--------------- .../processors/cache/GridCacheProxyImpl.java | 93 ++-- .../processors/cache/GridCacheUtils.java | 84 +++- .../cache/GridCacheValueCollection.java | 4 +- .../processors/cache/IgniteCacheProxy.java | 4 +- .../GridDistributedCacheAdapter.java | 6 +- .../GridDistributedTxRemoteAdapter.java | 2 +- .../distributed/dht/GridDhtCacheAdapter.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 4 +- .../dht/GridDhtTransactionalCacheAdapter.java | 10 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 8 +- .../dht/atomic/GridDhtAtomicCache.java | 69 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 30 +- .../dht/colocated/GridDhtColocatedCache.java | 12 +- .../colocated/GridDhtColocatedLockFuture.java | 6 +- .../distributed/near/GridNearAtomicCache.java | 36 +- .../distributed/near/GridNearCacheAdapter.java | 68 +-- .../distributed/near/GridNearLockFuture.java | 12 +- .../distributed/near/GridNearLockRequest.java | 39 +- .../near/GridNearTransactionalCache.java | 6 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../distributed/near/GridNearTxRemote.java | 4 +- .../processors/cache/local/GridLocalCache.java | 8 +- .../cache/local/GridLocalLockFuture.java | 4 +- .../local/atomic/GridLocalAtomicCache.java | 102 ++-- .../query/GridCacheDistributedQueryManager.java | 4 +- .../cache/query/GridCacheLocalQueryFuture.java | 3 +- .../cache/query/GridCacheQueriesImpl.java | 4 +- .../cache/query/GridCacheQueryAdapter.java | 8 +- .../cache/query/GridCacheQueryInfo.java | 7 +- .../cache/query/GridCacheQueryManager.java | 42 +- .../cache/query/GridCacheQueryRequest.java | 6 +- .../cache/transactions/IgniteInternalTx.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxEntry.java | 10 +- .../transactions/IgniteTxLocalAdapter.java | 52 +- .../cache/transactions/IgniteTxLocalEx.java | 4 +- .../org/apache/ignite/internal/util/F0.java | 12 + .../ignite/internal/util/lang/GridFunc.java | 52 -- .../cache/GridCacheAbstractFullApiSelfTest.java | 6 +- .../processors/cache/GridCacheTestEntryEx.java | 22 +- .../distributed/GridCacheEventAbstractTest.java | 12 +- ...achePartitionedMultiNodeFullApiSelfTest.java | 10 +- ...dCacheAbstractReduceFieldsQuerySelfTest.java | 9 +- .../GridCacheAbstractFieldsQuerySelfTest.java | 34 -- .../cache/GridCacheAbstractQuerySelfTest.java | 20 +- .../cache/GridCacheCrossCacheQuerySelfTest.java | 17 - .../cache/spring/SpringDynamicCacheManager.java | 10 +- 68 files changed, 1225 insertions(+), 1366 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java new file mode 100644 index 0000000..ecccd7b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.io.*; + +/** + * + */ +public interface CacheEntryPredicate extends IgnitePredicate<GridCacheEntryEx>, Message, Serializable { + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException; + + /** + * @param ctx Context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. + */ + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java new file mode 100644 index 0000000..c11de54 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * + */ +public abstract class CacheEntryPredicateAdapter implements CacheEntryPredicate { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert false; + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java new file mode 100644 index 0000000..7aff5a0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * + */ +public class CacheEntryPredicateContainsValue implements CacheEntryPredicate { + /** */ + @GridToStringInclude + private CacheObject val; + + /** + * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. + */ + public CacheEntryPredicateContainsValue() { + // No-op. + } + + /** + * + * @param val Value to compare with. + */ + public CacheEntryPredicateContainsValue(CacheObject val) { + assert val != null; + + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean apply(GridCacheEntryEx entry) { + try { + CacheObject val = entry.rawGetOrUnmarshal(true); + + return F.eq(this.val.value(entry.context(), false), CU.value(val, entry.context(), false)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + val.finishUnmarshal(ctx, ldr); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { + val.prepareMarshal(ctx.cacheObjectContext()); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 0; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheEntryPredicateContainsValue.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java new file mode 100644 index 0000000..173c6e9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * + */ +public class CacheEntryPredicateHasValue extends CacheEntryPredicateAdapter { + /** {@inheritDoc} */ + @Override public boolean apply(GridCacheEntryEx e) { + return e.hasValue(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java new file mode 100644 index 0000000..6a4df21 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * + */ +public class CacheEntryPredicateNoValue extends CacheEntryPredicateAdapter { + /** {@inheritDoc} */ + @Override public boolean apply(GridCacheEntryEx e) { + return !e.hasValue(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java new file mode 100644 index 0000000..73b0789 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * + */ +public class CacheEntrySerializablePredicate implements CacheEntryPredicate { + /** */ + @GridToStringInclude + @GridDirectTransient + private CacheEntryPredicate[] p; + + /** */ + private byte[] bytes; + + /** + * + */ + public CacheEntrySerializablePredicate() { + // No-op. + } + + /** + * @param p Predicate. + */ + public CacheEntrySerializablePredicate(CacheEntryPredicate... p) { + assert p != null; + + this.p = p; + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + assert bytes != null; + + p = ctx.marshaller().unmarshal(bytes, ldr); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { + assert p != null; + + bytes = ctx.marshaller().marshal(p); + } + + /** {@inheritDoc} */ + @Override public boolean apply(GridCacheEntryEx e) { + assert p != null; + + for (CacheEntryPredicate p0 : p) { + if (!p0.apply(e)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 0; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java index 74dfebd..24fd33f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java @@ -235,22 +235,6 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { public <K1, V1> CacheProjection<K1, V1> projection(Class<? super K1> keyType, Class<? super V1> valType); /** - * Gets cache projection based on given key-value predicate. Whenever makes sense, - * this predicate will be used to pre-filter cache operations. If - * operation passed pre-filtering, this filter will be passed through - * to cache operations as well. - * <p> - * For example, for {@link #putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} method only - * elements that pass the filter will be given to {@code Cache.putAll(m, filter)} - * where it will be checked once again prior to put. - * - * @param p Key-value predicate for this projection. If {@code null}, then the - * same projection is returned. - * @return Projection for given key-value predicate. - */ - public CacheProjection<K, V> projection(@Nullable IgniteBiPredicate<K, V> p); - - /** * Gets cache projection based on given entry filter. This filter will be simply passed through * to all cache operations on this projection. Unlike {@link #projection(org.apache.ignite.lang.IgniteBiPredicate)} * method, this filter will <b>not</b> be used for pre-filtering. @@ -260,7 +244,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * will be {@code 'anded'}. * @return Projection based on given filter. */ - public CacheProjection<K, V> projection(@Nullable IgnitePredicate<Cache.Entry<K, V>> filter); + public CacheProjection<K, V> projection(@Nullable CacheEntryPredicate filter); /** * Gets cache projection base on this one, but with the specified flags turned on. @@ -370,30 +354,6 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { public boolean containsValue(V val); /** - * Executes visitor closure on each cache element. - * <h2 class="header">Transactions</h2> - * This method is not transactional and will not enlist keys into transaction simply - * because they were visited. However, if you perform transactional operations on the - * visited entries, those operations will enlist the entry into transaction. - * - * @param vis Closure which will be invoked for each cache entry. - */ - public void forEach(IgniteInClosure<Cache.Entry<K, V>> vis); - - /** - * Tests whether the predicate holds for all entries. If cache is empty, - * then {@code true} is returned. - * <h2 class="header">Transactions</h2> - * This method is not transactional and will not enlist keys into transaction simply - * because they were visited. However, if you perform transactional operations on the - * visited entries, those operations will enlist the entry into transaction. - * - * @param vis Predicate to test for each cache entry. - * @return {@code True} if the given predicate holds for all visited entries, {@code false} otherwise. - */ - public boolean forAll(IgnitePredicate<Cache.Entry<K, V>> vis); - - /** * Reloads a single key from persistent storage. This method * delegates to {@link CacheStore#load(Transaction, Object)} * method. @@ -647,7 +607,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If put operation failed. * @throws CacheFlagException If projection flags validation failed. */ - @Nullable public V put(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + @Nullable public V put(K key, V val, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** @@ -683,7 +643,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws NullPointerException If either key or value are {@code null}. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable CacheEntryPredicate... filter); /** * Stores given key-value pair in cache. If filters are provided, then entries will @@ -715,7 +675,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If put operation failed. * @throws CacheFlagException If projection flags validation failed. */ - public boolean putx(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + public boolean putx(K key, V val, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** @@ -747,7 +707,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws NullPointerException If either key or value are {@code null}. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable CacheEntryPredicate... filter); /** * Stores given key-value pair in cache only if cache had no previous mapping for it. If cache @@ -1042,7 +1002,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If projection flags validation failed. */ public void putAll(@Nullable Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Asynchronously stores given key-value pairs in cache. If filters are provided, then entries will @@ -1065,7 +1025,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If projection flags validation failed. */ public IgniteInternalFuture<?> putAllAsync(@Nullable Map<? extends K, ? extends V> m, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Set of keys cached on this node. You can remove elements from this set, but you cannot add elements @@ -1097,7 +1057,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * that filter is checked atomically together with get operation. * @return Key set for this cache projection. */ - public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + public Set<K> keySet(@Nullable CacheEntryPredicate... filter); /** * Set of keys for which this node is primary. @@ -1418,7 +1378,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If remove operation failed. * @throws CacheFlagException If projection flags validation failed. */ - @Nullable public V remove(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + @Nullable public V remove(K key, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** @@ -1449,7 +1409,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws NullPointerException if the key is {@code null}. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture<V> removeAsync(K key, IgnitePredicate<Cache.Entry<K, V>>... filter); + public IgniteInternalFuture<V> removeAsync(K key, CacheEntryPredicate... filter); /** * Removes given key mapping from cache. @@ -1475,7 +1435,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If remove failed. * @throws CacheFlagException If projection flags validation failed. */ - public boolean removex(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + public boolean removex(K key, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** @@ -1503,7 +1463,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If projection flags validation failed. */ public IgniteInternalFuture<Boolean> removexAsync(K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Removes given key mapping from cache if one exists and value is equal to the passed in value. @@ -1571,7 +1531,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If flags validation failed. */ public void removeAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Asynchronously removes given key mappings from cache for entries for which the optionally @@ -1594,7 +1554,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If flags validation failed. */ public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Removes mappings from cache for entries for which the optionally passed in filters do @@ -1653,7 +1613,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If lock acquisition resulted in error. * @throws CacheFlagException If flags validation failed. */ - public boolean lock(K key, long timeout, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + public boolean lock(K key, long timeout, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** @@ -1680,7 +1640,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If flags validation failed. */ public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * All or nothing synchronous lock for passed in keys. This method @@ -1706,7 +1666,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If flags validation failed. */ public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * All or nothing synchronous lock for passed in keys. This method @@ -1732,7 +1692,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If flags validation failed. */ public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); + @Nullable CacheEntryPredicate... filter); /** * Unlocks given key only if current thread owns the lock. If optional filter @@ -1752,7 +1712,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If unlock execution resulted in error. * @throws CacheFlagException If flags validation failed. */ - public void unlock(K key, IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + public void unlock(K key, CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Unlocks given keys only if current thread owns the locks. Only the keys @@ -1775,7 +1735,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws CacheFlagException If flags validation failed. */ public void unlockAll(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException; + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException; /** * Checks if any node owns a lock for this key. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java index d4706b7..81abb7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java @@ -112,7 +112,7 @@ public class EvictableEntryImpl<K, V> implements EvictableEntry<K, V> { return null; try { - CacheObject val = e.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); + CacheObject val = e.peek(GridCachePeekMode.GLOBAL, CU.empty0()); return val != null ? val.<V>value(cached.context(), false) : null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cd8052d..2670854 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -377,7 +377,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgnitePredicate<Cache.Entry<K, V>> predicate() { + @Override public CacheEntryPredicate predicate() { return null; } @@ -387,7 +387,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx, null, null, - null, subjId, false, null); @@ -403,7 +402,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, - null, EnumSet.copyOf(F.asList(flags)), null, false, @@ -436,7 +434,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, null, null, null, - null, ctx.portableEnabled(), null ); @@ -455,7 +452,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, null, null, null, - null, ctx.portableEnabled(), plc); } @@ -477,8 +473,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((CacheProjection<K1, V1>)this, (GridCacheContext<K1, V1>)ctx, - CU.<K1, V1>typeFilter(keyType, valType), - /*filter*/null, + CU.typeFilter0(keyType, valType), /*flags*/null, /*clientId*/null, false, @@ -488,33 +483,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public CacheProjection<K, V> projection(IgniteBiPredicate<K, V> p) { - if (p == null) - return this; - - if (ctx.deploymentEnabled()) { - try { - ctx.deploy().registerClasses(p); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, - ctx, - p, - null, - null, - null, - false, - null); - - return new GridCacheProxyImpl<>(ctx, prj, prj); - } - - /** {@inheritDoc} */ - @Override public CacheProjection<K, V> projection(IgnitePredicate<Cache.Entry<K, V>> filter) { + @Override public CacheProjection<K, V> projection(CacheEntryPredicate filter) { if (filter == null) return this; @@ -530,7 +499,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>( this, ctx, - null, filter, null, null, @@ -566,7 +534,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, TransactionIsolation isolation, boolean invalidate, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter); + CacheEntryPredicate[] filter); /** * Post constructor initialization for subclasses. @@ -925,7 +893,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public V peek(K key) { - return peek(key, (IgnitePredicate<Cache.Entry<K, V>>) null); + return peek(key, (CacheEntryPredicate) null); } /** {@inheritDoc} */ @@ -942,7 +910,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @throws GridCacheFilterFailedException If filter failed. */ @Nullable protected GridTuple<V> peek0(boolean failFast, K key, GridCachePeekMode mode, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws GridCacheFilterFailedException { + @Nullable CacheEntryPredicate... filter) throws GridCacheFilterFailedException { A.notNull(key, "key"); if (keyCheck) @@ -1037,7 +1005,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Nullable protected GridTuple<V> peek0(boolean failFast, K key, @Nullable Collection<GridCachePeekMode> modes, IgniteInternalTx tx) throws IgniteCheckedException, GridCacheFilterFailedException { if (F.isEmpty(modes)) - return F.t(peek(key, (IgnitePredicate<Cache.Entry<K, V>>)null)); + return F.t(peek(key, (CacheEntryPredicate)null)); assert modes != null; @@ -1138,25 +1106,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return ret; } - /** {@inheritDoc} */ - @Override public void forEach(IgniteInClosure<Cache.Entry<K, V>> vis) { - A.notNull(vis, "vis"); - - for (Cache.Entry<K, V> e : entrySet()) - vis.apply(e); - } - - /** {@inheritDoc} */ - @Override public boolean forAll(IgnitePredicate<Cache.Entry<K, V>> vis) { - A.notNull(vis, "vis"); - - for (Cache.Entry<K, V> e : entrySet()) - if (!vis.apply(e)) - return false; - - return true; - } - /** * Undeploys and removes all entries for class loader. * @@ -1287,21 +1236,21 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return entrySet((IgnitePredicate<Cache.Entry<K, V>>[])null); + return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { return map.entriesx(filter); } /** {@inheritDoc} */ - @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter) { return map.entriesx( - F.and( + F0.and0( filter, - CU.<K, V>cachePrimary(ctx.grid().<K>affinity(ctx.name()), ctx.localNode()))); + CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } /** {@inheritDoc} */ @@ -1311,32 +1260,32 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> primaryEntrySet() { - return primaryEntrySet((IgnitePredicate<Cache.Entry<K, V>>[]) null); + return primaryEntrySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((IgnitePredicate<Cache.Entry<K, V>>[]) null); + return keySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((IgnitePredicate<Cache.Entry<K, V>>[]) null); + return primaryKeySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((IgnitePredicate<Cache.Entry<K, V>>[]) null); + return values((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ - public Collection<V> values(IgnitePredicate<Cache.Entry<K, V>>... filter) { + public Collection<V> values(CacheEntryPredicate... filter) { return map.values(filter); } /** {@inheritDoc} */ @Override public Collection<V> primaryValues() { - return primaryValues((IgnitePredicate<Cache.Entry<K, V>>[])null); + return primaryValues((CacheEntryPredicate[])null); } /** @@ -1437,19 +1386,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param keys Keys. * @param readers Readers flag. */ - public void clearLocally(Collection<? extends K> keys, boolean readers) { + public void clearLocally(Collection<KeyCacheObject> keys, boolean readers) { if (F.isEmpty(keys)) return; - if (keyCheck) - validateCacheKeys(keys); - GridCacheVersion obsoleteVer = ctx.versions().next(); - for (K key : keys) { - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - - GridCacheEntryEx e = peekEx(cacheKey); + for (KeyCacheObject key : keys) { + GridCacheEntryEx e = peekEx(key); try { if (e != null) @@ -1471,7 +1415,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @return {@code True} if cleared. */ private boolean clearLocally(GridCacheVersion obsoleteVer, K key, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { try { KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); @@ -1551,7 +1495,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public boolean compact(K key) throws IgniteCheckedException { - return compact(key, (IgnitePredicate<Cache.Entry<K, V>>[])null); + return compact(key, (CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -1575,7 +1519,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @return {@code True} if entry was evicted. */ private boolean evictx(K key, GridCacheVersion ver, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); GridCacheEntryEx entry = peekEx(cacheKey); @@ -1595,7 +1539,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync(F.asList(key), !ctx.config().isReadFromBackup(), /*skip tx*/false, entry, null, taskName, @@ -2046,7 +1990,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public boolean evict(K key) { - return evict(key, (IgnitePredicate<Cache.Entry<K, V>>[])null); + return evict(key, (CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -2056,7 +2000,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public void evictAll(Collection<? extends K> keys) { - evictAll(keys, (IgnitePredicate<Cache.Entry<K, V>>[])null); + evictAll(keys, (CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -2500,7 +2444,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public V put(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + @Override public V put(K key, V val, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { return put(key, val, null, -1, filter); } @@ -2510,7 +2454,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final V val, @Nullable final GridCacheEntryEx cached, final long ttl, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Nullable final CacheEntryPredicate[] filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); @@ -2546,7 +2490,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public boolean putx(final K key, final V val, @Nullable final GridCacheEntryEx cached, - final long ttl, @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + final long ttl, @Nullable final CacheEntryPredicate... filter) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -2569,7 +2513,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putAsync(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { + @Nullable CacheEntryPredicate[] filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -2584,7 +2528,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> putAsync(final K key, final V val, @Nullable final GridCacheEntryEx entry, - final long ttl, @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { + final long ttl, @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -2608,7 +2552,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public boolean putx(final K key, final V val, - final IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + final CacheEntryPredicate[] filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -2897,7 +2841,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -2913,7 +2857,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> putxAsync(final K key, final V val, @Nullable final GridCacheEntryEx entry, final long ttl, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -2948,7 +2892,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noPeekArray()).get().value(); + CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); return CU.value(prev, ctx, true); } @@ -2976,7 +2920,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture<V> fut = ctx.wrapClone(asyncOp(new AsyncOp<V>(key) { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noPeekArray()) + return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn<CacheObject>>, V>)RET2VAL); } @@ -3008,7 +2952,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noPeekArray()).get().success(); + return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success(); } @Override public String toString() { @@ -3039,7 +2983,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>(key) { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noPeekArray()).chain( + return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<CacheObject>>, Boolean>)RET2FLAG); } @@ -3067,7 +3011,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasPeekArray()).get().value(); + CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasValArray()).get().value(); return CU.value(prev, ctx, true); } @@ -3095,7 +3039,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture<V> fut = ctx.wrapClone(asyncOp(new AsyncOp<V>(key) { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasPeekArray()).chain( + return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<CacheObject>>, V>)RET2VAL); } @@ -3123,7 +3067,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasPeekArray()).get().success(); + return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success(); } @Override public String toString() { @@ -3145,7 +3089,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return asyncOp(new AsyncOp<Boolean>(key) { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasPeekArray()).chain( + return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<CacheObject>>, Boolean>)RET2FLAG); } @@ -3174,12 +3118,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - V oldVal0 = oldVal; - - if (ctx.portableEnabled()) - oldVal0 = (V)ctx.marshalToPortable(oldVal); - - return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsPeekArray(oldVal0)).get() + return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsValArray(oldVal)).get() .success(); } @@ -3218,12 +3157,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } } - V oldVal0 = oldVal; - - if (ctx.portableEnabled()) - oldVal0 = (V)ctx.marshalToPortable(oldVal); - - return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsPeekArray(oldVal0)).chain( + return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsValArray(oldVal)).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<CacheObject>>, Boolean>)RET2FLAG); } @@ -3240,7 +3174,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public void putAll(@Nullable final Map<? extends K, ? extends V> m, - final IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { + final CacheEntryPredicate[] filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -3271,7 +3205,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllAsync(final Map<? extends K, ? extends V> m, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable final CacheEntryPredicate... filter) { if (F.isEmpty(m)) return new GridFinishedFuture<Object>(ctx.kernalContext()); @@ -3294,14 +3228,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Nullable @Override public V remove(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Nullable @Override public V remove(K key, CacheEntryPredicate[] filter) throws IgniteCheckedException { return remove(key, null, filter); } /** {@inheritDoc} */ @Override public V remove(final K key, @Nullable final GridCacheEntryEx entry, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable final CacheEntryPredicate... filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -3337,7 +3271,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<V> removeAsync(K key, IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public IgniteInternalFuture<V> removeAsync(K key, CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -3352,7 +3286,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> removeAsync(final K key, @Nullable final GridCacheEntryEx entry, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable final CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -3384,7 +3318,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public void removeAll(final Collection<? extends K> keys, - final IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + final CacheEntryPredicate... filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -3426,7 +3360,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable final Collection<? extends K> keys, - final IgnitePredicate<Cache.Entry<K, V>>... filter) { + final CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -3456,23 +3390,23 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public boolean removex(final K key, final IgnitePredicate<Cache.Entry<K, V>>... filter) + @Override public boolean removex(final K key, final CacheEntryPredicate... filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; - boolean removed = removex(key, null, filter); + boolean rmv = removex(key, null, filter); - if (statsEnabled && removed) + if (statsEnabled && rmv) metrics0().addRemoveTimeNanos(System.nanoTime() - start); - return removed; + return rmv; } /** {@inheritDoc} */ @Override public boolean removex(final K key, @Nullable final GridCacheEntryEx entry, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable final CacheEntryPredicate... filter) throws IgniteCheckedException { boolean statsEnabled = ctx.config().isStatisticsEnabled(); long start = statsEnabled ? System.nanoTime() : 0L; @@ -3484,7 +3418,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (keyCheck) validateCacheKey(key); - boolean removed = syncOp(new SyncOp<Boolean>(true) { + boolean rmv = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.removeAllAsync(ctx, Collections.singletonList(key), entry, false, filter).get().success(); } @@ -3494,14 +3428,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } }); - if (statsEnabled && removed) + if (statsEnabled && rmv) metrics0().addRemoveTimeNanos(System.nanoTime() - start); - return removed; + return rmv; } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> removexAsync(K key, IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public IgniteInternalFuture<Boolean> removexAsync(K key, CacheEntryPredicate... filter) { A.notNull(key, "key"); return removexAsync(key, null, filter); @@ -3509,7 +3443,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removexAsync(final K key, @Nullable final GridCacheEntryEx entry, - @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable final CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -3557,7 +3491,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, Collections.singletonList(key), null, true, - ctx.vararg(F.<K, V>cacheContainsPeek(val))).get(); + ctx.equalsValArray(val)).get(); CacheObject val = (CacheObject)ret.value(); @@ -3632,7 +3566,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.deploy().registerClass(oldVal); GridCacheReturn ret = - tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsPeekArray(oldVal)).get(); + tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsValArray(oldVal)).get(); CacheObject val = (CacheObject)ret.value(); @@ -3671,7 +3605,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, Collections.singletonList(key), null, true, - ctx.vararg(F.<K, V>cacheContainsPeek(val))); + ctx.equalsValArray(val)); return fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, GridCacheReturn<V>>() { @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) @@ -3721,7 +3655,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, true, null, -1, - ctx.equalsPeekArray(oldVal)); + ctx.equalsValArray(oldVal)); return fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, GridCacheReturn<V>>() { @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) @@ -3765,15 +3699,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ctx.deploy().registerClass(val); K key0 = key; - V val0 = val; - if (ctx.portableEnabled()) { + if (ctx.portableEnabled()) key0 = (K)ctx.marshalToPortable(key); - val0 = (V)ctx.marshalToPortable(val); - } return tx.removeAllAsync(ctx, Collections.singletonList(key0), null, false, - ctx.vararg(F.<K, V>cacheContainsPeek(val0))).get().success(); + ctx.equalsValArray(val)).get().success(); } @Override public String toString() { @@ -3815,12 +3746,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } K key0 = key; - V val0 = val; if (ctx.portableEnabled()) { try { key0 = (K)ctx.marshalToPortable(key); - val0 = (V)ctx.marshalToPortable(val); } catch (IgniteException e) { return new GridFinishedFuture<>(ctx.kernalContext(), e); @@ -3828,7 +3757,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } return tx.removeAllAsync(ctx, Collections.singletonList(key0), null, false, - ctx.vararg(F.<K, V>cacheContainsPeek(val0))).chain( + ctx.equalsValArray(val)).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<CacheObject>>, Boolean>)RET2FLAG); } @@ -3847,7 +3776,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filter. * @return Future. */ - public IgniteInternalFuture<?> localRemoveAll(final IgnitePredicate<Cache.Entry<K, V>> filter) { + public IgniteInternalFuture<?> localRemoveAll(final CacheEntryPredicate filter) { ctx.denyOnLocalRead(); final Set<? extends K> keys = filter != null ? keySet(filter) : keySet(); @@ -3894,7 +3823,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public boolean lock(K key, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { A.notNull(key, "key"); return lockAll(Collections.singletonList(key), timeout, filter); @@ -3902,7 +3831,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -3932,7 +3861,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); if (keyCheck) @@ -3942,7 +3871,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public void unlock(K key, IgnitePredicate<Cache.Entry<K, V>>... filter) + @Override public void unlock(K key, CacheEntryPredicate... filter) throws IgniteCheckedException { A.notNull(key, "key"); @@ -5103,7 +5032,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filters to evaluate. */ public void clearLocally0(Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { ctx.denyOnFlag(READ); ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE); @@ -5124,7 +5053,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filters to evaluate. * @return {@code True} if cleared. */ - public boolean clearLocally0(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + public boolean clearLocally0(K key, @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); if (keyCheck) @@ -5142,7 +5071,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @return {@code True} if compacted. * @throws IgniteCheckedException If failed. */ - public boolean compact(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) + public boolean compact(K key, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { ctx.denyOnFlag(READ); @@ -5175,7 +5104,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filters to evaluate. * @return {@code True} if evicted. */ - public boolean evict(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + public boolean evict(K key, @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); if (keyCheck) @@ -5191,7 +5120,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filters to evaluate. */ public void evictAll(Collection<? extends K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { A.notNull(keys, "keys"); ctx.denyOnFlag(READ); @@ -5223,7 +5152,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filter to evaluate. * @return Peeked value. */ - public V peek(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>> filter) { + public V peek(K key, @Nullable CacheEntryPredicate filter) { try { GridTuple<V> peek = peek0(false, key, SMART, filter); @@ -5242,44 +5171,27 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Filters to evaluate. * @return Entry set. */ - public Set<Cache.Entry<K, V>> entrySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + public Set<Cache.Entry<K, V>> entrySet(@Nullable CacheEntryPredicate... filter) { return map.entries(filter); } /** - * @param keys Keys. - * @param keyFilter Key filter. - * @param filter Entry filter. - * @return Entry set. - */ - public Set<Cache.Entry<K, V>> entrySet(@Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<K> keyFilter, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { - if (F.isEmpty(keys)) - return emptySet(); - - if (keyCheck) - validateCacheKeys(keys); - - return new GridCacheEntrySet<>(ctx, F.viewReadOnly(keys, CU.cacheKey2Entry(ctx), keyFilter), filter); - } - - /** * @param filter Filters to evaluate. * @return Primary entry set. */ public Set<Cache.Entry<K, V>> primaryEntrySet( - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { return map.entries( - F.and( + F0.and0( filter, - CU.<K, V>cachePrimary(ctx.grid().<K>affinity(ctx.name()), ctx.localNode()))); + CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } /** * @param filter Filters to evaluate. * @return Key set. */ - @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Override public Set<K> keySet(@Nullable CacheEntryPredicate... filter) { return map.keySet(filter); } @@ -5287,22 +5199,22 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param filter Primary key set. * @return Primary key set. */ - public Set<K> primaryKeySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + public Set<K> primaryKeySet(@Nullable CacheEntryPredicate... filter) { return map.keySet( - F.and( + F0.and0( filter, - CU.<K, V>cachePrimary(ctx.grid().<K>affinity(ctx.name()), ctx.localNode()))); + CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } /** * @param filter Filters to evaluate. * @return Primary values. */ - public Collection<V> primaryValues(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + public Collection<V> primaryValues(@Nullable CacheEntryPredicate... filter) { return map.values( - F.and( + F0.and0( filter, - CU.<K, V>cachePrimary(ctx.grid().<K>affinity(ctx.name()), ctx.localNode()))); + CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()))); } /** @@ -5311,7 +5223,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @throws IgniteCheckedException If failed. */ public void compactAll(@Nullable Iterable<K> keys, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException { + @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException { ctx.denyOnFlag(READ); if (keys != null) { @@ -5399,7 +5311,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, while (true) { try { - // TODO IGNITE-51. KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); // Do not reload near entries, they will be reloaded in DHT cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index 6b288b5..5a7b183 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@ -133,7 +133,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable { */ protected void clearEntry(GridCacheEntryEx e) { try { - e.clear(obsoleteVer, false, CU.<K, V>empty()); + e.clear(obsoleteVer, false, CU.empty0()); } catch (IgniteCheckedException ex) { U.error(log, "Failed to clearLocally entry from cache (will continue to clearLocally other entries): " + e, ex); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index 70b41c2..d9dce4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -88,15 +88,15 @@ public class GridCacheConcurrentMap { private final LongAdder mapSize = new LongAdder(); /** Filters cache internal entry. */ - private static final P1<Cache.Entry<?, ?>> NON_INTERNAL = - new P1<Cache.Entry<?, ?>>() { - @Override public boolean apply(Cache.Entry<?, ?> entry) { - return !(entry.getKey() instanceof GridCacheInternal); + private static final CacheEntryPredicate NON_INTERNAL = + new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx entry) { + return !entry.isInternal(); } - }; + }); /** Non-internal predicate array. */ - public static final IgnitePredicate[] NON_INTERNAL_ARR = new P1[] {NON_INTERNAL}; + public static final CacheEntryPredicate[] NON_INTERNAL_ARR = new CacheEntryPredicate[] {NON_INTERNAL}; /** Filters obsolete cache map entry. */ private final IgnitePredicate<GridCacheMapEntry> obsolete = @@ -315,17 +315,17 @@ public class GridCacheConcurrentMap { /** * @return Non-internal predicate. */ - private static <K, V> IgnitePredicate<Cache.Entry<K, V>>[] nonInternal() { - return (IgnitePredicate<Cache.Entry<K,V>>[])NON_INTERNAL_ARR; + private static CacheEntryPredicate[] nonInternal() { + return NON_INTERNAL_ARR; } /** * @param filter Filter to add to non-internal-key filter. * @return Non-internal predicate. */ - private static <K, V> IgnitePredicate<Cache.Entry<K, V>>[] nonInternal( - IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return F.asArray(F0.and((IgnitePredicate<Cache.Entry<K, V>>[])NON_INTERNAL_ARR, filter)); + private static CacheEntryPredicate[] nonInternal( + CacheEntryPredicate[] filter) { + return F.asArray(F0.and0(NON_INTERNAL_ARR, filter)); } /** @@ -397,7 +397,7 @@ public class GridCacheConcurrentMap { * @param filter Filter. * @return a collection view of the values contained in this map. */ - public <K, V> Collection<V> allValues(IgnitePredicate<Cache.Entry<K, V>>[] filter) { + public <K, V> Collection<V> allValues(CacheEntryPredicate[] filter) { checkWeakQueue(); return new Values<>(this, filter); @@ -567,7 +567,7 @@ public class GridCacheConcurrentMap { * @return Set of the mappings contained in this map. */ @SuppressWarnings({"unchecked"}) - public <K, V> Set<Cache.Entry<K, V>> entries(IgnitePredicate<Cache.Entry<K, V>>... filter) { + public <K, V> Set<Cache.Entry<K, V>> entries(CacheEntryPredicate... filter) { checkWeakQueue(); return new EntrySet<>(this, filter); @@ -580,7 +580,7 @@ public class GridCacheConcurrentMap { * @return Set of the mappings contained in this map. */ @SuppressWarnings({"unchecked"}) - public <K, V> Set<Cache.Entry<K, V>> entriesx(IgnitePredicate<Cache.Entry<K, V>>... filter) { + public <K, V> Set<Cache.Entry<K, V>> entriesx(CacheEntryPredicate... filter) { checkWeakQueue(); return new EntrySet<>(this, filter, true); @@ -618,7 +618,7 @@ public class GridCacheConcurrentMap { public Set<GridCacheEntryEx> allEntries0() { checkWeakQueue(); - return new Set0<>(this, CU.empty()); + return new Set0<>(this, CU.empty0()); } /** @@ -627,7 +627,7 @@ public class GridCacheConcurrentMap { * @param filter Filter. * @return Set of the keys contained in this map. */ - public <K, V> Set<K> keySet(IgnitePredicate<Cache.Entry<K, V>>... filter) { + public <K, V> Set<K> keySet(CacheEntryPredicate... filter) { checkWeakQueue(); return new KeySet<>(this, filter); @@ -639,7 +639,7 @@ public class GridCacheConcurrentMap { * @param filter Filter. * @return Collection view of the values contained in this map. */ - public <K, V> Collection<V> values(IgnitePredicate<Cache.Entry<K, V>>... filter) { + public <K, V> Collection<V> values(CacheEntryPredicate... filter) { checkWeakQueue(); return allValues(filter); @@ -1552,7 +1552,7 @@ public class GridCacheConcurrentMap { private GridCacheMapEntry cur; /** Iterator filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Outer cache map. */ private GridCacheConcurrentMap map; @@ -1585,7 +1585,7 @@ public class GridCacheConcurrentMap { */ @SuppressWarnings({"unchecked"}) Iterator0(GridCacheConcurrentMap map, boolean isVal, - IgnitePredicate<Cache.Entry<K, V>>[] filter, int id, int totalCnt) { + CacheEntryPredicate[] filter, int id, int totalCnt) { this.filter = filter; this.isVal = isVal; this.id = id; @@ -1764,7 +1764,7 @@ public class GridCacheConcurrentMap { @SuppressWarnings({"unchecked"}) @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { ctx = (GridCacheContext<K, V>)in.readObject(); - filter = (IgnitePredicate<Cache.Entry<K, V>>[])in.readObject(); + filter = (CacheEntryPredicate[])in.readObject(); isVal = in.readBoolean(); id = in.readInt(); totalCnt = in.readInt(); @@ -1790,7 +1790,7 @@ public class GridCacheConcurrentMap { private static final long serialVersionUID = 0L; /** Filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Base map. */ private GridCacheConcurrentMap map; @@ -1818,7 +1818,7 @@ public class GridCacheConcurrentMap { * @param map Base map. * @param filter Filter. */ - private Set0(GridCacheConcurrentMap map, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + private Set0(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { assert map != null; this.map = map; @@ -1871,7 +1871,7 @@ public class GridCacheConcurrentMap { try { return e != null && !e.obsolete() && (!e.deleted() || e.lockedByThread()) && - F.isAll(e.<K, V>wrapLazyValue(), filter); + F.isAll(e, filter); } catch (GridCacheEntryRemovedException ignore) { return false; @@ -1940,7 +1940,7 @@ public class GridCacheConcurrentMap { /** {@inheritDoc} */ @Override public void clear() { - ctx.cache().clearLocally0(new KeySet<>(map, filter), CU.<K, V>empty()); + ctx.cache().clearLocally0(new KeySet<K, V>(map, filter)); } /** {@inheritDoc} */ @@ -1952,7 +1952,7 @@ public class GridCacheConcurrentMap { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { ctx = (GridCacheContext<K, V>)in.readObject(); - filter = (IgnitePredicate<Cache.Entry<K, V>>[])in.readObject(); + filter = (CacheEntryPredicate[])in.readObject(); } /** @@ -2003,7 +2003,7 @@ public class GridCacheConcurrentMap { */ EntryIterator( GridCacheConcurrentMap map, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, GridCacheContext<K, V> ctx, GridCacheProjectionImpl<K, V> prjPerCall, CacheFlag[] forcedFlags) { @@ -2092,7 +2092,7 @@ public class GridCacheConcurrentMap { */ private ValueIterator( GridCacheConcurrentMap map, - IgnitePredicate<Cache.Entry<K, V>>[] filter, + CacheEntryPredicate[] filter, GridCacheContext ctx, boolean clone) { it = new Iterator0<>(map, true, filter, -1, -1); @@ -2163,7 +2163,7 @@ public class GridCacheConcurrentMap { * @param map Cache map. * @param filter Filter. */ - private KeyIterator(GridCacheConcurrentMap map, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + private KeyIterator(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { it = new Iterator0<>(map, false, filter, -1, -1); } @@ -2215,7 +2215,7 @@ public class GridCacheConcurrentMap { * @param map Base map. * @param filter Key filter. */ - private KeySet(GridCacheConcurrentMap map, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { assert map != null; set = new Set0<>(map, nonInternal(filter)); @@ -2283,7 +2283,7 @@ public class GridCacheConcurrentMap { * @param map Base map. * @param filter Value filter. */ - private Values(GridCacheConcurrentMap map, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + private Values(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { assert map != null; set = new Set0<>(map, nonInternal(filter)); @@ -2343,7 +2343,7 @@ public class GridCacheConcurrentMap { * @param map Base map. * @param filter Key filter. */ - private EntrySet(GridCacheConcurrentMap map, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + private EntrySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { this(map, filter, false); } @@ -2352,7 +2352,7 @@ public class GridCacheConcurrentMap { * @param filter Key filter. * @param internal Whether to allow internal entries. */ - private EntrySet(GridCacheConcurrentMap map, IgnitePredicate<Cache.Entry<K, V>>[] filter, + private EntrySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter, boolean internal) { assert map != null;