sprint-2 fix for ignite-321
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/11efb918 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/11efb918 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/11efb918 Branch: refs/heads/ignite-185 Commit: 11efb91874a547c408dcac58f0651725ad518abe Parents: 3be22ab Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Mon Mar 2 16:36:02 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Mon Mar 2 16:36:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheStoreManager.java | 12 ++++---- .../GridDistributedCacheAdapter.java | 2 ++ .../dataload/IgniteDataLoaderImpl.java | 29 +++++++++++++++----- 3 files changed, 30 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11efb918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java index 9262a8f..c768f54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java @@ -616,6 +616,9 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { handleClassCastException(e); } catch (Exception e) { + if (!(e instanceof CacheWriterException)) + e = new CacheWriterException(e); + if (!entries.isEmpty()) { List<Object> keys = new ArrayList<>(entries.size()); @@ -625,9 +628,6 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { throw new CacheStorePartialUpdateException(keys, e); } - if (!(e instanceof CacheWriterException)) - e = new CacheWriterException(e); - throw new IgniteCheckedException(e); } finally { @@ -725,12 +725,12 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { handleClassCastException(e); } catch (Exception e) { - if (!keys0.isEmpty()) - throw new CacheStorePartialUpdateException(keys0, e); - if (!(e instanceof CacheWriterException)) e = new CacheWriterException(e); + if (!keys0.isEmpty()) + throw new CacheStorePartialUpdateException(keys0, e); + throw new IgniteCheckedException(e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11efb918/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index dc82e83..00190d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -277,6 +277,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) { + ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0); + dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched()); for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/11efb918/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index ced8d1d..ed3bbcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -66,7 +66,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay private byte[] updaterBytes; /** Max remap count before issuing an error. */ - private static final int MAX_REMAP_CNT = 32; + private static final int DFLT_MAX_REMAP_CNT = 32; /** Log reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@ -156,6 +156,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay /** */ private boolean skipStore; + /** */ + private int maxRemapCnt = DFLT_MAX_REMAP_CNT; + /** * @param ctx Grid kernal context. * @param cacheName Cache name. @@ -434,12 +437,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay ) { assert entries != null; - if (remaps >= MAX_REMAP_CNT) { - resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + remaps)); - - return; - } - Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>(); boolean initPda = ctx.deploy().enabled() && jobPda == null; @@ -526,6 +523,10 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this, e1)); } + else if (remaps + 1 > maxRemapCnt) { + resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + + remaps), e1); + } else load0(entriesForNode, resFut, activeKeys, remaps + 1); } @@ -760,6 +761,20 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay close(false); } + /** + * @return Max remap count. + */ + public int maxRemapCount() { + return maxRemapCnt; + } + + /** + * @param maxRemapCnt New max remap count. + */ + public void maxRemapCount(int maxRemapCnt) { + this.maxRemapCnt = maxRemapCnt; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteDataLoaderImpl.class, this);