Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-42 6d920419b -> d253a7b13


# ignite-42


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d253a7b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d253a7b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d253a7b1

Branch: refs/heads/ignite-42
Commit: d253a7b132236e9e4bfff759a30085cb7313b56f
Parents: 6d92041
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Jan 19 18:57:51 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Jan 19 18:57:51 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  2 +-
 .../processors/cache/GridCacheStoreManager.java | 64 +++++++++++---------
 .../dht/atomic/GridDhtAtomicCache.java          | 29 ++++++---
 .../local/atomic/GridLocalAtomicCache.java      | 26 +++++---
 4 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 257457a..1e4426e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -553,7 +553,7 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** {@inheritDoc} */
     @Override public void loadAll(Set<? extends K> keys,
         boolean replaceExistingValues,
-        CompletionListener completionLsnr) {
+        @Nullable CompletionListener completionLsnr) {
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
index 7391328..ed95bbd 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
@@ -335,7 +335,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 catch (ClassCastException e) {
                     handleClassCastException(e);
                 }
-                catch (IgniteException e) {
+                catch (Exception e) {
                     throw U.cast(e);
                 }
                 finally {
@@ -391,7 +391,7 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                     }
                 }, args);
             }
-            catch (IgniteException e) {
+            catch (Exception e) {
                 throw U.cast(e);
             }
             catch (AssertionError e) {
@@ -443,6 +443,9 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             catch (ClassCastException e) {
                 handleClassCastException(e);
             }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
             finally {
                 if (ses)
                     sesHolder.set(null);
@@ -495,30 +498,32 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 if (log.isDebugEnabled())
                     log.debug("Storing values in cache store [map=" + map0 + 
']');
 
-                boolean ses = initSession(tx);
+                // TODO IGNITE-42.
+                Collection<Cache.Entry<? extends K, ? extends Object>> entries 
= new ArrayList<>(map.size());
 
-                try {
-                    /*
-                    C1<Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>>, 
Cache.Entry<? extends K, ?>> c =
-                        new C1<Map.Entry<K, IgniteBiTuple<V, 
GridCacheVersion>>, Cache.Entry<? extends K, ?>>() {
-                            @Override public Cache.Entry<? extends K, ?> 
apply(Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e) {
-                                return new CacheEntryImpl<>(e.getKey(), 
locStore ? e.getValue() : e.getValue().get1());
-                            }
-                        };
-
-
-                    Collection<Map.Entry<K, IgniteBiTuple<V, 
GridCacheVersion>>> col = map.entrySet();
-                    */
-                    Collection<Cache.Entry<? extends K, ? extends Object>> 
entries = new ArrayList<>(map.size());
+                for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : 
map.entrySet())
+                    entries.add(new CacheEntryImpl<>(e.getKey(), locStore ? 
e.getValue() : e.getValue().get1()));
 
-                    for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : 
map.entrySet())
-                        entries.add(new CacheEntryImpl<>(e.getKey(), locStore 
? e.getValue() : e.getValue().get1()));
+                boolean ses = initSession(tx);
 
+                try {
                     store.writeAll(entries);
                 }
                 catch (ClassCastException e) {
                     handleClassCastException(e);
                 }
+                catch (Exception e) {
+                    if (!entries.isEmpty()) {
+                        List<Object> keys = new ArrayList<>(entries.size());
+
+                        for (Cache.Entry<?, ?> entry : entries)
+                            keys.add(entry.getKey());
+
+                        throw new CacheStorePartialUpdateException(keys, e);
+                    }
+
+                    throw new IgniteCheckedException(e);
+                }
                 finally {
                     if (ses)
                         sesHolder.set(null);
@@ -560,6 +565,9 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
             catch (ClassCastException e) {
                 handleClassCastException(e);
             }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
             finally {
                 if (ses)
                     sesHolder.set(null);
@@ -580,24 +588,20 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
      * @return {@code True} if there is a persistent storage.
      * @throws IgniteCheckedException If storage failed.
      */
-    public boolean removeAllFromStore(@Nullable IgniteTx tx, Collection<? 
extends K> keys) throws IgniteCheckedException {
+    @SuppressWarnings("unchecked")
+    public boolean removeAllFromStore(@Nullable IgniteTx tx, Collection<?> 
keys) throws IgniteCheckedException {
         if (F.isEmpty(keys))
             return true;
 
         if (keys.size() == 1) {
-            K key = keys.iterator().next();
+            Object key = keys.iterator().next();
 
-            return removeFromStore(tx, key);
+            return removeFromStore(tx, (K)key);
         }
 
         if (store != null) {
-            Collection<? extends K> keys0 = convertPortable ?
-                F.viewReadOnly(keys, new C1<K, K>() {
-                    @Override public K apply(K k) {
-                        return (K)cctx.unwrapPortableIfNeeded(k, false);
-                    }
-                }) :
-                keys;
+            Collection<Object> keys0 = convertPortable ?
+                cctx.unwrapPortablesIfNeeded((Collection<Object>)keys, false) 
: (Collection<Object>)keys;
 
             if (log.isDebugEnabled())
                 log.debug("Removing values from cache store [keys=" + keys0 + 
']');
@@ -611,8 +615,8 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
                 handleClassCastException(e);
             }
             catch (Exception e) {
-                //if (!keys.isEmpty())
-                    //throw new CacheStorePartialUpdateException(keys0, e);
+                if (!keys0.isEmpty())
+                    throw new CacheStorePartialUpdateException(keys0, e);
 
                 throw new IgniteCheckedException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index b01778a..8a3d0d1 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1838,6 +1838,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), 
topVer);
 
+        CacheStorePartialUpdateException storeErr = null;
+
         try {
             GridCacheOperation op;
 
@@ -1851,11 +1853,16 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     }) :
                     putMap;
 
-                ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, new 
C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
-                    @Override public IgniteBiTuple<V, GridCacheVersion> 
apply(V v) {
-                        return F.t(v, ver);
-                    }
-                }));
+                try {
+                    ctx.store().putAllToStore(null, F.viewReadOnly(storeMap, 
new C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
+                        @Override public IgniteBiTuple<V, GridCacheVersion> 
apply(V v) {
+                            return F.t(v, ver);
+                        }
+                    }));
+                }
+                catch (CacheStorePartialUpdateException e) {
+                    storeErr = e;
+                }
 
                 op = UPDATE;
             }
@@ -1869,16 +1876,12 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     }) :
                     rmvKeys;
 
-                ctx.store().removeAllFromStore(null, storeKeys);
-                /*
                 try {
                     ctx.store().removeAllFromStore(null, storeKeys);
                 }
                 catch (CacheStorePartialUpdateException e) {
-                    if (e.failedKeys().size() == storeKeys.size())
-
+                    storeErr = e;
                 }
-                */
 
                 op = DELETE;
             }
@@ -1897,6 +1900,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     continue;
                 }
 
+                if (storeErr != null && 
storeErr.failedKeys().contains(entry.key()))
+                    continue;
+
                 try {
                     // We are holding java-level locks on entries at this 
point.
                     V writeVal = op == UPDATE ? putMap.get(entry.key()) : null;
@@ -2032,6 +2038,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
         }
 
+        if (storeErr != null)
+            res.addFailedKeys((Collection<K>)storeErr.failedKeys(), 
storeErr.getCause());
+
         return dhtFut;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d253a7b1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a488d3c..978072e 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1270,18 +1270,30 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
         assert putMap == null ^ rmvKeys == null;
         GridCacheOperation op;
 
+        CacheStorePartialUpdateException storeErr = null;
+
         try {
             if (putMap != null) {
-                ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new 
C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
-                    @Override public IgniteBiTuple<V, GridCacheVersion> 
apply(V v) {
-                        return F.t(v, ver);
-                    }
-                }));
+                try {
+                    ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new 
C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
+                        @Override public IgniteBiTuple<V, GridCacheVersion> 
apply(V v) {
+                            return F.t(v, ver);
+                        }
+                    }));
+                }
+                catch (CacheStorePartialUpdateException e) {
+                    storeErr = e;
+                }
 
                 op = UPDATE;
             }
             else {
-                ctx.store().removeAllFromStore(null, rmvKeys);
+                try {
+                    ctx.store().removeAllFromStore(null, rmvKeys);
+                }
+                catch (CacheStorePartialUpdateException e) {
+                    storeErr = e;
+                }
 
                 op = DELETE;
             }
@@ -1302,7 +1314,7 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
 
             assert Thread.holdsLock(entry);
 
-            if (entry.obsolete())
+            if (entry.obsolete() || (storeErr != null && 
storeErr.failedKeys().contains(entry.key())))
                 continue;
 
             try {

Reply via email to