Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 5de1cc2dc -> fdb269d5e
IGNITE-45 - GridCacheAdapter.tryPutIfAbsent method Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56ce0d37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56ce0d37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56ce0d37 Branch: refs/heads/ignite-45 Commit: 56ce0d37ccf76ad0c0427d8b1748612cea0f5f23 Parents: 34046ec Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Mar 23 18:39:38 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Mar 23 18:39:38 2015 -0700 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 2 +- .../ignite/internal/MarshallerContextImpl.java | 9 +++- .../processors/cache/GridCacheAdapter.java | 14 ++++++ .../cache/GridCacheTryPutFailedException.java | 28 +++++++++++ .../dht/atomic/GridDhtAtomicCache.java | 49 +++++++++++++++----- .../dht/atomic/GridNearAtomicUpdateFuture.java | 34 ++++++++------ .../GridCacheMarshallingNodeJoinSelfTest.java | 3 -- 7 files changed, 108 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 4c70d1e..0cbdcf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1906,7 +1906,7 @@ public class IgnitionEx { cache.setName(CU.MARSH_CACHE_NAME); cache.setCacheMode(REPLICATED); - cache.setAtomicityMode(TRANSACTIONAL); + cache.setAtomicityMode(ATOMIC); cache.setSwapEnabled(false); cache.setRebalanceMode(SYNC); cache.setWriteSynchronizationMode(FULL_SYNC); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 6f6b4ad..4947dad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -62,7 +62,14 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { for (int i = 0; i < CACHE_UPDATE_RETRIES_CNT; i++) { try { - String old = cache0.putIfAbsent(id, clsName); + String old; + + try { + old = cache0.tryPutIfAbsent(id, clsName); + } + catch (GridCacheTryPutFailedException ignored) { + return false; + } if (old != null && !old.equals(clsName)) throw new IgniteException("Type ID collision occurred in OptimizedMarshaller. Use " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 2cf3e5a..2b65fed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2749,6 +2749,20 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }); } + /** + * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} + * if topology exchange is in progress. + * + * @param key Key. + * @param val value. + * @return Old value. + * @throws IgniteCheckedException In case of error. + */ + @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + // Supported only in ATOMIC cache. + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @Nullable @Override public V putIfAbsent(final K key, final V val) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java new file mode 100644 index 0000000..10bc35f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; + +/** + * Try put failed exception. + */ +public class GridCacheTryPutFailedException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index b76e815..35319ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -349,7 +349,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, false, - filter); + filter, + true); } /** {@inheritDoc} */ @@ -365,7 +366,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - filter); + filter, + true); + } + + /** {@inheritDoc} */ + @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + A.notNull(key, "key", val, "val"); + + return (V)updateAllAsync0(F0.asMap(key, val), + null, + null, + null, + null, + true, + false, + ctx.noValArray(), + false).get(); } /** {@inheritDoc} */ @@ -456,7 +473,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, true, - ctx.equalsValArray(oldVal)); + ctx.equalsValArray(oldVal), + true); } /** {@inheritDoc} */ @@ -475,7 +493,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - filter).chain(RET2NULL); + filter, + true).chain(RET2NULL); } /** {@inheritDoc} */ @@ -495,7 +514,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - null); + null, + true); } /** {@inheritDoc} */ @@ -678,7 +698,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - null); + null, + true); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) @@ -721,7 +742,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - null); + null, + true); } /** {@inheritDoc} */ @@ -750,7 +772,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - null); + null, + true); } /** @@ -764,6 +787,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param filter Cache entry filter for atomic updates. + * @param waitTopFut Whether to wait for topology future. * @return Completion future. */ @SuppressWarnings("ConstantConditions") @@ -775,7 +799,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, - @Nullable final CacheEntryPredicate[] filter + @Nullable final CacheEntryPredicate[] filter, + final boolean waitTopFut ) { if (map != null && keyCheck) validateCacheKeys(map.keySet()); @@ -808,7 +833,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(); + updateFut.map(waitTopFut); return updateFut; } @@ -871,7 +896,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @Override public IgniteInternalFuture<Object> apply() { - updateFut.map(); + updateFut.map(true); return updateFut; } @@ -2331,7 +2356,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), req.taskNameHash()); - updateFut.map(); + updateFut.map(true); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index f4d1d95..79d1779 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -292,9 +292,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * Performs future mapping. + * + * @param waitTopFut Whether to wait for topology future. */ - public void map() { - mapOnTopology(keys, false, null); + public void map(boolean waitTopFut) { + mapOnTopology(keys, false, null, waitTopFut); } /** {@inheritDoc} */ @@ -311,7 +313,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> retval = Collections.emptyMap(); if (super.onDone(retval, err)) { - cctx.mvcc().removeAtomicFuture(version()); + if (futVer != null) + cctx.mvcc().removeAtomicFuture(version()); return true; } @@ -329,7 +332,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (res.remapKeys() != null) { assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; - mapOnTopology(res.remapKeys(), true, nodeId); + mapOnTopology(res.remapKeys(), true, nodeId, true); return; } @@ -407,16 +410,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param keys Keys to map. * @param remap Boolean flag indicating if this is partial future remap. * @param oldNodeId Old node ID if remap. + * @param waitTopFut Whether to wait for topology future. */ - private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) { + private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId, + final boolean waitTopFut) { cache.topology().readLock(); AffinityTopologyVersion topVer = null; try { if (cache.topology().stopping()) { - futVer = cctx.versions().next(cctx.affinity().affinityTopologyVersion()); - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + cache.name())); @@ -431,14 +434,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (futVer == null) // Assign future version in topology read lock before first exception may be thrown. futVer = cctx.versions().next(topVer); - } else { - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(keys, remap, oldNodeId); - } - }); + if (waitTopFut) { + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + mapOnTopology(keys, remap, oldNodeId, waitTopFut); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); return; } @@ -747,7 +753,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> */ private Collection<ClusterNode> mapKey( KeyCacheObject key, - AffinityTopologyVersion topVer, + AffinityTopologyVersion topVer, boolean fastMap ) { GridCacheAffinityManager affMgr = cctx.affinity(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56ce0d37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java index a7e51b3..e1569dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java @@ -23,7 +23,6 @@ import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -141,8 +140,6 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest private static class Store extends CacheStoreAdapter<Integer, TestObject> implements Serializable { /** {@inheritDoc} */ @Override public TestObject load(Integer key) throws CacheLoaderException { - U.dumpStack("key=" + key); - return key > 0 ? new TestObject() : null; }