Applied fix.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e8a0c3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e8a0c3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e8a0c3f Branch: refs/heads/ignite-6 Commit: 8e8a0c3ffa15442018b9dc696f12210ee36b9898 Parents: 9996140 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Jan 28 17:38:15 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Jan 28 17:38:15 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 43 +++++ .../processors/cache/GridCacheEntryEx.java | 8 + .../processors/cache/GridCacheMapEntry.java | 116 +++++++++--- .../cache/GridCacheUpdateAtomicResult.java | 6 +- .../processors/cache/GridDrResolveResult.java | 63 ------- .../GridDistributedTxRemoteAdapter.java | 47 +++-- .../dht/atomic/GridDhtAtomicCache.java | 10 +- .../processors/cache/dr/GridCacheDrManager.java | 57 ++---- .../cache/dr/os/GridOsCacheDrManager.java | 35 +--- .../cache/transactions/IgniteTxAdapter.java | 45 +++++ .../transactions/IgniteTxLocalAdapter.java | 52 +++-- ...ridCacheVersionAbstractConflictResolver.java | 56 ++++++ .../GridCacheVersionConflictContext.java | 73 +++++++ .../GridCacheVersionConflictContextImpl.java | 188 +++++++++++++++++++ .../GridCacheVersionConflictResolver.java | 59 ++++++ .../version/GridCacheVersionedEntryEx.java | 2 +- .../processors/cache/GridCacheTestEntryEx.java | 5 + 17 files changed, 666 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 571a7a4..3fb5329 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -190,6 +190,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cache weak query iterator holder. */ private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder; + /** Conflict resolver. */ + private GridCacheVersionAbstractConflictResolver conflictRslvr; + /** * Empty constructor required for {@link Externalizable}. */ @@ -306,6 +309,14 @@ public class GridCacheContext<K, V> implements Externalizable { expiryPlc = null; itHolder = new CacheWeakQueryIteratorsHolder(log); + + // Conflict resolver is determined in two stages: + // 1. If DR receiver hub is enabled, then pick it from DR manager. + // 2. Otherwise instantiate default resolver in case local store is configured. + conflictRslvr = drMgr.conflictResolver(); + + if (conflictRslvr == null && storeMgr.isLocalStore()) + conflictRslvr = new GridCacheVersionConflictResolver(); } /** @@ -1546,6 +1557,38 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * Check whether conflict resolution is required. + * + * @param oldVer Old version. + * @param newVer New version. + * @return {@code True} in case DR is required. + */ + public boolean conflictNeedResolve(GridCacheVersion oldVer, GridCacheVersion newVer) { + return conflictRslvr != null; + } + + /** + * Resolve DR conflict. + * + * @param oldEntry Old entry. + * @param newEntry New entry. + * @param atomicVerComparator Whether to use atomic version comparator. + * @return Conflict resolution result. + * @throws IgniteCheckedException In case of exception. + */ + public GridCacheVersionConflictContextImpl<K, V> conflictResolve(GridCacheVersionedEntryEx<K, V> oldEntry, + GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) throws IgniteCheckedException { + assert conflictRslvr != null : "Should not reach this place."; + + GridCacheVersionConflictContextImpl<K, V> ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComparator); + + if (ctx.isManualResolve()) + drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge()); + + return ctx; + } + + /** * @return Data center ID. */ public byte dataCenterId() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 6748d6e..8eeacc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -671,6 +671,14 @@ public interface GridCacheEntryEx<K, V> { throws IgniteCheckedException, GridCacheEntryRemovedException; /** + * Create versioned entry for this cache entry. + * + * @return Versioned entry. + * @throws IgniteCheckedException In case of error. + */ + public GridCacheVersionedEntryEx<K, V> versionedEntry() throws IgniteCheckedException; + + /** * Sets new value if passed in version matches the current version * (used for read-through only). * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 0b34457..3c4e9d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -68,7 +68,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> private static final byte IS_UNSWAPPED_MASK = 0x02; /** */ - private static final Comparator<GridCacheVersion> ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator(); + public static final Comparator<GridCacheVersion> ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator(); /** * NOTE @@ -1658,7 +1658,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheVersion enqueueVer = null; - GridDrResolveResult<V> drRes = null; + GridCacheVersionConflictContextImpl<K, V> drRes = null; EntryProcessorResult<?> invokeRes = null; @@ -1675,49 +1675,113 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (isNew()) unswap(true, retval); + boolean newTtlResolved = false; + + boolean drNeedResolve = false; + Object transformClo = null; if (drResolve) { - drRes = cctx.dr().resolveAtomic(this, - op, - writeObj, - valBytes, - expiryPlc, - drTtl, - drExpireTime, - drVer); + GridCacheVersion oldDrVer = version().drVersion(); + + drNeedResolve = cctx.conflictNeedResolve(oldDrVer, drVer); + + if (drNeedResolve) { + // Get old value. + V oldVal = rawGetOrUnmarshalUnlocked(true); + + if (writeObj == null && valBytes != null) + writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); + + if (op == GridCacheOperation.TRANSFORM) { + transformClo = writeObj; + + writeObj = ((IgniteClosure<V, V>)writeObj).apply(oldVal); + } + + K k = key(); + + if (drTtl >= 0L) { + // DR TTL is set explicitly + assert drExpireTime >= 0L; + + newTtl = drTtl; + newExpireTime = drExpireTime; + } + else { + long ttl = expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L; + + newTtl = ttl < 0 ? ttlExtras() : ttl; + newExpireTime = CU.toExpireTime(newTtl); + } + + newTtlResolved = true; + + GridCacheVersionedEntryEx<K, V> oldEntry = versionedEntry(); + GridCacheVersionedEntryEx<K, V> newEntry = + new GridCachePlainVersionedEntry<>(k, (V)writeObj, newTtl, newExpireTime, drVer); + + drRes = cctx.conflictResolve(oldEntry, newEntry, verCheck); + + assert drRes != null; - if (drRes != null) { if (drRes.isUseOld()) { + // Handle special case with atomic comparator. + if (!isNew() && // Not initial value, + verCheck && // and atomic version check, + oldDrVer.dataCenterId() == drVer.dataCenterId() && // and data centers are equal, + ATOMIC_VER_COMPARATOR.compare(oldDrVer, drVer) == 0 && // and both versions are equal, + cctx.writeThrough() && // and store is enabled, + primary) // and we are primary. + { + V val = rawGetOrUnmarshalUnlocked(false); + + if (val == null) { + assert deletedUnlocked(); + + cctx.store().removeFromStore(null, key()); + } + else + cctx.store().putToStore(null, key(), val, ver); + } + old = retval ? rawGetOrUnmarshalUnlocked(false) : val; return new GridCacheUpdateAtomicResult<>(false, old, null, invokeRes, - -1L, + 0L, -1L, null, null, false); } + else if (drRes.isUseNew()) + op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + else { + assert drRes.isMerge(); - newTtl = drRes.newTtl(); - - newExpireTime = drRes.newExpireTime(); - - newDrExpireTime = drRes.newDrExpireTime(); + writeObj = drRes.mergeValue(); + valBytes = null; - op = drRes.operation(); + drVer = null; // Update will be considered as local. - writeObj = drRes.value(); + op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + } - valBytes = drRes.valueBytes(); + newTtl = drRes.ttl(); + newExpireTime = drRes.expireTime(); - if (drRes.isMerge()) - drVer = null; // Update will be considered as local. + // Explicit DR expire time will be passed to remote node only in that case. + if (!drRes.explicitTtl() && !drRes.isMerge()) { + if (drRes.isUseNew() && newEntry.dataCenterId() != cctx.dataCenterId() || + drRes.isUseOld() && oldEntry.dataCenterId() != cctx.dataCenterId()) + newDrExpireTime = drRes.expireTime(); + } } else + // Nullify DR version on this update, so that we will use regular version during next updates. drVer = null; } @@ -3095,6 +3159,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** {@inheritDoc} */ + @Override public synchronized GridCacheVersionedEntryEx<K, V> versionedEntry() throws IgniteCheckedException { + boolean isNew = isStartVersion(); + + return new GridCachePlainVersionedEntry<>(key, isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false), + ttlExtras(), expireTimeExtras(), ver.drVersion(), isNew); + } + + /** {@inheritDoc} */ @Override public synchronized boolean versionedValue(V val, GridCacheVersion curVer, GridCacheVersion newVer) throws IgniteCheckedException, GridCacheEntryRemovedException { checkObsolete(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 0b7d776..34be479 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -51,7 +51,7 @@ public class GridCacheUpdateAtomicResult<K, V> { /** DR resolution result. */ @GridToStringInclude - private final GridDrResolveResult<V> drRes; + private final GridCacheVersionConflictContextImpl<K, V> drRes; /** Whether update should be propagated to DHT node. */ private final boolean sndToDht; @@ -79,7 +79,7 @@ public class GridCacheUpdateAtomicResult<K, V> { long newTtl, long drExpireTime, @Nullable GridCacheVersion rmvVer, - @Nullable GridDrResolveResult<V> drRes, + @Nullable GridCacheVersionConflictContextImpl<K, V> drRes, boolean sndToDht) { this.success = success; this.oldVal = oldVal; @@ -144,7 +144,7 @@ public class GridCacheUpdateAtomicResult<K, V> { /** * @return DR conflict resolution context. */ - @Nullable public GridDrResolveResult<V> drResolveResult() { + @Nullable public GridCacheVersionConflictContextImpl<K, V> drResolveResult() { return drRes; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java deleted file mode 100644 index faf71ca..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDrResolveResult.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -/** - * - */ -public interface GridDrResolveResult<V> { - /** - * @return TTL. - */ - public long newTtl(); - - /** - * @return Expire time. - */ - public long newExpireTime(); - - /** - * @return DR expire time. - */ - public long newDrExpireTime(); - - /** - * @return {@code True} in case merge is to be performed. - */ - public boolean isMerge(); - - /** - * @return {@code True} in case old value should be used. - */ - public boolean isUseOld(); - - /** - * @return Cache operation. - */ - public GridCacheOperation operation(); - - /** - * @return Value. - */ - public V value(); - - /** - * @return Value bytes. - */ - public byte[] valueBytes(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index cea3a0d..f7376cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -509,24 +509,37 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> if (explicitVer == null) explicitVer = writeVersion(); // Force write version to be used. - GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached, - txEntry, - explicitVer, - op, - val, - valBytes, - txEntry.ttl(), - txEntry.drExpireTime()); - - if (drRes != null) { - op = drRes.operation(); - val = drRes.value(); - valBytes = drRes.valueBytes(); - - if (drRes.isMerge()) + boolean drNeedResolve = + cacheCtx.conflictNeedResolve(cached.version(), explicitVer); + + if (drNeedResolve) { + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContextImpl<K, V>> + drRes = conflictResolve(op, txEntry.key(), val, valBytes, + txEntry.ttl(), txEntry.drExpireTime(), explicitVer, cached); + + assert drRes != null; + + GridCacheVersionConflictContextImpl<K, V> drCtx = drRes.get2(); + + if (drCtx.isUseOld()) + op = NOOP; + else if (drCtx.isUseNew()) { + txEntry.ttl(drCtx.ttl()); + + if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId()) + txEntry.drExpireTime(drCtx.expireTime()); + else + txEntry.drExpireTime(-1L); + } + else if (drCtx.isMerge()) { + op = drRes.get1(); + val = drCtx.mergeValue(); + valBytes = null; explicitVer = writeVersion(); - else if (op == NOOP) - txEntry.ttl(-1L); + + txEntry.ttl(drCtx.ttl()); + txEntry.drExpireTime(-1L); + } } else // Nullify explicit version so that innerSet/innerRemove will work as usual. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c993397..c35743f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1083,7 +1083,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (plc != null) expiry = new UpdateExpiryPolicy(plc); - if (writeThrough() && keys.size() > 1 && !ctx.dr().receiveEnabled()) { + if (keys.size() > 1 && // Several keys ... + writeThrough() && // and store is enabled ... + !ctx.store().isLocalStore() && // and this is not local store ... + !ctx.dr().receiveEnabled() // and no DR. + ) { // This method can only be used when there are no replicated entries in the batch. UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, @@ -1681,7 +1685,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. - GridDrResolveResult<V> ctx = updRes.drResolveResult(); + GridCacheVersionConflictContextImpl<K, V> ctx = updRes.drResolveResult(); long ttl = updRes.newTtl(); long expireTime = updRes.drExpireTime(); @@ -1727,7 +1731,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (primary && updRes.sendToDht()) { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { - GridDrResolveResult<V> ctx = updRes.drResolveResult(); + GridCacheVersionConflictContextImpl<K, V> ctx = updRes.drResolveResult(); long ttl = updRes.newTtl(); long expireTime = updRes.drExpireTime(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java index ff83198..d0a0c26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java @@ -34,53 +34,9 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> { public byte dataCenterId(); /** - * Handles DR for atomic cache. - * - * @param e Cache entry. - * @param op Operation. - * @param writeObj New value. - * @param valBytes New value byte. - * @param ttl TTL. - * @param drTtl DR TTL. - * @param drExpireTime DR expire time - * @param drVer DR version. - * @return DR result. - * @throws IgniteCheckedException If update failed. - * @throws GridCacheEntryRemovedException If entry is obsolete. - */ - public GridDrResolveResult<V> resolveAtomic(GridCacheEntryEx<K, V> e, - GridCacheOperation op, - @Nullable Object writeObj, - @Nullable byte[] valBytes, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - long drTtl, - long drExpireTime, - @Nullable GridCacheVersion drVer) throws IgniteCheckedException, GridCacheEntryRemovedException; - - /** - * Handles DR for transactional cache. - * - * @param e Cache entry. - * @param txEntry Transaction entry. - * @param newVer Version. - * @param op Operation. - * @param newVal New value. - * @param newValBytes New value bytes. - * @param newTtl TTL. - * @param newDrExpireTime DR expire time - * @return DR result. - * @throws IgniteCheckedException If update failed. - * @throws GridCacheEntryRemovedException If entry is obsolete. + * @return Cache version conflict resolver. */ - public GridDrResolveResult<V> resolveTx( - GridCacheEntryEx<K, V> e, - IgniteTxEntry<K, V> txEntry, - GridCacheVersion newVer, - GridCacheOperation op, - V newVal, - byte[] newValBytes, - long newTtl, - long newDrExpireTime) throws IgniteCheckedException, GridCacheEntryRemovedException; + public GridCacheVersionAbstractConflictResolver conflictResolver(); /** * Performs replication. @@ -138,6 +94,15 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> { public void onReceiveCacheEntriesReceived(int entriesCnt); /** + * Callback for manual conflict resolution. + * + * @param useNew Use new. + * @param useOld Use old. + * @param merge Merge. + */ + public void onReceiveCacheConflictResolved(boolean useNew, boolean useOld, boolean merge); + + /** * Resets metrics for current cache. */ public void resetMetrics(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java index 20b8804..49f617b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.dr.os; import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.dr.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; import org.jetbrains.annotations.*; @@ -65,6 +64,11 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> { } /** {@inheritDoc} */ + @Override public GridCacheVersionAbstractConflictResolver conflictResolver() { + return null; + } + + /** {@inheritDoc} */ @Override public void replicate(K key, @Nullable byte[] keyBytes, @Nullable V val, @@ -77,30 +81,6 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> { } /** {@inheritDoc} */ - @Override public GridDrResolveResult<V> resolveAtomic(GridCacheEntryEx<K, V> e, - GridCacheOperation op, - @Nullable Object writeObj, - @Nullable byte[] valBytes, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - long drTtl, - long drExpireTime, - @Nullable GridCacheVersion drVer) throws IgniteCheckedException, GridCacheEntryRemovedException { - return null; - } - - /** {@inheritDoc} */ - @Override public GridDrResolveResult<V> resolveTx(GridCacheEntryEx<K, V> e, - IgniteTxEntry<K, V> txEntry, - GridCacheVersion newVer, - GridCacheOperation op, - V newVal, - byte[] newValBytes, - long newTtl, - long newDrExpireTime) throws IgniteCheckedException, GridCacheEntryRemovedException { - return null; - } - - /** {@inheritDoc} */ @Override public void beforeExchange(long topVer, boolean left) throws IgniteCheckedException { // No-op. } @@ -116,6 +96,11 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> { } /** {@inheritDoc} */ + @Override public void onReceiveCacheConflictResolved(boolean useNew, boolean useOld, boolean merge) { + // No-op. + } + + /** {@inheritDoc} */ @Override public void resetMetrics() { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 47cd12b..e079a5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1252,6 +1252,51 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } /** + * Resolve DR conflict. + * + * @param op Initially proposed operation. + * @param key Key. + * @param newVal New value. + * @param newValBytes New value bytes. + * @param newTtl New TTL. + * @param newDrExpireTime New explicit DR expire time. + * @param newVer New version. + * @param old Old entry. + * @return Tuple with adjusted operation type and conflict context. + * @throws org.apache.ignite.IgniteCheckedException In case of eny exception. + * @throws GridCacheEntryRemovedException If entry got removed. + */ + protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContextImpl<K, V>> conflictResolve( + GridCacheOperation op, K key, V newVal, byte[] newValBytes, long newTtl, long newDrExpireTime, + GridCacheVersion newVer, GridCacheEntryEx<K, V> old) + throws IgniteCheckedException, GridCacheEntryRemovedException { + // Construct old entry info. + GridCacheVersionedEntryEx<K, V> oldEntry = old.versionedEntry(); + + // Construct new entry info. + if (newVal == null && newValBytes != null) + newVal = cctx.marshaller().unmarshal(newValBytes, cctx.deploy().globalLoader()); + + long newExpireTime = newDrExpireTime >= 0L ? newDrExpireTime : CU.toExpireTime(newTtl); + + GridCacheVersionedEntryEx<K, V> newEntry = + new GridCachePlainVersionedEntry<K, V>(key, newVal, newTtl, newExpireTime, newVer); + + GridCacheVersionConflictContextImpl<K, V> ctx = old.context().conflictResolve(oldEntry, newEntry, false); + + if (ctx.isMerge()) { + V resVal = ctx.mergeValue(); + + if ((op == CREATE || op == UPDATE) && resVal == null) + op = DELETE; + else if (op == DELETE && resVal != null) + op = old.isNewLocked() ? CREATE : UPDATE; + } + + return F.t(op, ctx); + } + + /** * @param e Transaction entry. * @param primaryOnly Flag to include backups into check or not. * @return {@code True} if entry is locally mapped as a primary or back up node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 46ab74f..d9c49d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -699,31 +699,45 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } } - GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached, - txEntry, - explicitVer, - op, - val, - valBytes, - txEntry.ttl(), - txEntry.drExpireTime()); - - if (drRes != null) { - op = drRes.operation(); - val = drRes.value(); - valBytes = drRes.valueBytes(); - - if (drRes.isMerge()) + boolean drNeedResolve = cacheCtx.conflictNeedResolve(cached.version(), explicitVer); + + if (drNeedResolve) { + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContextImpl<K, V>> + drRes = conflictResolve(op, txEntry.key(), val, valBytes, txEntry.ttl(), + txEntry.drExpireTime(), explicitVer, cached); + + assert drRes != null; + + GridCacheVersionConflictContextImpl<K, V> conflictCtx = drRes.get2(); + + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + + if (conflictCtx.newEntry().dataCenterId() != cctx.dataCenterId()) + txEntry.drExpireTime(conflictCtx.expireTime()); + else + txEntry.drExpireTime(-1L); + } + else { + assert conflictCtx.isMerge(); + + op = drRes.get1(); + val = conflictCtx.mergeValue(); + valBytes = null; explicitVer = writeVersion(); - else if (op == NOOP) - txEntry.ttl(-1L); + + txEntry.ttl(conflictCtx.ttl()); + txEntry.drExpireTime(-1L); + } } else // Nullify explicit version so that innerSet/innerRemove will work as usual. explicitVer = null; - if (sndTransformedVals || (drRes != null)) { - assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null); + if (sndTransformedVals || drNeedResolve) { + assert sndTransformedVals && cacheCtx.isReplicated() || drNeedResolve; txEntry.value(val, true, false); txEntry.valueBytes(valBytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java new file mode 100644 index 0000000..a91bd4d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionAbstractConflictResolver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.*; + +/** + * Cache version conflict resolver. + */ +public abstract class GridCacheVersionAbstractConflictResolver { + /** + * Resolve the conflict. + * + * @param oldEntry Old entry. + * @param newEntry New entry. + * @param atomicVerComparator Whether to use atomic version comparator. + * @return Conflict resolution context. + * @throws IgniteCheckedException If failed. + */ + public <K, V> GridCacheVersionConflictContextImpl<K, V> resolve(GridCacheVersionedEntryEx<K, V> oldEntry, + GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) throws IgniteCheckedException { + GridCacheVersionConflictContextImpl<K, V> ctx = new GridCacheVersionConflictContextImpl<>(oldEntry, newEntry); + + resolve0(ctx, oldEntry, newEntry, atomicVerComparator); + + return ctx; + } + + /** + * Internal conflict resolution routine. + * + * @param ctx Context. + * @param oldEntry Old entry. + * @param newEntry New entry. + * @param atomicVerComparator Whether to use atomic version comparator. + * @throws IgniteCheckedException If failed. + */ + protected abstract <K, V> void resolve0(GridCacheVersionConflictContextImpl<K, V> ctx, + GridCacheVersionedEntryEx<K, V> oldEntry, GridCacheVersionedEntryEx<K, V> newEntry, + boolean atomicVerComparator) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java new file mode 100644 index 0000000..72c323b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; +import org.jetbrains.annotations.*; + +/** + * Cache version conflict context. + */ +public interface GridCacheVersionConflictContext<K, V> { + /** + * Gets old (existing) cache entry. + * + * @return Old (existing) cache entry. + */ + public GridCacheVersionedEntry<K, V> oldEntry(); + + /** + * Gets new cache entry. + * + * @return New cache entry. + */ + public GridCacheVersionedEntry<K, V> newEntry(); + + /** + * Force cache to ignore new entry and leave old (existing) entry unchanged. + */ + public void useOld(); + + /** + * Force cache to apply new entry overwriting old (existing) entry. + * <p> + * Note that updates from remote data centers always have explicit TTL , while local data center + * updates will only have explicit TTL in case {@link CacheEntry#timeToLive(long)} was called + * before update. In the latter case new entry will pick TTL of the old (existing) entry, even + * if it was set through update from remote data center. it means that depending on concurrent + * update timings new update might pick unexpected TTL. For example, consider that three updates + * of the same key are performed: local update with explicit TTL (1) followed by another local + * update without explicit TTL (2) and one remote update (3). In this case you might expect that + * update (2) will pick TTL set during update (1). However, in case update (3) occurrs between (1) + * and (2) and it overwrites (1) during conflict resolution, then update (2) will pick TTL of + * update (3). To have predictable TTL in such cases you should either always set it explicitly + * through {@code GridCacheEntry.timeToLive(long)} or use {@link #merge(Object, long)}. + */ + public void useNew(); + + /** + * Force cache to use neither old, nor new, but some other value passed as argument. In this case old + * value will be replaced with merge value and update will be considered as local. + * <p> + * Also in case of merge you have to specify new TTL explicitly. For unlimited TTL use {@code 0}. + * + * @param mergeVal Merge value or {@code null} to force remove. + * @param ttl Time to live in milliseconds. + */ + public void merge(@Nullable V mergeVal, long ttl); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java new file mode 100644 index 0000000..228a224 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContextImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +/** + * Conflict context implementation. + */ +public class GridCacheVersionConflictContextImpl<K, V> implements GridCacheVersionConflictContext<K, V> { + /** Old entry. */ + @GridToStringInclude + private final GridCacheVersionedEntry<K, V> oldEntry; + + /** New entry. */ + @GridToStringInclude + private final GridCacheVersionedEntry<K, V> newEntry; + + /** Current state. */ + private State state; + + /** Current merge value. */ + @GridToStringExclude + private V mergeVal; + + /** TTL. */ + private long ttl; + + /** Explicit TTL flag. */ + private boolean explicitTtl; + + /** Manual resolve flag. */ + private boolean manualResolve; + + /** + * Constructor. + * + * @param oldEntry Old entry. + * @param newEntry New entry. + */ + public GridCacheVersionConflictContextImpl(GridCacheVersionedEntry<K, V> oldEntry, + GridCacheVersionedEntry<K, V> newEntry) { + assert oldEntry != null && newEntry != null; + assert oldEntry.ttl() >= 0 && newEntry.ttl() >= 0; + + this.oldEntry = oldEntry; + this.newEntry = newEntry; + + // Set initial state. + useNew(); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersionedEntry<K, V> oldEntry() { + return oldEntry; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersionedEntry<K, V> newEntry() { + return newEntry; + } + + /** {@inheritDoc} */ + @Override public void useOld() { + state = State.USE_OLD; + } + + /** {@inheritDoc} */ + @Override public void useNew() { + state = State.USE_NEW; + + if (!explicitTtl) + ttl = newEntry.ttl(); + } + + /** {@inheritDoc} */ + @Override public void merge(@Nullable V mergeVal, long ttl) { + state = State.MERGE; + + this.mergeVal = mergeVal; + this.ttl = ttl; + + explicitTtl = true; + } + + /** + * @return {@code True} in case old value should be used. + */ + public boolean isUseOld() { + return state == State.USE_OLD; + } + + /** + * @return {@code True} in case new value should be used. + */ + public boolean isUseNew() { + return state == State.USE_NEW; + } + + /** + * @return {@code True} in case merge is to be performed. + */ + public boolean isMerge() { + return state == State.MERGE; + } + + /** + * Set manual resolve class. + */ + public void manualResolve() { + this.manualResolve = true; + } + + /** + * @return Manual resolve flag. + */ + public boolean isManualResolve() { + return manualResolve; + } + + /** + * @return Value to merge (if any). + */ + @Nullable public V mergeValue() { + return mergeVal; + } + + /** + * @return TTL. + */ + public long ttl() { + return ttl; + } + + /** + * @return Expire time. + */ + public long expireTime() { + return explicitTtl ? CU.toExpireTime(ttl) : isUseNew() ? newEntry.expireTime() : + isUseOld() ? oldEntry.expireTime() : 0L; + } + + /** + * @return Explicit TTL flag. + */ + public boolean explicitTtl() { + return explicitTtl; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return state == State.MERGE ? + S.toString(GridCacheVersionConflictContextImpl.class, this, "mergeValue", mergeVal) : + S.toString(GridCacheVersionConflictContextImpl.class, this); + } + + /** + * State. + */ + private enum State { + /** Use old. */ + USE_OLD, + + /** Use new. */ + USE_NEW, + + /** Merge. */ + MERGE + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java new file mode 100644 index 0000000..e327fb9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictResolver.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; + +/** + * Default conflict resolver. + */ +public class GridCacheVersionConflictResolver extends GridCacheVersionAbstractConflictResolver { + /** {@inheritDoc} */ + @Override protected <K, V> void resolve0(GridCacheVersionConflictContextImpl<K, V> ctx, + GridCacheVersionedEntryEx<K, V> oldEntry, GridCacheVersionedEntryEx<K, V> newEntry, + boolean atomicVerComparator) throws IgniteCheckedException { + if (newEntry.dataCenterId() != oldEntry.dataCenterId()) + ctx.useNew(); + else { + if (oldEntry.isStartVersion()) + ctx.useNew(); + else { + if (atomicVerComparator) { + // Handle special case when version check using ATOMIC cache comparator is required. + if (GridCacheMapEntry.ATOMIC_VER_COMPARATOR.compare(oldEntry.version(), newEntry.version()) >= 0) + ctx.useOld(); + else + ctx.useNew(); + } + else { + long topVerDiff = newEntry.topologyVersion() - oldEntry.topologyVersion(); + + if (topVerDiff > 0) + ctx.useNew(); + else if (topVerDiff < 0) + ctx.useOld(); + else if (newEntry.order() > oldEntry.order()) + ctx.useNew(); + else + ctx.useOld(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java index b8f7c19..5351966 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionedEntryEx.java @@ -25,7 +25,7 @@ import org.apache.ignite.cache.*; public interface GridCacheVersionedEntryEx<K, V> extends GridCacheVersionedEntry<K, V>, GridCacheVersionable { /** * - * @return + * @return {@code True} if entry is new. */ public boolean isStartVersion(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e8a0c3f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 60df4e4..d1b6ce6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -631,6 +631,11 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme } /** @inheritDoc */ + @Override public GridCacheVersionedEntryEx<K, V> versionedEntry() throws IgniteCheckedException { + return null; + } + + /** @inheritDoc */ @Override public boolean versionedValue(V val, GridCacheVersion curVer, GridCacheVersion newVer) { assert false; return false; }