# ignite-297
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7e8e0b0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7e8e0b0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7e8e0b0f Branch: refs/heads/ignite-285 Commit: 7e8e0b0f4041b55f70b09b66eeea3992e1fbc607 Parents: 78b8df5 Author: sboikov <sboi...@gridgain.com> Authored: Wed Feb 18 13:12:34 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Feb 18 16:10:28 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheProjection.java | 25 ------- .../processors/cache/GridCacheAdapter.java | 17 ----- .../cache/GridCacheProjectionImpl.java | 17 +++-- .../processors/cache/GridCacheProxyImpl.java | 12 ---- .../processors/cache/GridCacheUtils.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 9 +-- .../distributed/near/GridNearAtomicCache.java | 5 -- .../local/atomic/GridLocalAtomicCache.java | 71 +++++++++++++++++--- .../cache/transactions/IgniteTxManager.java | 6 +- .../inmemory/GridTestSwapSpaceSpi.java | 2 +- 10 files changed, 80 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java index 9a57d8e..950043b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java @@ -1626,31 +1626,6 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<?> removeAllAsync(); /** - * Asynchronously removes mappings from cache for entries for which the optionally passed in filters do - * pass. If passed in filters are {@code null}, then all entries in cache will be enrolled - * into transaction. - * <p> - * <b>USE WITH CARE</b> - if your cache has many entries that pass through the filter or if filter - * is empty, then transaction will quickly become very heavy and slow. - * <p> - * If write-through is enabled, the values will be removed from {@link CacheStore} - * via {@link CacheStore#removeAll(Transaction, Collection)} method. - * <h2 class="header">Transactions</h2> - * This method is transactional and will enlist the entry into ongoing transaction - * if there is one. - * <h2 class="header">Cache Flags</h2> - * This method is not available if any of the following flags are set on projection: - * {@link CacheFlag#LOCAL}, {@link CacheFlag#READ}. - * - * @param filter Filter used to supply keys for remove operation (if {@code null}, - * then nothing will be removed). - * @return Future for the remove operation. The future will complete whenever - * remove operation completes. - * @throws CacheFlagException If flags validation failed. - */ - public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter); - - /** * Synchronously acquires lock on a cached object with given * key only if the passed in filter (if any) passes. This method * together with filter check will be executed as one atomic operation. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 46e15f4..c4cfd28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3633,23 +3633,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(final IgnitePredicate<Cache.Entry<K, V>>... filter) { - ctx.denyOnLocalRead(); - - final Set<? extends K> keys = keySet(filter); - - return asyncOp(new AsyncInOp(keys) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter<K, V> tx) { - return tx.removeAllAsync(ctx, keys, null, false, null); - } - - @Override public String toString() { - return "removeAllAsync [filter=" + Arrays.toString(filter) + ']'; - } - }); - } - - /** {@inheritDoc} */ @Override public CacheMetrics metrics() { return new CacheMetricsSnapshot(metrics); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index eb854d5..ef03ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -1052,12 +1052,18 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public boolean evict(K key) { - return cache.evict(key, entryFilter(true)); + if (predicate() != null) + return cache.evict(key, entryFilter(true)); + else + return cache.evict(key); } /** {@inheritDoc} */ @Override public void evictAll(@Nullable Collection<? extends K> keys) { - cache.evictAll(keys, entryFilter(true)); + if (predicate() != null) + cache.evictAll(keys, entryFilter(true)); + else + cache.evictAll(keys); } /** {@inheritDoc} */ @@ -1215,12 +1221,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync() { - return removeAllAsync(new IgnitePredicate[0]); - } + assert predicate() == null; - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { - return cache.removeAllAsync(and(filter, false)); + return cache.removeAllAsync(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 2a653af..95de174 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1640,18 +1640,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.removeAllAsync(filter); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Override public boolean lock(K key, long timeout, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index e78577a..feb74cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1167,12 +1167,7 @@ public class GridCacheUtils { * @throws IgniteCheckedException If execution failed. */ public static <T> T outTx(Callable<T> cmd, GridCacheContext ctx) throws IgniteCheckedException { - IgniteInternalTx<?, ?> tx = ctx.tm().txx(); - - boolean inTx = tx != null && tx.user() && - (tx.state() != UNKNOWN && tx.state() != ROLLED_BACK && tx.state() != COMMITTED); - - if (inTx) + if (ctx.tm().inUserTx()) return ctx.closures().callLocalSafe(cmd, false).get(); else { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 918f2e6..369dab8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -541,11 +541,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return removeAllAsync(keySet(filter), filter); - } - - /** {@inheritDoc} */ @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { removeAllDrAsync(drMap).get(); } @@ -1386,7 +1381,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, expiry); - firstEntryIdx = i + 1; + firstEntryIdx = i; putMap = null; entryProcessorMap = null; @@ -1428,7 +1423,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, expiry); - firstEntryIdx = i + 1; + firstEntryIdx = i; rmvKeys = null; entryProcessorMap = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index f409e94..458648f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -640,11 +640,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return dht.removeAllAsync(keySet(filter)); - } - - /** {@inheritDoc} */ @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException { dht.removeAllDr(drMap); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 6a8a6cb..7f3d8bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -244,11 +244,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException { A.notNull(oldVal, "oldVal"); + if (ctx.portableEnabled()) + oldVal = (V)ctx.marshalToPortable(oldVal); + return putx(key, newVal, ctx.equalsPeekArray(oldVal)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { + if (ctx.portableEnabled()) + oldVal = (V)ctx.marshalToPortable(oldVal); + return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal)); } @@ -259,6 +265,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); + if (ctx.portableEnabled()) + oldVal = (V)ctx.marshalToPortable(oldVal); + return (GridCacheReturn<V>)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(newVal), @@ -277,6 +286,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return (GridCacheReturn<V>)updateAllInternal(DELETE, Collections.singleton(key), null, @@ -295,6 +307,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return removeAllAsync0(F.asList(key), true, true, ctx.equalsPeekArray(val)); } @@ -305,6 +320,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); + if (ctx.portableEnabled()) + oldVal = (V)ctx.marshalToPortable(oldVal); + return updateAllAsync0(F.asMap(key, newVal), null, null, @@ -442,6 +460,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, @@ -455,6 +476,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) { + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return removexAsync(key, ctx.equalsPeekArray(val)); } @@ -476,11 +500,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeAllAsync(IgnitePredicate<Cache.Entry<K, V>>[] filter) { - return removeAllAsync(keySet(filter), filter); - } - - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override @Nullable public V get(K key, boolean deserializePortable) throws IgniteCheckedException { @@ -488,6 +507,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { String taskName = ctx.kernalContext().job().currentTaskName(); + if (ctx.portableEnabled()) + key = (K)ctx.marshalToPortable(key); + Map<K, V> m = getAllInternal(Collections.singleton(key), ctx.isSwapOrOffheapEnabled(), ctx.readThrough(), @@ -496,7 +518,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { deserializePortable, false); - return m.get(key); + assert m.isEmpty() || m.size() == 1 : m.size(); + + return m.isEmpty() ? null : m.values().iterator().next(); } /** {@inheritDoc} */ @@ -509,6 +533,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { String taskName = ctx.kernalContext().job().currentTaskName(); + if (ctx.portableEnabled() && !F.isEmpty(keys)) { + keys = F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)ctx.marshalToPortable(k); + } + }); + } + return getAllInternal(keys, ctx.isSwapOrOffheapEnabled(), ctx.readThrough(), @@ -587,7 +619,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { for (K key : keys) { if (key == null) - continue; + throw new NullPointerException("Null key."); GridCacheEntryEx<K, V> entry = null; @@ -609,8 +641,16 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { taskName, expiry); - if (v != null) - vals.put(key, v); + if (v != null) { + K key0 = key; + + if (ctx.portableEnabled() && deserializePortable) { + v = (V)ctx.unwrapPortableIfNeeded(v, false); + key0 = (K)ctx.unwrapPortableIfNeeded(key, false); + } + + vals.put(key0, v); + } else success = false; } @@ -957,6 +997,13 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (val == null && op != DELETE) throw new NullPointerException("Null value."); + if (ctx.portableEnabled()) { + key = (K)ctx.marshalToPortable(key); + + if (op == UPDATE) + val = (V)ctx.marshalToPortable(val); + } + while (true) { GridCacheEntryEx<K, V> entry = null; @@ -1237,6 +1284,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (putMap == null) putMap = new LinkedHashMap<>(size, 1.0f); + if (ctx.portableEnabled()) + val = ctx.marshalToPortable(val); + putMap.put(entry.key(), (V)val); } else { @@ -1438,6 +1488,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (key == null) throw new NullPointerException("Null key."); + if (ctx.portableEnabled()) + key = (K)ctx.marshalToPortable(key); + GridCacheEntryEx<K, V> entry = entryEx(key); locked.add(entry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index cf32dcc..f7a842a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -655,12 +655,14 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { @Nullable public IgniteInternalTx userTx() { IgniteInternalTx<K, V> tx = txContext(); - if (tx != null && tx.user() && tx.state() == ACTIVE) + if (tx != null && tx.user() && (tx.state() != UNKNOWN && tx.state() != ROLLED_BACK && tx.state() != COMMITTED)) return tx; tx = tx(Thread.currentThread().getId()); - return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; + return tx != null && + tx.user() && + (tx.state() != UNKNOWN && tx.state() != ROLLED_BACK && tx.state() != COMMITTED) ? tx : null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e8e0b0f/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java index ce125a4..870f21b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java @@ -76,7 +76,7 @@ public class GridTestSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceS @Override public long count(@Nullable String spaceName, Set<Integer> parts) throws IgniteSpiException { Space space = space(spaceName); - return space.count(parts); + return space != null ? space.count(parts) : 0; } /** {@inheritDoc} */