Repository: incubator-ignite Updated Branches: refs/heads/ignite-417 314cc899a -> 06201200f
# IGNITE-417 removeAll() throws IllegalStateException if remote node stops during removeAll() execution Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/06201200 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/06201200 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/06201200 Branch: refs/heads/ignite-417 Commit: 06201200fecd98a714206091d368dfe1150dfb48 Parents: 314cc89 Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Mon Mar 9 23:24:49 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Mon Mar 9 23:24:49 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedCacheAdapter.java | 21 +++++++++--------- .../dataload/IgniteDataLoaderImpl.java | 23 ++++++++------------ .../GridCacheRemoveAllMultithreadedTest.java | 4 ++++ 3 files changed, 24 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06201200/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index f8f8b92..98606f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -244,17 +244,17 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter return; } - if (res.contains(-1L)) { - if (attemptCnt >= MAX_REMOVE_ALL_ATTEMPTS) - opFut.onDone(new IgniteCheckedException("Failed to remove all entries.")); - else - removeAllAsync(opFut, attemptCnt + 1); + if (!res.contains(-1L)) { + opFut.onDone(); return; } - if (topVer != ctx.affinity().affinityTopologyVersion()) + if (topVer != ctx.affinity().affinityTopologyVersion()) { removeAllAsync(opFut, attemptCnt); + + return; + } } catch (ClusterGroupEmptyException ignore) { if (log.isDebugEnabled()) @@ -270,14 +270,17 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter return; } - - removeAllAsync(opFut, attemptCnt + 1); } catch (Error e) { opFut.onDone(e); throw e; } + + if (attemptCnt >= MAX_REMOVE_ALL_ATTEMPTS) + opFut.onDone(new IgniteCheckedException("Failed to remove all entries.")); + else + removeAllAsync(opFut, attemptCnt + 1); } }); } @@ -348,8 +351,6 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) { - ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0); - dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched()); for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06201200/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index ed3bbcb..1352c50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -41,6 +41,7 @@ import org.apache.ignite.lang.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.integration.*; import java.io.*; import java.util.*; import java.util.Map.*; @@ -516,6 +517,14 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } } catch (IgniteCheckedException e1) { + CacheWriterException cwe = e1.getCause(CacheWriterException.class); + + if (cwe != null) { + resFut.onDone(cwe); + + return; + } + if (log.isDebugEnabled()) log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); @@ -761,20 +770,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay close(false); } - /** - * @return Max remap count. - */ - public int maxRemapCount() { - return maxRemapCnt; - } - - /** - * @param maxRemapCnt New max remap count. - */ - public void maxRemapCount(int maxRemapCnt) { - this.maxRemapCnt = maxRemapCnt; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteDataLoaderImpl.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06201200/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java index 49245cc..e22d564 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java @@ -19,10 +19,14 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; +import javax.cache.integration.*; import java.util.*; import static org.apache.ignite.cache.CacheAtomicityMode.*;