Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 5de1cc2dc -> fdb269d5e


IGNITE-45 - GridCacheAdapter.tryPutIfAbsent method


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56ce0d37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56ce0d37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56ce0d37

Branch: refs/heads/ignite-45
Commit: 56ce0d37ccf76ad0c0427d8b1748612cea0f5f23
Parents: 34046ec
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Mon Mar 23 18:39:38 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Mon Mar 23 18:39:38 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |  2 +-
 .../ignite/internal/MarshallerContextImpl.java  |  9 +++-
 .../processors/cache/GridCacheAdapter.java      | 14 ++++++
 .../cache/GridCacheTryPutFailedException.java   | 28 +++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 49 +++++++++++++++-----
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 34 ++++++++------
 .../GridCacheMarshallingNodeJoinSelfTest.java   |  3 --
 7 files changed, 108 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 4c70d1e..0cbdcf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1906,7 +1906,7 @@ public class IgnitionEx {
 
                 cache.setName(CU.MARSH_CACHE_NAME);
                 cache.setCacheMode(REPLICATED);
-                cache.setAtomicityMode(TRANSACTIONAL);
+                cache.setAtomicityMode(ATOMIC);
                 cache.setSwapEnabled(false);
                 cache.setRebalanceMode(SYNC);
                 cache.setWriteSynchronizationMode(FULL_SYNC);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 6f6b4ad..4947dad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -62,7 +62,14 @@ public class MarshallerContextImpl extends 
MarshallerContextAdapter {
 
             for (int i = 0; i < CACHE_UPDATE_RETRIES_CNT; i++) {
                 try {
-                    String old = cache0.putIfAbsent(id, clsName);
+                    String old;
+
+                    try {
+                        old = cache0.tryPutIfAbsent(id, clsName);
+                    }
+                    catch (GridCacheTryPutFailedException ignored) {
+                        return false;
+                    }
 
                     if (old != null && !old.equals(clsName))
                         throw new IgniteException("Type ID collision occurred 
in OptimizedMarshaller. Use " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2cf3e5a..2b65fed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2749,6 +2749,20 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
         });
     }
 
+    /**
+     * Tries to put value in cache. Will fail with {@link 
GridCacheTryPutFailedException}
+     * if topology exchange is in progress.
+     *
+     * @param key Key.
+     * @param val value.
+     * @return Old value.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @Nullable public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException {
+        // Supported only in ATOMIC cache.
+        throw new UnsupportedOperationException();
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public V putIfAbsent(final K key, final V val) throws 
IgniteCheckedException {
         A.notNull(key, "key", val, "val");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java
new file mode 100644
index 0000000..10bc35f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+
+/**
+ * Try put failed exception.
+ */
+public class GridCacheTryPutFailedException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index b76e815..35319ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -349,7 +349,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             true,
             false,
-            filter);
+            filter,
+            true);
     }
 
     /** {@inheritDoc} */
@@ -365,7 +366,23 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             false,
             false,
-            filter);
+            filter,
+            true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException {
+        A.notNull(key, "key", val, "val");
+
+        return (V)updateAllAsync0(F0.asMap(key, val),
+            null,
+            null,
+            null,
+            null,
+            true,
+            false,
+            ctx.noValArray(),
+            false).get();
     }
 
     /** {@inheritDoc} */
@@ -456,7 +473,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             true,
             true,
-            ctx.equalsValArray(oldVal));
+            ctx.equalsValArray(oldVal),
+            true);
     }
 
     /** {@inheritDoc} */
@@ -475,7 +493,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             false,
             false,
-            filter).chain(RET2NULL);
+            filter,
+            true).chain(RET2NULL);
     }
 
     /** {@inheritDoc} */
@@ -495,7 +514,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             false,
             false,
-            null);
+            null,
+            true);
     }
 
     /** {@inheritDoc} */
@@ -678,7 +698,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             false,
             false,
-            null);
+            null,
+            true);
 
         return fut.chain(new CX1<IgniteInternalFuture<Map<K, 
EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
             @Override public EntryProcessorResult<T> 
applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
@@ -721,7 +742,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             false,
             false,
-            null);
+            null,
+            true);
     }
 
     /** {@inheritDoc} */
@@ -750,7 +772,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             null,
             false,
             false,
-            null);
+            null,
+            true);
     }
 
     /**
@@ -764,6 +787,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param retval Return value required flag.
      * @param rawRetval Return {@code GridCacheReturn} instance.
      * @param filter Cache entry filter for atomic updates.
+     * @param waitTopFut Whether to wait for topology future.
      * @return Completion future.
      */
     @SuppressWarnings("ConstantConditions")
@@ -775,7 +799,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
         final boolean retval,
         final boolean rawRetval,
-        @Nullable final CacheEntryPredicate[] filter
+        @Nullable final CacheEntryPredicate[] filter,
+        final boolean waitTopFut
     ) {
         if (map != null && keyCheck)
             validateCacheKeys(map.keySet());
@@ -808,7 +833,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
-                updateFut.map();
+                updateFut.map(waitTopFut);
 
                 return updateFut;
             }
@@ -871,7 +896,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
-                updateFut.map();
+                updateFut.map(true);
 
                 return updateFut;
             }
@@ -2331,7 +2356,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             req.subjectId(),
             req.taskNameHash());
 
-        updateFut.map();
+        updateFut.map(true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f4d1d95..79d1779 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -292,9 +292,11 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
     /**
      * Performs future mapping.
+     *
+     * @param waitTopFut Whether to wait for topology future.
      */
-    public void map() {
-        mapOnTopology(keys, false, null);
+    public void map(boolean waitTopFut) {
+        mapOnTopology(keys, false, null, waitTopFut);
     }
 
     /** {@inheritDoc} */
@@ -311,7 +313,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             retval = Collections.emptyMap();
 
         if (super.onDone(retval, err)) {
-            cctx.mvcc().removeAtomicFuture(version());
+            if (futVer != null)
+                cctx.mvcc().removeAtomicFuture(version());
 
             return true;
         }
@@ -329,7 +332,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         if (res.remapKeys() != null) {
             assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
 
-            mapOnTopology(res.remapKeys(), true, nodeId);
+            mapOnTopology(res.remapKeys(), true, nodeId, true);
 
             return;
         }
@@ -407,16 +410,16 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
      * @param keys Keys to map.
      * @param remap Boolean flag indicating if this is partial future remap.
      * @param oldNodeId Old node ID if remap.
+     * @param waitTopFut Whether to wait for topology future.
      */
-    private void mapOnTopology(final Collection<?> keys, final boolean remap, 
final UUID oldNodeId) {
+    private void mapOnTopology(final Collection<?> keys, final boolean remap, 
final UUID oldNodeId,
+        final boolean waitTopFut) {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
 
         try {
             if (cache.topology().stopping()) {
-                futVer = 
cctx.versions().next(cctx.affinity().affinityTopologyVersion());
-
                 onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
                     cache.name()));
 
@@ -431,14 +434,17 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 if (futVer == null)
                     // Assign future version in topology read lock before 
first exception may be thrown.
                     futVer = cctx.versions().next(topVer);
-
             }
             else {
-                fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                        mapOnTopology(keys, remap, oldNodeId);
-                    }
-                });
+                if (waitTopFut) {
+                    fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+                        }
+                    });
+                }
+                else
+                    onDone(new GridCacheTryPutFailedException());
 
                 return;
             }
@@ -747,7 +753,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
      */
     private Collection<ClusterNode> mapKey(
         KeyCacheObject key,
-        AffinityTopologyVersion topVer, 
+        AffinityTopologyVersion topVer,
         boolean fastMap
     ) {
         GridCacheAffinityManager affMgr = cctx.affinity();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
index a7e51b3..e1569dc 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.cache.store.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -141,8 +140,6 @@ public class GridCacheMarshallingNodeJoinSelfTest extends 
GridCommonAbstractTest
     private static class Store extends CacheStoreAdapter<Integer, TestObject> 
implements Serializable {
         /** {@inheritDoc} */
         @Override public TestObject load(Integer key) throws 
CacheLoaderException {
-            U.dumpStack("key=" + key);
-
             return key > 0 ? new TestObject() : null;
         }
 

Reply via email to