Applied fix.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e8a0c3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e8a0c3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e8a0c3f

Branch: refs/heads/ignite-109
Commit: 8e8a0c3ffa15442018b9dc696f12210ee36b9898
Parents: 9996140
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Wed Jan 28 17:38:15 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Jan 28 17:38:15 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |  43 +++++
 .../processors/cache/GridCacheEntryEx.java      |   8 +
 .../processors/cache/GridCacheMapEntry.java     | 116 +++++++++---
 .../cache/GridCacheUpdateAtomicResult.java      |   6 +-
 .../processors/cache/GridDrResolveResult.java   |  63 -------
 .../GridDistributedTxRemoteAdapter.java         |  47 +++--
 .../dht/atomic/GridDhtAtomicCache.java          |  10 +-
 .../processors/cache/dr/GridCacheDrManager.java |  57 ++----
 .../cache/dr/os/GridOsCacheDrManager.java       |  35 +---
 .../cache/transactions/IgniteTxAdapter.java     |  45 +++++
 .../transactions/IgniteTxLocalAdapter.java      |  52 +++--
 ...ridCacheVersionAbstractConflictResolver.java |  56 ++++++
 .../GridCacheVersionConflictContext.java        |  73 +++++++
 .../GridCacheVersionConflictContextImpl.java    | 188 +++++++++++++++++++
 .../GridCacheVersionConflictResolver.java       |  59 ++++++
 .../version/GridCacheVersionedEntryEx.java      |   2 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +
 17 files changed, 666 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 571a7a4..3fb5329 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -190,6 +190,9 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     /** Cache weak query iterator holder. */
     private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder;
 
+    /** Conflict resolver. */
+    private GridCacheVersionAbstractConflictResolver conflictRslvr;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -306,6 +309,14 @@ public class GridCacheContext<K, V> implements 
Externalizable {
             expiryPlc = null;
 
         itHolder = new CacheWeakQueryIteratorsHolder(log);
+
+        // Conflict resolver is determined in two stages:
+        // 1. If DR receiver hub is enabled, then pick it from DR manager.
+        // 2. Otherwise instantiate default resolver in case local store is 
configured.
+        conflictRslvr = drMgr.conflictResolver();
+
+        if (conflictRslvr == null && storeMgr.isLocalStore())
+            conflictRslvr = new GridCacheVersionConflictResolver();
     }
 
     /**
@@ -1546,6 +1557,38 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     }
 
     /**
+     * Check whether conflict resolution is required.
+     *
+     * @param oldVer Old version.
+     * @param newVer New version.
+     * @return {@code True} in case DR is required.
+     */
+    public boolean conflictNeedResolve(GridCacheVersion oldVer, 
GridCacheVersion newVer) {
+        return conflictRslvr != null;
+    }
+
+    /**
+     * Resolve DR conflict.
+     *
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     * @param atomicVerComparator Whether to use atomic version comparator.
+     * @return Conflict resolution result.
+     * @throws IgniteCheckedException In case of exception.
+     */
+    public GridCacheVersionConflictContextImpl<K, V> 
conflictResolve(GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) 
throws IgniteCheckedException {
+        assert conflictRslvr != null : "Should not reach this place.";
+
+        GridCacheVersionConflictContextImpl<K, V> ctx = 
conflictRslvr.resolve(oldEntry, newEntry, atomicVerComparator);
+
+        if (ctx.isManualResolve())
+            drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), 
ctx.isUseOld(), ctx.isMerge());
+
+        return ctx;
+    }
+
+    /**
      * @return Data center ID.
      */
     public byte dataCenterId() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 6748d6e..8eeacc5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -671,6 +671,14 @@ public interface GridCacheEntryEx<K, V> {
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
+     * Create versioned entry for this cache entry.
+     *
+     * @return Versioned entry.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public GridCacheVersionedEntryEx<K, V> versionedEntry() throws 
IgniteCheckedException;
+
+    /**
      * Sets new value if passed in version matches the current version
      * (used for read-through only).
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0b34457..3c4e9d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -68,7 +68,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
     private static final byte IS_UNSWAPPED_MASK = 0x02;
 
     /** */
-    private static final Comparator<GridCacheVersion> ATOMIC_VER_COMPARATOR = 
new GridCacheAtomicVersionComparator();
+    public static final Comparator<GridCacheVersion> ATOMIC_VER_COMPARATOR = 
new GridCacheAtomicVersionComparator();
 
     /**
      * NOTE
@@ -1658,7 +1658,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
         GridCacheVersion enqueueVer = null;
 
-        GridDrResolveResult<V> drRes = null;
+        GridCacheVersionConflictContextImpl<K, V> drRes = null;
 
         EntryProcessorResult<?> invokeRes = null;
 
@@ -1675,49 +1675,113 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
             if (isNew())
                 unswap(true, retval);
 
+            boolean newTtlResolved = false;
+
+            boolean drNeedResolve = false;
+
             Object transformClo = null;
 
             if (drResolve) {
-                drRes = cctx.dr().resolveAtomic(this,
-                    op,
-                    writeObj,
-                    valBytes,
-                    expiryPlc,
-                    drTtl,
-                    drExpireTime,
-                    drVer);
+                GridCacheVersion oldDrVer = version().drVersion();
+
+                drNeedResolve = cctx.conflictNeedResolve(oldDrVer, drVer);
+
+                if (drNeedResolve) {
+                    // Get old value.
+                    V oldVal = rawGetOrUnmarshalUnlocked(true);
+
+                    if (writeObj == null && valBytes != null)
+                        writeObj = cctx.marshaller().unmarshal(valBytes, 
cctx.deploy().globalLoader());
+
+                    if (op == GridCacheOperation.TRANSFORM) {
+                        transformClo = writeObj;
+
+                        writeObj = ((IgniteClosure<V, 
V>)writeObj).apply(oldVal);
+                    }
+
+                    K k = key();
+
+                    if (drTtl >= 0L) {
+                        // DR TTL is set explicitly
+                        assert drExpireTime >= 0L;
+
+                        newTtl = drTtl;
+                        newExpireTime = drExpireTime;
+                    }
+                    else {
+                        long ttl = expiryPlc != null ? (isNew() ? 
expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L;
+
+                        newTtl = ttl < 0 ? ttlExtras() : ttl;
+                        newExpireTime = CU.toExpireTime(newTtl);
+                    }
+
+                    newTtlResolved = true;
+
+                    GridCacheVersionedEntryEx<K, V> oldEntry = 
versionedEntry();
+                    GridCacheVersionedEntryEx<K, V> newEntry =
+                        new GridCachePlainVersionedEntry<>(k, (V)writeObj, 
newTtl, newExpireTime, drVer);
+
+                    drRes = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+
+                    assert drRes != null;
 
-                if (drRes != null) {
                     if (drRes.isUseOld()) {
+                        // Handle special case with atomic comparator.
+                        if (!isNew() &&                                        
    // Not initial value,
+                            verCheck &&                                        
    // and atomic version check,
+                            oldDrVer.dataCenterId() == drVer.dataCenterId() && 
    // and data centers are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldDrVer, drVer) == 
0 && // and both versions are equal,
+                            cctx.writeThrough() &&                             
    // and store is enabled,
+                            primary)                                           
    // and we are primary.
+                        {
+                            V val = rawGetOrUnmarshalUnlocked(false);
+
+                            if (val == null) {
+                                assert deletedUnlocked();
+
+                                cctx.store().removeFromStore(null, key());
+                            }
+                            else
+                                cctx.store().putToStore(null, key(), val, ver);
+                        }
+
                         old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
 
                         return new GridCacheUpdateAtomicResult<>(false,
                             old,
                             null,
                             invokeRes,
-                            -1L,
+                            0L,
                             -1L,
                             null,
                             null,
                             false);
                     }
+                    else if (drRes.isUseNew())
+                        op = writeObj != null ? GridCacheOperation.UPDATE : 
GridCacheOperation.DELETE;
+                    else {
+                        assert drRes.isMerge();
 
-                    newTtl = drRes.newTtl();
-
-                    newExpireTime = drRes.newExpireTime();
-
-                    newDrExpireTime = drRes.newDrExpireTime();
+                        writeObj = drRes.mergeValue();
+                        valBytes = null;
 
-                    op = drRes.operation();
+                        drVer = null; // Update will be considered as local.
 
-                    writeObj = drRes.value();
+                        op = writeObj != null ? GridCacheOperation.UPDATE : 
GridCacheOperation.DELETE;
+                    }
 
-                    valBytes = drRes.valueBytes();
+                    newTtl = drRes.ttl();
+                    newExpireTime = drRes.expireTime();
 
-                    if (drRes.isMerge())
-                        drVer = null; // Update will be considered as local.
+                    // Explicit DR expire time will be passed to remote node 
only in that case.
+                    if (!drRes.explicitTtl() && !drRes.isMerge()) {
+                        if (drRes.isUseNew() && newEntry.dataCenterId() != 
cctx.dataCenterId() ||
+                            drRes.isUseOld() && oldEntry.dataCenterId() != 
cctx.dataCenterId())
+                            newDrExpireTime = drRes.expireTime();
+                    }
                 }
                 else
+                    // Nullify DR version on this update, so that we will use 
regular version during next updates.
                     drVer = null;
             }
 
@@ -3095,6 +3159,14 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
+    @Override public synchronized GridCacheVersionedEntryEx<K, V> 
versionedEntry() throws IgniteCheckedException {
+        boolean isNew = isStartVersion();
+
+        return new GridCachePlainVersionedEntry<>(key, isNew ? unswap(true, 
true) : rawGetOrUnmarshalUnlocked(false),
+            ttlExtras(), expireTimeExtras(), ver.drVersion(), isNew);
+    }
+
+    /** {@inheritDoc} */
     @Override public synchronized boolean versionedValue(V val, 
GridCacheVersion curVer, GridCacheVersion newVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         checkObsolete();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 0b7d776..34be479 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -51,7 +51,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
 
     /** DR resolution result. */
     @GridToStringInclude
-    private final GridDrResolveResult<V> drRes;
+    private final GridCacheVersionConflictContextImpl<K, V> drRes;
 
     /** Whether update should be propagated to DHT node. */
     private final boolean sndToDht;
@@ -79,7 +79,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
         long newTtl,
         long drExpireTime,
         @Nullable GridCacheVersion rmvVer,
-        @Nullable GridDrResolveResult<V> drRes,
+        @Nullable GridCacheVersionConflictContextImpl<K, V> drRes,
         boolean sndToDht) {
         this.success = success;
         this.oldVal = oldVal;
@@ -144,7 +144,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
     /**
      * @return DR conflict resolution context.
      */
-    @Nullable public GridDrResolveResult<V> drResolveResult() {
+    @Nullable public GridCacheVersionConflictContextImpl<K, V> 
drResolveResult() {
         return drRes;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java
deleted file mode 100644
index faf71ca..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-/**
- *
- */
-public interface GridDrResolveResult<V> {
-    /**
-     * @return TTL.
-     */
-    public long newTtl();
-
-    /**
-     * @return Expire time.
-     */
-    public long newExpireTime();
-
-    /**
-     * @return DR expire time.
-     */
-    public long newDrExpireTime();
-
-    /**
-     * @return {@code True} in case merge is to be performed.
-     */
-    public boolean isMerge();
-
-    /**
-     * @return {@code True} in case old value should be used.
-     */
-    public boolean isUseOld();
-
-    /**
-     * @return Cache operation.
-     */
-    public GridCacheOperation operation();
-
-    /**
-     * @return Value.
-     */
-    public V value();
-
-    /**
-     * @return Value bytes.
-     */
-    public byte[] valueBytes();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index cea3a0d..f7376cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -509,24 +509,37 @@ public class GridDistributedTxRemoteAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                                         if (explicitVer == null)
                                             explicitVer = writeVersion(); // 
Force write version to be used.
 
-                                        GridDrResolveResult<V> drRes = 
cacheCtx.dr().resolveTx(cached,
-                                            txEntry,
-                                            explicitVer,
-                                            op,
-                                            val,
-                                            valBytes,
-                                            txEntry.ttl(),
-                                            txEntry.drExpireTime());
-
-                                        if (drRes != null) {
-                                            op = drRes.operation();
-                                            val = drRes.value();
-                                            valBytes = drRes.valueBytes();
-
-                                            if (drRes.isMerge())
+                                        boolean drNeedResolve =
+                                            
cacheCtx.conflictNeedResolve(cached.version(), explicitVer);
+
+                                        if (drNeedResolve) {
+                                            IgniteBiTuple<GridCacheOperation, 
GridCacheVersionConflictContextImpl<K, V>>
+                                                drRes = conflictResolve(op, 
txEntry.key(), val, valBytes,
+                                                txEntry.ttl(), 
txEntry.drExpireTime(), explicitVer, cached);
+
+                                            assert drRes != null;
+
+                                            
GridCacheVersionConflictContextImpl<K, V> drCtx = drRes.get2();
+
+                                            if (drCtx.isUseOld())
+                                                op = NOOP;
+                                            else if (drCtx.isUseNew()) {
+                                                txEntry.ttl(drCtx.ttl());
+
+                                                if 
(drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId())
+                                                    
txEntry.drExpireTime(drCtx.expireTime());
+                                                else
+                                                    txEntry.drExpireTime(-1L);
+                                            }
+                                            else if (drCtx.isMerge()) {
+                                                op = drRes.get1();
+                                                val = drCtx.mergeValue();
+                                                valBytes = null;
                                                 explicitVer = writeVersion();
-                                            else if (op == NOOP)
-                                                txEntry.ttl(-1L);
+
+                                                txEntry.ttl(drCtx.ttl());
+                                                txEntry.drExpireTime(-1L);
+                                            }
                                         }
                                         else
                                             // Nullify explicit version so 
that innerSet/innerRemove will work as usual.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c993397..c35743f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1083,7 +1083,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         if (plc != null)
                             expiry = new UpdateExpiryPolicy(plc);
 
-                        if (writeThrough() && keys.size() > 1 && 
!ctx.dr().receiveEnabled()) {
+                        if (keys.size() > 1 &&                             // 
Several keys ...
+                            writeThrough() &&                              // 
and store is enabled ...
+                            !ctx.store().isLocalStore() &&                 // 
and this is not local store ...
+                            !ctx.dr().receiveEnabled()  // and no DR.
+                        ) {
                             // This method can only be used when there are no 
replicated entries in the batch.
                             UpdateBatchResult<K, V> updRes = 
updateWithBatch(node,
                                 hasNear,
@@ -1681,7 +1685,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case 
of remove-remove scenarios.
-                        GridDrResolveResult<V> ctx = updRes.drResolveResult();
+                        GridCacheVersionConflictContextImpl<K, V> ctx = 
updRes.drResolveResult();
 
                         long ttl = updRes.newTtl();
                         long expireTime = updRes.drExpireTime();
@@ -1727,7 +1731,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 if (hasNear) {
                     if (primary && updRes.sendToDht()) {
                         if (!ctx.affinity().belongs(node, entry.partition(), 
topVer)) {
-                            GridDrResolveResult<V> ctx = 
updRes.drResolveResult();
+                            GridCacheVersionConflictContextImpl<K, V> ctx = 
updRes.drResolveResult();
 
                             long ttl = updRes.newTtl();
                             long expireTime = updRes.drExpireTime();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
index ff83198..d0a0c26 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
@@ -34,53 +34,9 @@ public interface GridCacheDrManager<K, V> extends 
GridCacheManager<K, V> {
     public byte dataCenterId();
 
     /**
-     * Handles DR for atomic cache.
-     *
-     * @param e Cache entry.
-     * @param op Operation.
-     * @param writeObj New value.
-     * @param valBytes New value byte.
-     * @param ttl TTL.
-     * @param drTtl DR TTL.
-     * @param drExpireTime DR expire time
-     * @param drVer DR version.
-     * @return DR result.
-     * @throws IgniteCheckedException If update failed.
-     * @throws GridCacheEntryRemovedException If entry is obsolete.
-     */
-    public GridDrResolveResult<V> resolveAtomic(GridCacheEntryEx<K, V> e,
-         GridCacheOperation op,
-         @Nullable Object writeObj,
-         @Nullable byte[] valBytes,
-         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-         long drTtl,
-         long drExpireTime,
-         @Nullable GridCacheVersion drVer) throws IgniteCheckedException, 
GridCacheEntryRemovedException;
-
-    /**
-     * Handles DR for transactional cache.
-     *
-     * @param e Cache entry.
-     * @param txEntry Transaction entry.
-     * @param newVer Version.
-     * @param op Operation.
-     * @param newVal New value.
-     * @param newValBytes New value bytes.
-     * @param newTtl TTL.
-     * @param newDrExpireTime DR expire time
-     * @return DR result.
-     * @throws IgniteCheckedException If update failed.
-     * @throws GridCacheEntryRemovedException If entry is obsolete.
+     * @return Cache version conflict resolver.
      */
-    public GridDrResolveResult<V> resolveTx(
-        GridCacheEntryEx<K, V> e,
-        IgniteTxEntry<K, V> txEntry,
-        GridCacheVersion newVer,
-        GridCacheOperation op,
-        V newVal,
-        byte[] newValBytes,
-        long newTtl,
-        long newDrExpireTime) throws IgniteCheckedException, 
GridCacheEntryRemovedException;
+    public GridCacheVersionAbstractConflictResolver conflictResolver();
 
     /**
      * Performs replication.
@@ -138,6 +94,15 @@ public interface GridCacheDrManager<K, V> extends 
GridCacheManager<K, V> {
     public void onReceiveCacheEntriesReceived(int entriesCnt);
 
     /**
+     * Callback for manual conflict resolution.
+     *
+     * @param useNew Use new.
+     * @param useOld Use old.
+     * @param merge Merge.
+     */
+    public void onReceiveCacheConflictResolved(boolean useNew, boolean useOld, 
boolean merge);
+
+    /**
      * Resets metrics for current cache.
      */
     public void resetMetrics();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
index 20b8804..49f617b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.dr.os;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.dr.*;
 import org.jetbrains.annotations.*;
@@ -65,6 +64,11 @@ public class GridOsCacheDrManager<K, V> implements 
GridCacheDrManager<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public GridCacheVersionAbstractConflictResolver 
conflictResolver() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void replicate(K key,
         @Nullable byte[] keyBytes,
         @Nullable V val,
@@ -77,30 +81,6 @@ public class GridOsCacheDrManager<K, V> implements 
GridCacheDrManager<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDrResolveResult<V> resolveAtomic(GridCacheEntryEx<K, 
V> e,
-        GridCacheOperation op,
-        @Nullable Object writeObj,
-        @Nullable byte[] valBytes,
-        @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        long drTtl,
-        long drExpireTime,
-        @Nullable GridCacheVersion drVer) throws IgniteCheckedException, 
GridCacheEntryRemovedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDrResolveResult<V> resolveTx(GridCacheEntryEx<K, V> e,
-        IgniteTxEntry<K, V> txEntry,
-        GridCacheVersion newVer,
-        GridCacheOperation op,
-        V newVal,
-        byte[] newValBytes,
-        long newTtl,
-        long newDrExpireTime) throws IgniteCheckedException, 
GridCacheEntryRemovedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public void beforeExchange(long topVer, boolean left) throws 
IgniteCheckedException {
         // No-op.
     }
@@ -116,6 +96,11 @@ public class GridOsCacheDrManager<K, V> implements 
GridCacheDrManager<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public void onReceiveCacheConflictResolved(boolean useNew, 
boolean useOld, boolean merge) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void resetMetrics() {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 47cd12b..e079a5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1252,6 +1252,51 @@ public abstract class IgniteTxAdapter<K, V> extends 
GridMetadataAwareAdapter
     }
 
     /**
+     * Resolve DR conflict.
+     *
+     * @param op Initially proposed operation.
+     * @param key Key.
+     * @param newVal New value.
+     * @param newValBytes New value bytes.
+     * @param newTtl New TTL.
+     * @param newDrExpireTime New explicit DR expire time.
+     * @param newVer New version.
+     * @param old Old entry.
+     * @return Tuple with adjusted operation type and conflict context.
+     * @throws org.apache.ignite.IgniteCheckedException In case of any 
exception.
+     * @throws GridCacheEntryRemovedException If entry got removed.
+     */
+    protected IgniteBiTuple<GridCacheOperation, 
GridCacheVersionConflictContextImpl<K, V>> conflictResolve(
+        GridCacheOperation op, K key, V newVal, byte[] newValBytes, long 
newTtl, long newDrExpireTime,
+        GridCacheVersion newVer, GridCacheEntryEx<K, V> old)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
+        // Construct old entry info.
+        GridCacheVersionedEntryEx<K, V> oldEntry = old.versionedEntry();
+
+        // Construct new entry info.
+        if (newVal == null && newValBytes != null)
+            newVal = cctx.marshaller().unmarshal(newValBytes, 
cctx.deploy().globalLoader());
+
+        long newExpireTime = newDrExpireTime >= 0L ? newDrExpireTime : 
CU.toExpireTime(newTtl);
+
+        GridCacheVersionedEntryEx<K, V> newEntry =
+            new GridCachePlainVersionedEntry<K, V>(key, newVal, newTtl, 
newExpireTime, newVer);
+
+        GridCacheVersionConflictContextImpl<K, V> ctx = 
old.context().conflictResolve(oldEntry, newEntry, false);
+
+        if (ctx.isMerge()) {
+            V resVal = ctx.mergeValue();
+
+            if ((op == CREATE || op == UPDATE) && resVal == null)
+                op = DELETE;
+            else if (op == DELETE && resVal != null)
+                op = old.isNewLocked() ? CREATE : UPDATE;
+        }
+
+        return F.t(op, ctx);
+    }
+
+    /**
      * @param e Transaction entry.
      * @param primaryOnly Flag to include backups into check or not.
      * @return {@code True} if entry is locally mapped as a primary or back up 
node.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 46ab74f..d9c49d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -699,31 +699,45 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                                         }
                                     }
 
-                                    GridDrResolveResult<V> drRes = 
cacheCtx.dr().resolveTx(cached,
-                                        txEntry,
-                                        explicitVer,
-                                        op,
-                                        val,
-                                        valBytes,
-                                        txEntry.ttl(),
-                                        txEntry.drExpireTime());
-
-                                    if (drRes != null) {
-                                        op = drRes.operation();
-                                        val = drRes.value();
-                                        valBytes = drRes.valueBytes();
-
-                                        if (drRes.isMerge())
+                                    boolean drNeedResolve = 
cacheCtx.conflictNeedResolve(cached.version(), explicitVer);
+
+                                    if (drNeedResolve) {
+                                        IgniteBiTuple<GridCacheOperation, 
GridCacheVersionConflictContextImpl<K, V>>
+                                            drRes = conflictResolve(op, 
txEntry.key(), val, valBytes, txEntry.ttl(),
+                                                txEntry.drExpireTime(), 
explicitVer, cached);
+
+                                        assert drRes != null;
+
+                                        GridCacheVersionConflictContextImpl<K, 
V> conflictCtx = drRes.get2();
+
+                                        if (conflictCtx.isUseOld())
+                                            op = NOOP;
+                                        else if (conflictCtx.isUseNew()) {
+                                            txEntry.ttl(conflictCtx.ttl());
+
+                                            if 
(conflictCtx.newEntry().dataCenterId() != cctx.dataCenterId())
+                                                
txEntry.drExpireTime(conflictCtx.expireTime());
+                                            else
+                                                txEntry.drExpireTime(-1L);
+                                        }
+                                        else {
+                                            assert conflictCtx.isMerge();
+
+                                            op = drRes.get1();
+                                            val = conflictCtx.mergeValue();
+                                            valBytes = null;
                                             explicitVer = writeVersion();
-                                        else if (op == NOOP)
-                                            txEntry.ttl(-1L);
+
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.drExpireTime(-1L);
+                                        }
                                     }
                                     else
                                         // Nullify explicit version so that 
innerSet/innerRemove will work as usual.
                                         explicitVer = null;
 
-                                    if (sndTransformedVals || (drRes != null)) 
{
-                                        assert sndTransformedVals && 
cacheCtx.isReplicated() || (drRes != null);
+                                    if (sndTransformedVals || drNeedResolve) {
+                                        assert sndTransformedVals && 
cacheCtx.isReplicated() || drNeedResolve;
 
                                         txEntry.value(val, true, false);
                                         txEntry.valueBytes(valBytes);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java
new file mode 100644
index 0000000..a91bd4d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import org.apache.ignite.*;
+
+/**
+ * Cache version conflict resolver.
+ */
+public abstract class GridCacheVersionAbstractConflictResolver {
+    /**
+     * Resolve the conflict.
+     *
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     * @param atomicVerComparator Whether to use atomic version comparator.
+     * @return Conflict resolution context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K, V> GridCacheVersionConflictContextImpl<K, V> 
resolve(GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) 
throws IgniteCheckedException {
+        GridCacheVersionConflictContextImpl<K, V> ctx = new 
GridCacheVersionConflictContextImpl<>(oldEntry, newEntry);
+
+        resolve0(ctx, oldEntry, newEntry, atomicVerComparator);
+
+        return ctx;
+    }
+
+    /**
+     * Internal conflict resolution routine.
+     *
+     * @param ctx Context.
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     * @param atomicVerComparator Whether to use atomic version comparator.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract <K, V> void 
resolve0(GridCacheVersionConflictContextImpl<K, V> ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry, GridCacheVersionedEntryEx<K, 
V> newEntry,
+        boolean atomicVerComparator) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
new file mode 100644
index 0000000..72c323b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import org.apache.ignite.cache.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Cache version conflict context.
+ */
+public interface GridCacheVersionConflictContext<K, V> {
+    /**
+     * Gets old (existing) cache entry.
+     *
+     * @return Old (existing) cache entry.
+     */
+    public GridCacheVersionedEntry<K, V> oldEntry();
+
+    /**
+     * Gets new cache entry.
+     *
+     * @return New cache entry.
+     */
+    public GridCacheVersionedEntry<K, V> newEntry();
+
+    /**
+     * Force cache to ignore new entry and leave old (existing) entry 
unchanged.
+     */
+    public void useOld();
+
+    /**
+     * Force cache to apply new entry overwriting old (existing) entry.
+     * <p>
+     * Note that updates from remote data centers always have explicit TTL, 
while local data center
+     * updates will only have explicit TTL in case {@link 
CacheEntry#timeToLive(long)} was called
+     * before update. In the latter case new entry will pick TTL of the old 
(existing) entry, even
+     * if it was set through update from remote data center. It means that 
depending on concurrent
+     * update timings new update might pick unexpected TTL. For example, 
consider that three updates
+     * of the same key are performed: local update with explicit TTL (1) 
followed by another local
+     * update without explicit TTL (2) and one remote update (3). In this case 
you might expect that
+     * update (2) will pick TTL set during update (1). However, in case update 
(3) occurs between (1)
+     * and (2) and it overwrites (1) during conflict resolution, then update 
(2) will pick TTL of
+     * update (3). To have predictable TTL in such cases you should either 
always set it explicitly
+     * through {@code GridCacheEntry.timeToLive(long)} or use {@link 
#merge(Object, long)}.
+     */
+    public void useNew();
+
+    /**
+     * Force cache to use neither old, nor new, but some other value passed as 
argument. In this case old
+     * value will be replaced with merge value and update will be considered 
as local.
+     * <p>
+     * Also in case of merge you have to specify new TTL explicitly. For 
unlimited TTL use {@code 0}.
+     *
+     * @param mergeVal Merge value or {@code null} to force remove.
+     * @param ttl Time to live in milliseconds.
+     */
+    public void merge(@Nullable V mergeVal, long ttl);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java
new file mode 100644
index 0000000..228a224
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Conflict context implementation.
+ */
+public class GridCacheVersionConflictContextImpl<K, V> implements 
GridCacheVersionConflictContext<K, V> {
+    /** Old entry. */
+    @GridToStringInclude
+    private final GridCacheVersionedEntry<K, V> oldEntry;
+
+    /** New entry. */
+    @GridToStringInclude
+    private final GridCacheVersionedEntry<K, V> newEntry;
+
+    /** Current state. */
+    private State state;
+
+    /** Current merge value. */
+    @GridToStringExclude
+    private V mergeVal;
+
+    /** TTL. */
+    private long ttl;
+
+    /** Explicit TTL flag. */
+    private boolean explicitTtl;
+
+    /** Manual resolve flag. */
+    private boolean manualResolve;
+
+    /**
+     * Constructor.
+     *
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     */
+    public GridCacheVersionConflictContextImpl(GridCacheVersionedEntry<K, V> 
oldEntry,
+        GridCacheVersionedEntry<K, V> newEntry) {
+        assert oldEntry != null && newEntry != null;
+        assert oldEntry.ttl() >= 0 && newEntry.ttl() >= 0;
+
+        this.oldEntry = oldEntry;
+        this.newEntry = newEntry;
+
+        // Set initial state.
+        useNew();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersionedEntry<K, V> oldEntry() {
+        return oldEntry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersionedEntry<K, V> newEntry() {
+        return newEntry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void useOld() {
+        state = State.USE_OLD;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void useNew() {
+        state = State.USE_NEW;
+
+        if (!explicitTtl)
+            ttl = newEntry.ttl();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(@Nullable V mergeVal, long ttl) {
+        state = State.MERGE;
+
+        this.mergeVal = mergeVal;
+        this.ttl = ttl;
+
+        explicitTtl = true;
+    }
+
+    /**
+     * @return {@code True} in case old value should be used.
+     */
+    public boolean isUseOld() {
+        return state == State.USE_OLD;
+    }
+
+    /**
+     * @return {@code True} in case new value should be used.
+     */
+    public boolean isUseNew() {
+        return state == State.USE_NEW;
+    }
+
+    /**
+     * @return {@code True} in case merge is to be performed.
+     */
+    public boolean isMerge() {
+        return state == State.MERGE;
+    }
+
+    /**
+     * Set manual resolve flag.
+     */
+    public void manualResolve() {
+        this.manualResolve = true;
+    }
+
+    /**
+     * @return Manual resolve flag.
+     */
+    public boolean isManualResolve() {
+        return manualResolve;
+    }
+
+    /**
+     * @return Value to merge (if any).
+     */
+    @Nullable public V mergeValue() {
+        return mergeVal;
+    }
+
+    /**
+     * @return TTL.
+     */
+    public long ttl() {
+        return ttl;
+    }
+
+    /**
+     * @return Expire time.
+     */
+    public long expireTime() {
+        return explicitTtl ? CU.toExpireTime(ttl) : isUseNew() ? 
newEntry.expireTime() :
+            isUseOld() ? oldEntry.expireTime() : 0L;
+    }
+
+    /**
+     * @return Explicit TTL flag.
+     */
+    public boolean explicitTtl() {
+        return explicitTtl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return state == State.MERGE ?
+            S.toString(GridCacheVersionConflictContextImpl.class, this, 
"mergeValue", mergeVal) :
+            S.toString(GridCacheVersionConflictContextImpl.class, this);
+    }
+
+    /**
+     * State.
+     */
+    private enum State {
+        /** Use old. */
+        USE_OLD,
+
+        /** Use new. */
+        USE_NEW,
+
+        /** Merge. */
+        MERGE
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java
new file mode 100644
index 0000000..e327fb9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+
+/**
+ * Default conflict resolver.
+ */
+public class GridCacheVersionConflictResolver extends 
GridCacheVersionAbstractConflictResolver {
+    /** {@inheritDoc} */
+    @Override protected <K, V> void 
resolve0(GridCacheVersionConflictContextImpl<K, V> ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry, GridCacheVersionedEntryEx<K, 
V> newEntry,
+        boolean atomicVerComparator) throws IgniteCheckedException {
+        if (newEntry.dataCenterId() != oldEntry.dataCenterId())
+            ctx.useNew();
+        else {
+            if (oldEntry.isStartVersion())
+                ctx.useNew();
+            else {
+                if (atomicVerComparator) {
+                    // Handle special case when version check using ATOMIC 
cache comparator is required.
+                    if 
(GridCacheMapEntry.ATOMIC_VER_COMPARATOR.compare(oldEntry.version(), 
newEntry.version()) >= 0)
+                        ctx.useOld();
+                    else
+                        ctx.useNew();
+                }
+                else {
+                    long topVerDiff = newEntry.topologyVersion() - 
oldEntry.topologyVersion();
+
+                    if (topVerDiff > 0)
+                        ctx.useNew();
+                    else if (topVerDiff < 0)
+                        ctx.useOld();
+                    else if (newEntry.order() > oldEntry.order())
+                        ctx.useNew();
+                    else
+                        ctx.useOld();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java
index b8f7c19..5351966 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java
@@ -25,7 +25,7 @@ import org.apache.ignite.cache.*;
 public interface GridCacheVersionedEntryEx<K, V> extends 
GridCacheVersionedEntry<K, V>, GridCacheVersionable {
     /**
      *
-     * @return
+     * @return {@code True} if entry is new.
      */
     public boolean isStartVersion();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 60df4e4..d1b6ce6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -631,6 +631,11 @@ public class GridCacheTestEntryEx<K, V> extends 
GridMetadataAwareAdapter impleme
     }
 
     /** @inheritDoc */
+    @Override public GridCacheVersionedEntryEx<K, V> versionedEntry() throws 
IgniteCheckedException {
+        return null;
+    }
+
+    /** @inheritDoc */
     @Override public boolean versionedValue(V val, GridCacheVersion curVer, 
GridCacheVersion newVer) {
         assert false; return false;
     }

Reply via email to