Repository: incubator-ignite Updated Branches: refs/heads/ignite-42 6d920419b -> d253a7b13
# ignite-42 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d253a7b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d253a7b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d253a7b1 Branch: refs/heads/ignite-42 Commit: d253a7b132236e9e4bfff759a30085cb7313b56f Parents: 6d92041 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jan 19 18:57:51 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jan 19 18:57:51 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 2 +- .../processors/cache/GridCacheStoreManager.java | 64 +++++++++++--------- .../dht/atomic/GridDhtAtomicCache.java | 29 ++++++--- .../local/atomic/GridLocalAtomicCache.java | 26 +++++--- 4 files changed, 73 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 257457a..1e4426e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -553,7 +553,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, - CompletionListener completionLsnr) { + @Nullable CompletionListener completionLsnr) { // TODO IGNITE-1. throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index 7391328..ed95bbd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -335,7 +335,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { catch (ClassCastException e) { handleClassCastException(e); } - catch (IgniteException e) { + catch (Exception e) { throw U.cast(e); } finally { @@ -391,7 +391,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } }, args); } - catch (IgniteException e) { + catch (Exception e) { throw U.cast(e); } catch (AssertionError e) { @@ -443,6 +443,9 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { catch (ClassCastException e) { handleClassCastException(e); } + catch (Exception e) { + throw new IgniteCheckedException(e); + } finally { if (ses) sesHolder.set(null); @@ -495,30 +498,32 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Storing values in cache store [map=" + map0 + ']'); - boolean ses = initSession(tx); + // TODO IGNITE-42. + Collection<Cache.Entry<? extends K, ? extends Object>> entries = new ArrayList<>(map.size()); - try { - /* - C1<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>, Cache.Entry<? extends K, ?>> c = - new C1<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>, Cache.Entry<? extends K, ?>>() { - @Override public Cache.Entry<? extends K, ?> apply(Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e) { - return new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1()); - } - }; - - - Collection<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>> col = map.entrySet(); - */ - Collection<Cache.Entry<? extends K, ? extends Object>> entries = new ArrayList<>(map.size()); + for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet()) + entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1())); - for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet()) - entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? e.getValue() : e.getValue().get1())); + boolean ses = initSession(tx); + try { store.writeAll(entries); } catch (ClassCastException e) { handleClassCastException(e); } + catch (Exception e) { + if (!entries.isEmpty()) { + List<Object> keys = new ArrayList<>(entries.size()); + + for (Cache.Entry<?, ?> entry : entries) + keys.add(entry.getKey()); + + throw new CacheStorePartialUpdateException(keys, e); + } + + throw new IgniteCheckedException(e); + } finally { if (ses) sesHolder.set(null); @@ -560,6 +565,9 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { catch (ClassCastException e) { handleClassCastException(e); } + catch (Exception e) { + throw new IgniteCheckedException(e); + } finally { if (ses) sesHolder.set(null); @@ -580,24 +588,20 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return {@code True} if there is a persistent storage. * @throws IgniteCheckedException If storage failed. */ - public boolean removeAllFromStore(@Nullable IgniteTx tx, Collection<? extends K> keys) throws IgniteCheckedException { + @SuppressWarnings("unchecked") + public boolean removeAllFromStore(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException { if (F.isEmpty(keys)) return true; if (keys.size() == 1) { - K key = keys.iterator().next(); + Object key = keys.iterator().next(); - return removeFromStore(tx, key); + return removeFromStore(tx, (K)key); } if (store != null) { - Collection<? extends K> keys0 = convertPortable ? - F.viewReadOnly(keys, new C1<K, K>() { - @Override public K apply(K k) { - return (K)cctx.unwrapPortableIfNeeded(k, false); - } - }) : - keys; + Collection<Object> keys0 = convertPortable ? + cctx.unwrapPortablesIfNeeded((Collection<Object>)keys, false) : (Collection<Object>)keys; if (log.isDebugEnabled()) log.debug("Removing values from cache store [keys=" + keys0 + ']'); @@ -611,8 +615,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { handleClassCastException(e); } catch (Exception e) { - //if (!keys.isEmpty()) - //throw new CacheStorePartialUpdateException(keys0, e); + if (!keys0.isEmpty()) + throw new CacheStorePartialUpdateException(keys0, e); throw new IgniteCheckedException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index b01778a..8a3d0d1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1838,6 +1838,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + CacheStorePartialUpdateException storeErr = null; + try { GridCacheOperation op; @@ -1851,11 +1853,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }) : putMap; - ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() { - @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) { - return F.t(v, ver); - } - })); + try { + ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() { + @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) { + return F.t(v, ver); + } + })); + } + catch (CacheStorePartialUpdateException e) { + storeErr = e; + } op = UPDATE; } @@ -1869,16 +1876,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }) : rmvKeys; - ctx.store().removeAllFromStore(null, storeKeys); - /* try { ctx.store().removeAllFromStore(null, storeKeys); } catch (CacheStorePartialUpdateException e) { - if (e.failedKeys().size() == storeKeys.size()) - + storeErr = e; } - */ op = DELETE; } @@ -1897,6 +1900,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { continue; } + if (storeErr != null && storeErr.failedKeys().contains(entry.key())) + continue; + try { // We are holding java-level locks on entries at this point. V writeVal = op == UPDATE ? putMap.get(entry.key()) : null; @@ -2032,6 +2038,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e); } + if (storeErr != null) + res.addFailedKeys((Collection<K>)storeErr.failedKeys(), storeErr.getCause()); + return dhtFut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index a488d3c..978072e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1270,18 +1270,30 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { assert putMap == null ^ rmvKeys == null; GridCacheOperation op; + CacheStorePartialUpdateException storeErr = null; + try { if (putMap != null) { - ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() { - @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) { - return F.t(v, ver); - } - })); + try { + ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() { + @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) { + return F.t(v, ver); + } + })); + } + catch (CacheStorePartialUpdateException e) { + storeErr = e; + } op = UPDATE; } else { - ctx.store().removeAllFromStore(null, rmvKeys); + try { + ctx.store().removeAllFromStore(null, rmvKeys); + } + catch (CacheStorePartialUpdateException e) { + storeErr = e; + } op = DELETE; } @@ -1302,7 +1314,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { assert Thread.holdsLock(entry); - if (entry.obsolete()) + if (entry.obsolete() || (storeErr != null && storeErr.failedKeys().contains(entry.key()))) continue; try {