# IGNITE-283: Fixed DR (data center replication) start issue.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/47fa3cef Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/47fa3cef Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/47fa3cef Branch: refs/heads/ignite-281 Commit: 47fa3cefae42cca7ddc2b859208480a4122db319 Parents: c93d86f Author: vozerov <voze...@gridgain.com> Authored: Thu Feb 19 14:55:33 2015 +0300 Committer: vozerov <voze...@gridgain.com> Committed: Thu Feb 19 14:55:33 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 5 +++ .../processors/cache/GridCacheProcessor.java | 4 ++ .../GridDistributedTxRemoteAdapter.java | 11 +++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 44 ++++++++++---------- 4 files changed, 36 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 060a825..3ec013c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -320,7 +320,12 @@ public class GridCacheContext<K, V> implements Externalizable { expiryPlc = null; itHolder = new CacheWeakQueryIteratorsHolder(log); + } + /** + * Initialize conflict resolver after all managers are started. + */ + void initConflictResolver() { // Conflict resolver is determined in two stages: // 1. 
If DR receiver hub is enabled, then pick it from DR manager. // 2. Otherwise instantiate default resolver in case local store is configured. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d038e91..e99c706 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -854,6 +854,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx)))) mgr.start(cacheCtx); + cacheCtx.initConflictResolver(); + if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) { GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context(); @@ -861,6 +863,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheManager mgr : dhtManagers(dhtCtx)) mgr.start(dhtCtx); + dhtCtx.initConflictResolver(); + // Start DHT cache. 
dhtCtx.cache().start(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 1ae5778..e36947e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -509,17 +509,16 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> if (txEntry.ttl() == CU.TTL_ZERO) op = DELETE; - boolean drNeedResolve = cacheCtx.conflictNeedResolve(); - if (drNeedResolve) { - IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> - drRes = conflictResolve(op, txEntry.key(), val, valBytes, - txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached); + if (drNeedResolve) { + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> + drRes = conflictResolve(op, txEntry.key(), val, valBytes, + txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached); assert drRes != null; - GridCacheVersionConflictContext<K, V> drCtx = drRes.get2(); + GridCacheVersionConflictContext<K, V> drCtx = drRes.get2(); if (drCtx.isUseOld()) op = NOOP; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 14b8248..ef3de55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -85,13 +85,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> /** Optional arguments for entry processor. */ private Object[] invokeArgs; - /** DR put values. */ + /** Conflict put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<GridCacheDrInfo<V>> drPutVals; + private Collection<GridCacheDrInfo<V>> conflictPutVals; - /** DR remove values. */ + /** Conflict remove values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<GridCacheVersion> drRmvVals; + private Collection<GridCacheVersion> conflictRmvVals; /** Mappings. */ @GridToStringInclude @@ -174,8 +174,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> * @param keys Keys to update. * @param vals Values or transform closure. * @param invokeArgs Optional arguments for entry processor. - * @param drPutVals DR put values (optional). - * @param drRmvVals DR remove values (optional). + * @param conflictPutVals Conflict put values (optional). + * @param conflictRmvVals Conflict remove values (optional). * @param retval Return value require flag. * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. * @param cached Cached entry if keys size is 1. @@ -192,8 +192,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> Collection<? 
extends K> keys, @Nullable Collection<?> vals, @Nullable Object[] invokeArgs, - @Nullable Collection<GridCacheDrInfo<V>> drPutVals, - @Nullable Collection<GridCacheVersion> drRmvVals, + @Nullable Collection<GridCacheDrInfo<V>> conflictPutVals, + @Nullable Collection<GridCacheVersion> conflictRmvVals, final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx<K, V> cached, @@ -207,8 +207,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.rawRetval = rawRetval; assert vals == null || vals.size() == keys.size(); - assert drPutVals == null || drPutVals.size() == keys.size(); - assert drRmvVals == null || drRmvVals.size() == keys.size(); + assert conflictPutVals == null || conflictPutVals.size() == keys.size(); + assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); assert cached == null || keys.size() == 1; assert subjId != null; @@ -219,8 +219,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> this.keys = keys; this.vals = vals; this.invokeArgs = invokeArgs; - this.drPutVals = drPutVals; - this.drRmvVals = drRmvVals; + this.conflictPutVals = conflictPutVals; + this.conflictRmvVals = conflictRmvVals; this.retval = retval; this.cached = cached; this.expiryPlc = expiryPlc; @@ -530,19 +530,19 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> drExpireTime = -1; drVer = null; } - else if (drPutVals != null) { - GridCacheDrInfo<V> drPutVal = F.first(drPutVals); + else if (conflictPutVals != null) { + GridCacheDrInfo<V> drPutVal = F.first(conflictPutVals); val = drPutVal.value(); drTtl = drPutVal.ttl(); drExpireTime = drPutVal.expireTime(); drVer = drPutVal.version(); } - else if (drRmvVals != null) { + else if (conflictRmvVals != null) { val = null; drTtl = -1; drExpireTime = -1; - drVer = F.first(drRmvVals); + drVer = F.first(conflictRmvVals); } else { val = null; @@ -616,13 +616,13 @@ public class GridNearAtomicUpdateFuture<K, V> 
extends GridFutureAdapter<Object> Iterator<GridCacheDrInfo<V>> drPutValsIt = null; - if (drPutVals != null) - drPutValsIt = drPutVals.iterator(); + if (conflictPutVals != null) + drPutValsIt = conflictPutVals.iterator(); Iterator<GridCacheVersion> drRmvValsIt = null; - if (drRmvVals != null) - drRmvValsIt = drRmvVals.iterator(); + if (conflictRmvVals != null) + drRmvValsIt = conflictRmvVals.iterator(); Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); @@ -661,7 +661,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> throw err; } } - else if (drPutVals != null) { + else if (conflictPutVals != null) { GridCacheDrInfo<V> drPutVal = drPutValsIt.next(); val = drPutVal.value(); @@ -669,7 +669,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object> drExpireTime = drPutVal.expireTime(); drVer = drPutVal.version(); } - else if (drRmvVals != null) { + else if (conflictRmvVals != null) { val = null; drTtl = -1; drExpireTime = -1;