IGNITE-52 - Remove group lock transactions from public API
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/51eefaf7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/51eefaf7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/51eefaf7 Branch: refs/heads/sprint-1 Commit: 51eefaf79d36fee49dcde2ade6e7b1467f6f2052 Parents: 7f7b562 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Fri Feb 13 21:53:16 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Fri Feb 13 21:53:16 2015 -0800 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteTransactions.java | 79 +- .../apache/ignite/cache/CacheProjection.java | 72 - .../ignite/internal/IgniteTransactionsEx.java | 37 +- .../processors/cache/GridCacheAdapter.java | 14 - .../cache/GridCacheProjectionImpl.java | 12 - .../processors/cache/GridCacheProxyImpl.java | 26 - .../transactions/IgniteTransactionsImpl.java | 154 -- .../dataload/GridDataLoadCacheUpdaters.java | 81 -- .../dataload/IgniteDataLoaderImpl.java | 2 +- .../cache/GridCacheDeploymentSelfTest.java | 40 - .../GridCacheGroupLockAbstractSelfTest.java | 1337 ------------------ ...heGroupLockFailoverOptimisticTxSelfTest.java | 28 - .../GridCacheGroupLockFailoverSelfTest.java | 533 ------- ...CacheGroupLockMultiNodeAbstractSelfTest.java | 28 - .../cache/GridCacheGroupLockPutTask.java | 161 --- .../GridCacheGroupLockColocatedSelfTest.java | 38 - ...acheGroupLockMultiNodeColocatedSelfTest.java | 29 - ...cheGroupLockPartitionedAbstractSelfTest.java | 137 -- ...ockPartitionedMultiNodeAbstractSelfTest.java | 174 --- ...GridCacheGroupLockMultiNodeNearSelfTest.java | 30 - .../near/GridCacheGroupLockNearSelfTest.java | 38 - ...cheGroupLockMultiNodeReplicatedSelfTest.java | 45 - .../GridCacheGroupLockReplicatedSelfTest.java | 39 - .../GridCacheEvictionTouchSelfTest.java | 94 -- .../dataload/GridDataLoaderPerformanceTest.java | 17 +- .../GridDataLoaderProcessorSelfTest.java | 59 +- ...ridCacheAffinityTransactionsOffHeapTest.java | 270 ---- .../cache/GridCacheGroupLockComparisonTest.java | 204 --- .../IgniteCacheGroupLockSelfTestSuite.java | 48 - .../ignite/testsuites/IgniteCacheTestSuite.java | 3 - 30 files changed, 15 insertions(+), 3814 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java index 550501a..5277381 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java @@ -17,6 +17,7 @@ package org.apache.ignite; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.*; import org.apache.ignite.transactions.*; @@ -30,7 +31,7 @@ public interface IgniteTransactions { * * @return New transaction * @throws IllegalStateException If transaction is already started by this thread. - * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. + * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. */ public IgniteTx txStart() throws IllegalStateException; @@ -41,7 +42,7 @@ public interface IgniteTransactions { * @param isolation Isolation. * @return New transaction. * @throws IllegalStateException If transaction is already started by this thread. - * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. + * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. */ public IgniteTx txStart(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation); @@ -55,84 +56,12 @@ public interface IgniteTransactions { * @param txSize Number of entries participating in transaction (may be approximate). * @return New transaction. * @throws IllegalStateException If transaction is already started by this thread. - * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. + * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. */ public IgniteTx txStart(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation, long timeout, int txSize); /** - * Starts {@code affinity-group-locked} transaction based on affinity key. In this mode only affinity key - * is locked and all other entries in transaction are written without locking. However, - * all keys in such transaction must have the same affinity key. Node on which transaction - * is started must be primary for the given affinity key (an exception is thrown otherwise). - * <p> - * Since only affinity key is locked, and no individual keys, it is user's responsibility to make sure - * there are no other concurrent explicit updates directly on individual keys participating in the - * transaction. All updates to the keys involved should always go through {@code affinity-group-locked} - * transaction, otherwise cache may be left in inconsistent state. - * <p> - * If cache sanity check is enabled ({@link IgniteConfiguration#isCacheSanityCheckEnabled()}), - * the following checks are performed: - * <ul> - * <li> - * An exception will be thrown if affinity key differs from one specified on transaction start. - * </li> - * <li> - * An exception is thrown if entry participating in transaction is externally locked at commit. - * </li> - * </ul> - * - * @param affinityKey Affinity key for all entries updated by transaction. This node - * must be primary for this key. - * @param timeout Timeout ({@code 0} for default). - * @param txSize Number of entries participating in transaction (may be approximate), {@code 0} for default. - * @param concurrency Transaction concurrency control. - * @param isolation Cache transaction isolation level. - * @return Started transaction. - * @throws IllegalStateException If transaction is already started by this thread. - * @throws IgniteException If local node is not primary for any of provided keys. - * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. - */ - public IgniteTx txStartAffinity(String cacheName, Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteException; - - /** - * Starts {@code partition-group-locked} transaction based on partition ID. In this mode the whole partition - * is locked and all other entries in transaction are written without locking. However, - * all keys in such transaction must belong to the same partition. Node on which transaction - * is started must be primary for the given partition (an exception is thrown otherwise). - * <p> - * Since only partition is locked, and no individual keys, it is user's responsibility to make sure - * there are no other concurrent explicit updates directly on individual keys participating in the - * transaction. All updates to the keys involved should always go through {@code partition-group-locked} - * transaction, otherwise, cache may be left in inconsistent state. - * <p> - * If cache sanity check is enabled ({@link IgniteConfiguration#isCacheSanityCheckEnabled()}), - * the following checks are performed: - * <ul> - * <li> - * An exception will be thrown if key partition differs from one specified on transaction start. - * </li> - * <li> - * An exception is thrown if entry participating in transaction is externally locked at commit. - * </li> - * </ul> - * - * @param partId Partition id for which transaction is started. This node - * must be primary for this partition. - * @param timeout Timeout ({@code 0} for default). - * @param txSize Number of entries participating in transaction (may be approximate), {@code 0} for default. - * @param concurrency Transaction concurrency control. - * @param isolation Cache transaction isolation level. - * @return Started transaction. - * @throws IllegalStateException If transaction is already started by this thread. - * @throws IgniteException If local node is not primary for any of provided keys. - * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. - */ - public IgniteTx txStartPartition(String cacheName, int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteException; - - /** * Gets transaction started by this thread or {@code null} if this thread does * not have a transaction. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java index 942b23d..9e75c05 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java @@ -1232,78 +1232,6 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { int txSize); /** - * Starts {@code affinity-group-locked} transaction based on affinity key. In this mode only affinity key - * is locked and all other entries in transaction are written without locking. However, - * all keys in such transaction must have the same affinity key. Node on which transaction - * is started must be primary for the given affinity key (an exception is thrown otherwise). - * <p> - * Since only affinity key is locked, and no individual keys, it is user's responsibility to make sure - * there are no other concurrent explicit updates directly on individual keys participating in the - * transaction. All updates to the keys involved should always go through {@code affinity-group-locked} - * transaction, otherwise cache may be left in inconsistent state. - * <p> - * If cache sanity check is enabled ({@link org.apache.ignite.configuration.IgniteConfiguration#isCacheSanityCheckEnabled()}), - * the following checks are performed: - * <ul> - * <li> - * An exception will be thrown if affinity key differs from one specified on transaction start. - * </li> - * <li> - * An exception is thrown if entry participating in transaction is externally locked at commit. - * </li> - * </ul> - * - * @param affinityKey Affinity key for all entries updated by transaction. This node - * must be primary for this key. - * @param timeout Timeout ({@code 0} for default). - * @param txSize Number of entries participating in transaction (may be approximate), {@code 0} for default. - * @param concurrency Transaction concurrency control. - * @param isolation Cache transaction isolation level. - * @return Started transaction. - * @throws IllegalStateException If transaction is already started by this thread. - * @throws IgniteCheckedException If local node is not primary for any of provided keys. - * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. - */ - public IgniteTx txStartAffinity(Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException; - - /** - * Starts {@code partition-group-locked} transaction based on partition ID. In this mode the whole partition - * is locked and all other entries in transaction are written without locking. However, - * all keys in such transaction must belong to the same partition. Node on which transaction - * is started must be primary for the given partition (an exception is thrown otherwise). - * <p> - * Since only partition is locked, and no individual keys, it is user's responsibility to make sure - * there are no other concurrent explicit updates directly on individual keys participating in the - * transaction. All updates to the keys involved should always go through {@code partition-group-locked} - * transaction, otherwise, cache may be left in inconsistent state. - * <p> - * If cache sanity check is enabled ({@link org.apache.ignite.configuration.IgniteConfiguration#isCacheSanityCheckEnabled()}), - * the following checks are performed: - * <ul> - * <li> - * An exception will be thrown if key partition differs from one specified on transaction start. - * </li> - * <li> - * An exception is thrown if entry participating in transaction is externally locked at commit. - * </li> - * </ul> - * - * @param partId Partition id for which transaction is started. This node - * must be primary for this partition. - * @param timeout Timeout ({@code 0} for default). - * @param txSize Number of entries participating in transaction (may be approximate), {@code 0} for default. - * @param concurrency Transaction concurrency control. - * @param isolation Cache transaction isolation level. - * @return Started transaction. - * @throws IllegalStateException If transaction is already started by this thread. - * @throws IgniteCheckedException If local node is not primary for any of provided keys. - * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. - */ - public IgniteTx txStartPartition(int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException; - - /** * Gets transaction started by this thread or {@code null} if this thread does * not have a transaction. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java index 08a9fe0..82852a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.transactions.*; @@ -36,7 +37,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions { * @param txSize Number of entries participating in transaction (may be approximate). * @return New transaction. * @throws IllegalStateException If transaction is already started by this thread. - * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. + * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. */ public IgniteTx txStartSystem(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation, long timeout, int txSize); @@ -62,38 +63,4 @@ public interface IgniteTransactionsEx extends IgniteTransactions { * @return New transaction. */ public IgniteInternalTx txStartEx(GridCacheContext ctx, IgniteTxConcurrency concurrency, IgniteTxIsolation isolation); - - /** - * @param ctx Cache context. - * @param partId Partition. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param txSize Number of entries participating in transaction (may be approximate). - * @return New transaction. - * @throws IgniteCheckedException If failed. - */ - public IgniteInternalTx txStartPartitionEx(GridCacheContext ctx, - int partId, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - int txSize) throws IgniteCheckedException; - - /** - * @param ctx Cache context. - * @param affinityKey Affinity key. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param txSize Number of entries participating in transaction (may be approximate). - * @return New transaction. - * @throws IgniteCheckedException If failed. - */ - public IgniteInternalTx txStartAffinity(GridCacheContext ctx, - Object affinityKey, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - int txSize) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 988e4b6..75131a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3785,20 +3785,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteTx txStartAffinity(Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException { - return ctx.kernalContext().cache().transactions().txStartAffinity(name(), affinityKey, concurrency, isolation, - timeout, txSize); - } - - /** {@inheritDoc} */ - @Override public IgniteTx txStartPartition(int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException { - return ctx.kernalContext().cache().transactions().txStartPartition(name(), partId, concurrency, isolation, - timeout, txSize); - } - - /** {@inheritDoc} */ @Override public long overflowSize() throws IgniteCheckedException { return ctx.swap().swapSize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index aaaf6f7..57c8186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -1301,18 +1301,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Override public IgniteTx txStartAffinity(Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException { - return cache.txStartAffinity(affinityKey, concurrency, isolation, timeout, txSize); - } - - /** {@inheritDoc} */ - @Override public IgniteTx txStartPartition(int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException { - return cache.txStartPartition(partId, concurrency, isolation, timeout, txSize); - } - - /** {@inheritDoc} */ @Override public IgniteTx tx() { return cache.tx(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index d55c975..54bcd0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1194,32 +1194,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Override public IgniteTx txStartAffinity(Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.txStartAffinity(affinityKey, concurrency, isolation, timeout, txSize); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ - @Override public IgniteTx txStartPartition(int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.txStartPartition(partId, concurrency, isolation, timeout, txSize); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Override public IgniteTx tx() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 7d84194..411976d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -17,18 +17,13 @@ package org.apache.ignite.internal.processors.cache.transactions; -import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.cache.CacheFlag.*; import static org.apache.ignite.transactions.IgniteTxIsolation.*; /** @@ -130,46 +125,6 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { } /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartAffinity(GridCacheContext ctx, - Object affinityKey, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - int txSize) - throws IgniteCheckedException - { - return txStartGroupLock(ctx, - affinityKey, - concurrency, - isolation, - false, - timeout, - txSize, - ctx.system()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartPartitionEx(GridCacheContext ctx, - int partId, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - int txSize) - throws IgniteCheckedException - { - Object grpLockKey = ctx.affinity().partitionAffinityKey(partId); - - return txStartGroupLock(ctx, - grpLockKey, - concurrency, - isolation, - true, - timeout, - txSize, - ctx.system()); - } - - /** {@inheritDoc} */ @Override public IgniteTx txStartSystem(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation, long timeout, int txSize) { A.notNull(concurrency, "concurrency"); @@ -228,115 +183,6 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { } /** {@inheritDoc} */ - @Override public IgniteTx txStartAffinity(String cacheName, Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteException { - GridCacheAdapter<Object, Object> cache = cctx.kernalContext().cache().internalCache(cacheName); - - if (cache == null) - throw new IllegalArgumentException("Failed to find cache with given name (cache is not configured): " + - cacheName); - - try { - return txStartGroupLock(cache.context(), - affinityKey, - concurrency, - isolation, - false, - timeout, - txSize, - false).proxy(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public IgniteTx txStartPartition(String cacheName, int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteException { - GridCacheAdapter<Object, Object> cache = cctx.kernalContext().cache().internalCache(cacheName); - - if (cache == null) - throw new IllegalArgumentException("Failed to find cache with given name (cache is not configured): " + - cacheName); - - Object grpLockKey = cache.context().affinity().partitionAffinityKey(partId); - - try { - return txStartGroupLock(cache.context(), - grpLockKey, - concurrency, - isolation, - true, - timeout, - txSize, - false).proxy(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * Internal method to start group-lock transaction. - * - * @param ctx Cache context. - * @param grpLockKey Group lock key. - * @param concurrency Transaction concurrency control. - * @param isolation Transaction isolation level. - * @param partLock {@code True} if this is a partition-lock transaction. In this case {@code grpLockKey} - * should be a unique partition-specific key. - * @param timeout Tx timeout. - * @param txSize Expected transaction size. - * @param sys System flag. - * @return Started transaction. - * @throws IllegalStateException If other transaction was already started. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings("unchecked") - private IgniteInternalTx txStartGroupLock(GridCacheContext ctx, Object grpLockKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, boolean partLock, long timeout, int txSize, boolean sys) - throws IllegalStateException, IgniteCheckedException { - IgniteInternalTx tx = cctx.tm().userTx(); - - if (tx != null) - throw new IllegalStateException("Failed to start new transaction " + - "(current thread already has a transaction): " + tx); - - IgniteTxLocalAdapter<K, V> tx0 = cctx.tm().newTx( - false, - false, - sys, - concurrency, - isolation, - timeout, - ctx.hasFlag(INVALIDATE), - !ctx.hasFlag(SKIP_STORE), - txSize, - ctx.txKey(grpLockKey), - partLock - ); - - assert tx0 != null; - - if (ctx.hasFlag(SYNC_COMMIT)) - tx0.syncCommit(true); - - IgniteInternalFuture<?> lockFut = tx0.groupLockAsync(ctx, (Collection)F.asList(grpLockKey)); - - try { - lockFut.get(); - } - catch (IgniteCheckedException e) { - tx0.rollback(); - - throw e; - } - - return tx0; - } - - /** {@inheritDoc} */ @Nullable @Override public IgniteTx tx() { IgniteInternalTx tx = cctx.tm().userTx(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java index 42adf3e..58c1efd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -44,9 +44,6 @@ public class GridDataLoadCacheUpdaters { /** */ private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted(); - /** */ - private static final IgniteDataLoader.Updater GROUP_LOCKED = new GroupLocked(); - /** * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance @@ -81,17 +78,6 @@ public class GridDataLoadCacheUpdaters { } /** - * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and - * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])} in group lock transaction. Requires that there are no - * concurrent updates other than in group lock. - * - * @return Updater with group lock. - */ - public static <K, V> IgniteDataLoader.Updater<K, V> groupLocked() { - return GROUP_LOCKED; - } - - /** * Updates cache. * * @param cache Cache. @@ -217,71 +203,4 @@ public class GridDataLoadCacheUpdaters { updateAll(cache, rmvAll, putAll); } } - - /** - * Cache updater which uses group lock. - */ - private static class GroupLocked<K, V> implements IgniteDataLoader.Updater<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) { - assert cache != null; - assert !F.isEmpty(entries); - - assert cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC; - - Map<Integer, Integer> partsCounts = new HashMap<>(); - - // Group by partition ID. - Map<Integer, Set<K>> rmvPartMap = null; - Map<Integer, Map<K, V>> putPartMap = null; - - Ignite ignite = cache.unwrap(Ignite.class); - - CacheAffinity<K> aff = ignite.<K, V>cache(cache.getName()).affinity(); - - for (Map.Entry<K, V> entry : entries) { - K key = entry.getKey(); - - assert key != null; - - V val = entry.getValue(); - - int part = aff.partition(key); - - Integer cnt = partsCounts.get(part); - - partsCounts.put(part, cnt == null ? 1 : cnt + 1); - - if (val == null) { - if (rmvPartMap == null) - rmvPartMap = new HashMap<>(); - - F.addIfAbsent(rmvPartMap, part, F.<K>newSet()).add(key); - } - else { - if (putPartMap == null) - putPartMap = new HashMap<>(); - - F.addIfAbsent(putPartMap, part, F.<K, V>newMap()).put(key, val); - } - } - - IgniteTransactions txs = ignite.transactions(); - - for (Map.Entry<Integer, Integer> e : partsCounts.entrySet()) { - Integer part = e.getKey(); - int cnt = e.getValue(); - - try (IgniteTx tx = txs.txStartPartition(cache.getName(), part, PESSIMISTIC, REPEATABLE_READ, 0, cnt)) { - updateAll(cache, rmvPartMap == null ? null : rmvPartMap.get(part), - putPartMap == null ? null : putPartMap.get(part)); - - tx.commit(); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index c7002f8..f2dbf78 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -298,7 +298,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay updater = a.atomicityMode() == CacheAtomicityMode.ATOMIC ? GridDataLoadCacheUpdaters.<K, V>batched() : - GridDataLoadCacheUpdaters.<K, V>groupLocked(); + GridDataLoadCacheUpdaters.<K, V>individual(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java index 6d2e5ba..645e5a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java @@ -412,46 +412,6 @@ public class GridCacheDeploymentSelfTest extends GridCommonAbstractTest { } } - /** @throws Exception If failed. */ - public void _testDeploymentGroupLock() throws Exception { - ClassLoader ldr = getExternalClassLoader(); - - Class<?> keyCls = ldr.loadClass(TEST_KEY); - - try { - depMode = SHARED; - - Ignite g1 = startGrid(1); - startGrid(2); - - Constructor<?> constructor = keyCls.getDeclaredConstructor(String.class); - - Object affKey; - - int i = 0; - - do { - affKey = constructor.newInstance(String.valueOf(i)); - - i++; - } - while (!g1.cluster().mapKeyToNode(null, affKey).id().equals(g1.cluster().localNode().id())); - - IgniteCache<Object, Object> cache = g1.jcache(null); - - try (IgniteTx tx = g1.transactions().txStartAffinity(null, affKey, PESSIMISTIC, REPEATABLE_READ, 0, 1)) { - cache.put(new CacheAffinityKey<>("key1", affKey), "val1"); - - tx.commit(); - } - - assertEquals("val1", cache.get(new CacheAffinityKey<>("key1", affKey))); - } - finally { - stopAllGrids(); - } - } - /** * Looks for next key starting from {@code start} for which primary node is {@code primary} and backup is {@code * backup}. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java deleted file mode 100644 index 890137e..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java +++ /dev/null @@ -1,1337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; -import org.apache.ignite.transactions.*; -import org.jdk8.backport.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Test for group locking. - */ -public abstract class GridCacheGroupLockAbstractSelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Event wait timeout. */ - private static final int WAIT_TIMEOUT = 3000; - - /** */ - private TestStore store; - - /** @return Grid count to run in test. */ - protected int gridCount() { - return 1; - } - - /** @return Whether near cache is enabled. */ - protected abstract boolean nearEnabled(); - - /** @return Cache mode for test. */ - protected abstract CacheMode cacheMode(); - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(cacheMode()); - cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - cacheCfg.setCacheStoreFactory(new Factory<CacheStore<? super Object, ? super Object>>() { - @Override public CacheStore<? super Object, ? super Object> create() { - return store; - } - }); - cacheCfg.setReadThrough(true); - cacheCfg.setWriteThrough(true); - cacheCfg.setLoadPreviousValue(true); - - cfg.setCacheConfiguration(cacheCfg); - cfg.setCacheSanityCheckEnabled(sanityCheckEnabled()); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setSharedMemoryPort(-1); - - cfg.setCommunicationSpi(commSpi); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - store = new TestStore(); - - startGridsMultiThreaded(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - store = null; - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutOneKeyOptimistic() throws Exception { - checkGroupLockPutOneKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutOneKeyPessimistic() throws Exception { - checkGroupLockPutOneKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockPutOneKey(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - CacheAffinityKey<String> key1; - CacheAffinityKey<String> key2; - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - key1 = new CacheAffinityKey<>("key1", affinityKey); - key2 = new CacheAffinityKey<>("key2", affinityKey); - - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveOneKeyOptimistic() throws Exception { - checkGroupLockRemoveOneKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveOneKeyPessimistic() throws Exception { - checkGroupLockRemoveOneKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockRemoveOneKey(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - - cache.removeAll(F.asSet(key1, key2)); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertNull("For index: " + i, gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertNull("For index: " + i, gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockGetOneKeyOptimistic() throws Exception { - checkGroupLockGetOneKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockGetOneKeyPessimistic() throws Exception { - checkGroupLockGetOneKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockGetOneKey(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWithExternalLockOptimistic() throws Exception { - checkGroupLockWithExternalLock(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWithExternalLockPessimistic() throws Exception { - checkGroupLockWithExternalLock(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockWithExternalLock(final IgniteTxConcurrency concurrency) throws Exception { - assert sanityCheckEnabled(); - - final UUID affinityKey = primaryKeyForCache(grid(0)); - - final CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - - final IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.put(key1, "val1"); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - } - - final CountDownLatch unlockLatch = new CountDownLatch(1); - final CountDownLatch lockLatch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override public void run() { - try { - Lock lock = cache.lock(key1); - - lock.lock(); - - try { - lockLatch.countDown(); - unlockLatch.await(); - } - finally { - lock.unlock(); - } - } - catch (CacheException e) { - fail(e.getMessage()); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } - }, 1); - - try { - lockLatch.await(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, - READ_COMMITTED, 0, 1)) { - cache.put(key1, "val01"); - - tx.commit(); - } - - return null; - } - }, IgniteTxHeuristicException.class, null); - } - finally { - unlockLatch.countDown(); - - fut.get(); - } - } - - /** - * @throws Exception If failed. - */ - public void testSanityCheckDisabledOptimistic() throws Exception { - checkSanityCheckDisabled(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testSanityCheckDisabledPessimistic() throws Exception { - checkSanityCheckDisabled(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkSanityCheckDisabled(final IgniteTxConcurrency concurrency) throws Exception { - assert !sanityCheckEnabled(); - - IgniteEx grid = grid(0); - - final UUID affinityKey = primaryKeyForCache(grid); - - final CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - - final IgniteCache<CacheAffinityKey<String>, String> cache = grid.jcache(null); - - // Populate cache. - cache.put(key1, "val1"); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - } - - Lock lock = cache.lock(key1); - - lock.lock(); - - try { - try (IgniteTx tx = grid.transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 1)) { - cache.put(key1, "val01"); - - tx.commit(); - } - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val01", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - } - } - finally { - lock.unlock(); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupPartitionLockOptimistic() throws Exception { - checkGroupPartitionLock(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupPartitionLockPessimistic() throws Exception { - checkGroupPartitionLock(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupPartitionLock(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - IgniteCache<UUID, String> cache = grid(0).jcache(null); - - UUID key1; - UUID key2; - - CacheAffinity<UUID> affinity = grid(0).affinity(null); - - try (IgniteTx tx = grid(0).transactions().txStartPartition(null, affinity.partition(affinityKey), - concurrency, READ_COMMITTED, 0, 2)) { - // Note that events are not generated for internal keys. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, - unlocks.affectedKeys().size()); - - GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)grid(0)).internalCache(); - - GridCacheAffinityManager<Object, Object> affMgr = cacheAdapter.context().affinity(); - - GridPartitionLockKey partAffKey = affMgr.partitionAffinityKey(affinity.partition(affinityKey)); - - if (concurrency == PESSIMISTIC) - assertTrue(cacheAdapter.entryEx(partAffKey).lockedByThread()); - - do { - key1 = UUID.randomUUID(); - } - while (affinity.partition(key1) != affinity.partition(affinityKey)); - - do { - key2 = UUID.randomUUID(); - } - while (affinity.partition(key2) != affinity.partition(affinityKey)); - - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGetPutOptimisticReadCommitted() throws Exception { - checkGetPut(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutOptimisticRepeatableRead() throws Exception { - checkGetPut(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutPessimisticReadCommitted() throws Exception { - checkGetPut(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutPessimisticRepeatableRead() throws Exception { - checkGetPut(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCachePessimisticReadCommitted() throws Exception { - checkGetPutEmptyCache(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCachePessimisticRepeatableRead() throws Exception { - checkGetPutEmptyCache(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCacheOptimisticReadCommitted() throws Exception { - checkGetPutEmptyCache(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCacheOptimisticRepeatableRead() throws Exception { - checkGetPutEmptyCache(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @param concurrency Transaction concurrency mode. - * @param isolation Transaction isolation mode. - * @throws Exception If failed. - */ - private void checkGetPut(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, isolation, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - cache.put(key1, "val01"); - - cache.put(key2, "val02"); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - } - - /** - * @param concurrency Transaction concurrency mode. - * @param isolation Transaction isolation mode. - * @throws Exception If failed. - */ - private void checkGetPutEmptyCache(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, isolation, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals(null, cache.get(key1)); - - assertEquals(null, cache.get(key2)); - - cache.put(key1, "val01"); - - cache.put(key2, "val02"); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val01", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val02", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGetRemoveOptimisticReadCommitted() throws Exception { - checkGetRemove(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemoveOptimisticRepeatableRead() throws Exception { - checkGetRemove(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemovePessimisticReadCommitted() throws Exception { - checkGetRemove(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemovePessimisticRepeatableRead() throws Exception { - checkGetRemove(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @param concurrency Transaction concurrency mode. - * @param isolation Transaction isolation mode. - * @throws Exception If failed. - */ - private void checkGetRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, isolation, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - cache.remove(key1); - - cache.remove(key2); - - tx.commit(); - } - - for (int i = 0; i < gridCount(); i++) { - IgniteCache<Object, Object> cacheI = jcache(i); - - assertNull("For cache [i=" + i + ", val=" + cacheI.localPeek(key1, CachePeekMode.ONHEAP) + ']', cacheI.localPeek(key1, CachePeekMode.ONHEAP)); - assertNull("For cache [i=" + i + ", val=" + cacheI.localPeek(key2, CachePeekMode.ONHEAP) + ']', cacheI.localPeek(key2, CachePeekMode.ONHEAP)); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - } - - /** - * @throws Exception If failed. - */ - public void testGetAfterPutOptimistic() throws Exception { - checkGetAfterPut(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGetAfterPut() throws Exception { - checkGetAfterPut(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGetAfterPut(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = grid(0).transactions() - .txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - cache.put(key1, "val01"); - - cache.put(key2, "val02"); - - assertEquals("val01", cache.get(key1)); - - assertEquals("val02", cache.get(key2)); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - assertEquals("val01", cache.get(key1)); - - assertEquals("val02", cache.get(key2)); - } - - /** - * @throws Exception If failed. - */ - public void testGetRepeatableReadOptimistic() throws Exception { - checkGetRepeatableRead(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGetRepeatableReadPessimistic() throws Exception { - checkGetRepeatableRead(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGetRepeatableRead(IgniteTxConcurrency concurrency) throws Exception { - UUID key = primaryKeyForCache(grid(0)); - - jcache(0).put(key, "val"); - - try (IgniteTx ignored = ignite(0).transactions().txStartPartition(null, ignite(0).affinity(null).partition(key), - concurrency, REPEATABLE_READ, 0, 1)) { - assertEquals("val", jcache(0).get(key)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutWrongKeyOptimistic() throws Exception { - checkGroupLockPutWrongKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutWrongKeyPessimistic() throws Exception { - checkGroupLockPutWrongKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockPutWrongKey(IgniteTxConcurrency concurrency) throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - final IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - try (IgniteTx ignored = grid(0).transactions() - .txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 1)) { - // Key with affinity key different from enlisted on tx start should raise exception. - cache.put(new CacheAffinityKey<>("key1", UUID.randomUUID()), "val1"); - - fail("Exception should be thrown"); - } - catch (CacheException ignored) { - // Expected exception. - } - - assertNull(grid(0).transactions().tx()); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveWrongKeyOptimistic() throws Exception { - checkGroupLockRemoveWrongKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveWrongKeyPessimistic() throws Exception { - checkGroupLockRemoveWrongKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockRemoveWrongKey(IgniteTxConcurrency concurrency) throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - final IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - final CacheAffinityKey<String> key = new CacheAffinityKey<>("key1", UUID.randomUUID()); - - cache.put(key, "val"); - - try (IgniteTx ignored = grid(0).transactions() - .txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 1)) { - // Key with affinity key different from enlisted on tx start should raise exception. - cache.remove(key); - - fail("Exception should be thrown."); - } - catch (CacheException ignored) { - // Expected exception. - } - - assertNull(grid(0).transactions().tx()); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyPessimitsticRepeatableRead() throws Exception { - checkGroupLockReadAffinityKey(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyPessimitsticReadCommitted() throws Exception { - checkGroupLockReadAffinityKey(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyOptimisticRepeatableRead() throws Exception { - checkGroupLockReadAffinityKey(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyOptimisticReadCommitted() throws Exception { - checkGroupLockReadAffinityKey(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @param concurrency Concurrency. - * @param isolation Isolation. - * @throws Exception If failed. - */ - private void checkGroupLockReadAffinityKey(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) - throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - final IgniteCache<Object, String> cache = grid(0).jcache(null); - - final CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - final CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - - cache.put(affinityKey, "0"); - cache.put(key1, "0"); - cache.put(key2, "0"); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, isolation, 0, 3)) { - assertEquals("0", cache.get(affinityKey)); - assertEquals("0", cache.get(key1)); - assertEquals("0", cache.get(key2)); - - cache.put(affinityKey, "1"); - cache.put(key1, "1"); - cache.put(key2, "1"); - - assertEquals("1", cache.get(affinityKey)); - assertEquals("1", cache.get(key1)); - assertEquals("1", cache.get(key2)); - - tx.commit(); - } - - assertEquals("1", cache.get(affinityKey)); - assertEquals("1", cache.get(key1)); - assertEquals("1", cache.get(key2)); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWriteThroughBatchUpdateOptimistic() throws Exception { - checkGroupLockWriteThrough(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWriteThroughBatchUpdatePessimistic() throws Exception { - checkGroupLockWriteThrough(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockWriteThrough(IgniteTxConcurrency concurrency) throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - IgniteCache<CacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - CacheAffinityKey<String> key1 = new CacheAffinityKey<>("key1", affinityKey); - CacheAffinityKey<String> key2 = new CacheAffinityKey<>("key2", affinityKey); - CacheAffinityKey<String> key3 = new CacheAffinityKey<>("key3", affinityKey); - CacheAffinityKey<String> key4 = new CacheAffinityKey<>("key4", affinityKey); - - Map<CacheAffinityKey<String>, String> putMap = F.asMap( - key1, "val1", - key2, "val2", - key3, "val3", - key4, "val4"); - - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 4)) { - cache.put(key1, "val1"); - cache.put(key2, "val2"); - cache.put(key3, "val3"); - cache.put(key4, "val4"); - - tx.commit(); - } - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - IgniteCache<Object, Object> gCache = g.jcache(null); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.localPeek(key1, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.localPeek(key2, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key3)) - assertEquals("For index: " + i, "val3", gCache.localPeek(key3, CachePeekMode.ONHEAP)); - - if (g.affinity(null).isPrimaryOrBackup(g.cluster().localNode(), key4)) - assertEquals("For index: " + i, "val4", gCache.localPeek(key4, CachePeekMode.ONHEAP)); - } - - // Check the store. - assertTrue(store.storeMap().equals(putMap)); - assertEquals(1, store.putCount()); - } - - /** @return {@code True} if sanity check should be enabled. */ - private boolean sanityCheckEnabled() { - return !getName().contains("SanityCheckDisabled"); - } - - /** - * @param primary Primary node for which key should be calculated. - * @return Key for which given node is primary. - * @throws IgniteCheckedException If affinity can not be calculated. - */ - protected UUID primaryKeyForCache(Ignite primary) throws IgniteCheckedException { - UUID res; - - int cnt = 0; - - UUID primaryId = primary.cluster().localNode().id(); - - do { - res = UUID.randomUUID(); - - cnt++; - - if (cnt > 10000) - throw new IllegalStateException("Cannot find key for primary node: " + primaryId); - } - while (!primary.cluster().mapKeyToNode(null, res).id().equals(primaryId)); - - return res; - } - - /** - * @param primary Primary node for which keys should be calculated. - * @param cnt Key count. - * @return Collection of generated keys. - * @throws IgniteCheckedException If affinity can not be calculated. - */ - protected UUID[] primaryKeysForCache(Ignite primary, int cnt) throws IgniteCheckedException { - Collection<UUID> keys = new LinkedHashSet<>(); - - int iters = 0; - - do { - keys.add(primaryKeyForCache(primary)); - - iters++; - - if (iters > 10000) - throw new IllegalStateException("Cannot find keys for primary node [nodeId=" + - primary.cluster().localNode().id() + ", cnt=" + cnt + ']'); - } - while (keys.size() < cnt); - - UUID[] res = new UUID[keys.size()]; - - return keys.toArray(res); - } - - /** Event listener that collects all incoming events. */ - protected static class CollectingEventListener implements IgnitePredicate<Event> { - /** Collected events. */ - private final Collection<Object> affectedKeys = new GridConcurrentLinkedHashSet<>(); - - /** {@inheritDoc} */ - @Override public boolean apply(Event evt) { - assert evt.type() == EVT_CACHE_OBJECT_LOCKED || evt.type() == EVT_CACHE_OBJECT_UNLOCKED; - - CacheEvent cacheEvt = (CacheEvent)evt; - - synchronized (this) { - affectedKeys.add(cacheEvt.key()); - - notifyAll(); - } - - return true; - } - - /** @return Collection of affected keys. */ - public Collection<Object> affectedKeys() { - return affectedKeys; - } - - /** - * Waits until events received for all supplied keys. - * - * @param timeout Timeout to wait. - * @param keys Keys to wait for. - * @return {@code True} if wait was successful, {@code false} if wait timed out. - * @throws InterruptedException If thread was interrupted. - */ - public boolean awaitKeys(long timeout, Object... keys) throws InterruptedException { - long start = System.currentTimeMillis(); - - Collection<Object> keysCol = Arrays.asList(keys); - - synchronized (this) { - while (true) { - long now = System.currentTimeMillis(); - - if (affectedKeys.containsAll(keysCol)) - return true; - else if (start + timeout > now) - wait(start + timeout - now); - else - return false; - } - } - } - } - - /** Test store that accumulates values into map. */ - private static class TestStore extends CacheStoreAdapter<Object, Object> { - /** */ - private ConcurrentMap<Object, Object> storeMap = new ConcurrentHashMap8<>(); - - /** */ - private AtomicInteger putCnt = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override public Object load(Object key) { - return null; - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { - for (Cache.Entry<?, ?> e : entries) - storeMap.put(e.getKey(), e.getValue()); - - putCnt.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> e) { - storeMap.put(e.getKey(), e.getValue()); - - putCnt.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - storeMap.remove(key); - } - - /** @return Stored values map. */ - public ConcurrentMap<Object, Object> storeMap() { - return storeMap; - } - - /** @return Number of calls to put(). */ - public int putCount() { - return putCnt.get(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/51eefaf7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java deleted file mode 100644 index 16d58c0..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -/** - * Tests optimistic group lock transactions. - */ -public class GridCacheGroupLockFailoverOptimisticTxSelfTest extends GridCacheGroupLockFailoverSelfTest { - /** {@inheritDoc} */ - @Override protected boolean optimisticTx() { - return true; - } -}