# ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3d6afdaa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3d6afdaa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3d6afdaa Branch: refs/heads/ignite-26 Commit: 3d6afdaa804f64ba5ed37d9607bf65646a3bd15e Parents: 416491a Author: sboikov <sboi...@gridgain.com> Authored: Sat Jan 31 00:03:24 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Sat Jan 31 09:11:20 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteTransactions.java | 8 +- .../apache/ignite/cache/CacheProjection.java | 8 + .../ignite/internal/ClusterGroupAdapter.java | 2 +- .../internal/ComputeTaskInternalFuture.java | 2 + .../ignite/internal/IgniteTransactionsEx.java | 58 +++ .../internal/executor/GridExecutorService.java | 105 ++-- .../processors/cache/GridCacheAdapter.java | 53 +- .../processors/cache/GridCacheEventManager.java | 8 +- .../cache/GridCacheMultiTxFuture.java | 4 +- .../cache/GridCacheProjectionImpl.java | 6 + .../processors/cache/GridCacheProxyImpl.java | 13 + .../cache/GridCacheSharedContext.java | 2 +- .../processors/cache/GridCacheStoreManager.java | 31 +- .../processors/cache/GridCacheUtils.java | 26 +- .../GridAtomicCacheQueueImpl.java | 26 +- .../datastructures/GridCacheAtomicLongImpl.java | 17 +- .../GridCacheAtomicReferenceImpl.java | 6 +- .../GridCacheAtomicSequenceImpl.java | 3 +- .../GridCacheAtomicStampedImpl.java | 6 +- .../GridCacheCountDownLatchImpl.java | 6 +- .../GridCacheDataStructuresManager.java | 19 +- .../datastructures/GridCacheQueueAdapter.java | 55 ++- .../GridTransactionalCacheQueueImpl.java | 107 ++-- .../GridDistributedTxRemoteAdapter.java | 8 +- .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../distributed/dht/GridDhtTxFinishFuture.java | 12 +- .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- 
.../near/GridNearTxFinishFuture.java | 16 +- .../cache/distributed/near/GridNearTxLocal.java | 12 +- .../processors/cache/local/GridLocalTx.java | 6 +- .../transactions/IgniteTransactionsImpl.java | 137 +++++- .../cache/transactions/IgniteTxAdapter.java | 482 +++++++++++++++++-- .../cache/transactions/IgniteTxEx.java | 172 ++++++- .../cache/transactions/IgniteTxHandler.java | 26 +- .../transactions/IgniteTxLocalAdapter.java | 16 +- .../cache/transactions/IgniteTxManager.java | 20 +- .../cache/transactions/IgniteTxProxyImpl.java | 17 +- .../processors/fs/GridGgfsDataManager.java | 5 +- .../processors/fs/GridGgfsMetaManager.java | 27 +- .../handlers/cache/GridCacheCommandHandler.java | 3 +- .../service/GridServiceProcessor.java | 3 +- .../apache/ignite/transactions/IgniteTx.java | 12 +- .../internal/IgniteExecutorServiceTest.java | 2 +- .../cache/GridCacheNestedTxAbstractTest.java | 4 +- .../processors/cache/IgniteTxAbstractTest.java | 2 +- .../near/GridCacheNearMultiGetSelfTest.java | 2 +- .../cache/GridAbstractCacheStoreSelfTest.java | 6 +- .../GridHibernateReadWriteAccessStrategy.java | 2 +- .../processors/cache/jta/CacheJtaManager.java | 2 +- .../cache/websession/GridWebSessionFilter.java | 6 +- 50 files changed, 1143 insertions(+), 436 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java index b769362..08799aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java @@ -92,11 +92,11 @@ public interface IgniteTransactions { * @param isolation Cache transaction isolation level. 
* @return Started transaction. * @throws IllegalStateException If transaction is already started by this thread. - * @throws IgniteCheckedException If local node is not primary for any of provided keys. + * @throws IgniteException If local node is not primary for any of provided keys. * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. */ public IgniteTx txStartAffinity(String cacheName, Object affinityKey, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException; + IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteException; /** * Starts {@code partition-group-locked} transaction based on partition ID. In this mode the whole partition @@ -128,11 +128,11 @@ public interface IgniteTransactions { * @param isolation Cache transaction isolation level. * @return Started transaction. * @throws IllegalStateException If transaction is already started by this thread. - * @throws IgniteCheckedException If local node is not primary for any of provided keys. + * @throws IgniteException If local node is not primary for any of provided keys. * @throws UnsupportedOperationException If cache is {@link org.apache.ignite.cache.CacheAtomicityMode#ATOMIC}. 
*/ public IgniteTx txStartPartition(String cacheName, int partId, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteCheckedException; + IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException, IgniteException; /** * Gets transaction started by this thread or {@code null} if this thread does http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java index 2457fcd..33c6d1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java @@ -23,6 +23,7 @@ import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -1166,6 +1167,13 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K, V>> { public IgniteTx txStart(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation); /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @return New transaction. + */ + public IgniteTxEx txStartEx(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation); + + /** * Starts transaction with specified isolation, concurrency, timeout, invalidation flag, * and number of participating entries. 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java index a05dc89..528e9a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterGroupAdapter.java @@ -260,7 +260,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { public ExecutorService executorService() { assert ctx != null; - return new GridExecutorService(this, ctx.log()); + return new GridExecutorService(this, ctx); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java index bdccb75..a6a6004 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -42,6 +43,7 @@ public class ComputeTaskInternalFuture<R> extends GridFutureAdapter<R> { private GridKernalContext ctx; /** */ + 
@GridToStringExclude private ComputeFuture<R> userFut; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java index c6223c2..6efe99f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.transactions.*; /** @@ -38,4 +40,60 @@ public interface IgniteTransactionsEx extends IgniteTransactions { */ public IgniteTx txStartSystem(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation, long timeout, int txSize); + + /** + * @param ctx Cache context. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * @return New transaction. + */ + public IgniteTxEx txStartEx(GridCacheContext ctx, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + int txSize); + + /** + * @param ctx Cache context. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @return New transaction. + */ + public IgniteTxEx txStartEx(GridCacheContext ctx, IgniteTxConcurrency concurrency, IgniteTxIsolation isolation); + + /** + * @param ctx Cache context. + * @param partId Partition. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. 
+ * @param txSize Number of entries participating in transaction (may be approximate). + * @return New transaction. + * @throws IgniteCheckedException If failed. + */ + public IgniteTxEx txStartPartitionEx(GridCacheContext ctx, + int partId, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + int txSize) throws IgniteCheckedException; + + /** + * @param ctx Cache context. + * @param affinityKey Affinity key. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * @return New transaction. + * @throws IgniteCheckedException If failed. + */ + public IgniteTxEx txStartAffinity(GridCacheContext ctx, + Object affinityKey, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + int txSize) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java index 424be58..6bb4b18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.compute.*; import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -29,6 +28,8 @@ import java.io.*; import java.util.*; import java.util.concurrent.*; +import static 
org.apache.ignite.internal.GridClosureCallMode.*; + /** * An {@link ExecutorService} that executes each submitted task in grid * through {@link org.apache.ignite.Ignite} instance, normally configured using @@ -77,8 +78,8 @@ public class GridExecutorService implements ExecutorService, Externalizable { /** Projection. */ private ClusterGroupAdapter prj; - /** Compute. */ - private IgniteCompute comp; + /** */ + private GridKernalContext ctx; /** Logger. */ private IgniteLogger log; @@ -106,16 +107,15 @@ public class GridExecutorService implements ExecutorService, Externalizable { * Creates executor service. * * @param prj Projection. - * @param log Grid logger. + * @param ctx Kernal context. */ - public GridExecutorService(ClusterGroupAdapter prj, IgniteLogger log) { + public GridExecutorService(ClusterGroupAdapter prj, GridKernalContext ctx) { assert prj != null; - assert log != null; + assert ctx != null; this.prj = prj; - this.log = log.getLogger(GridExecutorService.class); - - comp = prj.compute().withAsync(); + this.ctx = ctx; + this.log = ctx.log().getLogger(GridExecutorService.class); } /** {@inheritDoc} */ @@ -237,18 +237,13 @@ public class GridExecutorService implements ExecutorService, Externalizable { checkShutdown(); - assert comp.isAsync(); + ctx.gateway().readLock(); try { - comp.call(task); - - IgniteFutureImpl<T> fut = (IgniteFutureImpl<T>)comp.future(); - - return addFuture(fut.internalFuture()); + return addFuture(ctx.closure().callAsync(BALANCE, task, prj.nodes())); } - catch (IgniteException e) { - // Should not be thrown since uses asynchronous execution. 
- return addFuture(new GridFinishedFutureEx<T>(e)); + finally { + ctx.gateway().readUnlock(); } } @@ -258,16 +253,12 @@ public class GridExecutorService implements ExecutorService, Externalizable { checkShutdown(); - assert comp.isAsync(); + ctx.gateway().readLock(); try { - comp.run(task); - - IgniteInternalFuture<T> fut0 = ((IgniteFutureImpl<T>)comp.future()).internalFuture(); - - IgniteInternalFuture<T> fut = fut0.chain(new CX1<IgniteInternalFuture<?>, T>() { - @Override - public T applyx(IgniteInternalFuture<?> fut) throws IgniteCheckedException { + IgniteInternalFuture<T> fut = ctx.closure().runAsync(BALANCE, task, prj.nodes()).chain( + new CX1<IgniteInternalFuture<?>, T>() { + @Override public T applyx(IgniteInternalFuture<?> fut) throws IgniteCheckedException { fut.get(); return res; @@ -276,9 +267,8 @@ public class GridExecutorService implements ExecutorService, Externalizable { return addFuture(fut); } - catch (IgniteException e) { - // Should not be thrown since uses asynchronous execution. - return addFuture(new GridFinishedFutureEx<T>(e)); + finally { + ctx.gateway().readUnlock(); } } @@ -288,18 +278,13 @@ public class GridExecutorService implements ExecutorService, Externalizable { checkShutdown(); - assert comp.isAsync(); + ctx.gateway().readLock(); try { - comp.run(task); - - IgniteFutureImpl<?> fut = (IgniteFutureImpl<?>)comp.future(); - - return addFuture(fut.internalFuture()); + return addFuture(ctx.closure().runAsync(BALANCE, task, prj.nodes())); } - catch (IgniteException e) { - // Should not be thrown since uses asynchronous execution. - return addFuture(new GridFinishedFutureEx<>(e)); + finally { + ctx.gateway().readUnlock(); } } @@ -353,23 +338,18 @@ public class GridExecutorService implements ExecutorService, Externalizable { Collection<IgniteInternalFuture<T>> taskFuts = new ArrayList<>(); - assert comp.isAsync(); - for (Callable<T> task : tasks) { // Execute task without predefined timeout. 
// GridFuture.cancel() will be called if timeout elapsed. IgniteInternalFuture<T> fut; - try { - comp.call(task); - - IgniteFutureImpl<T> fut0 = (IgniteFutureImpl<T>)comp.future(); + ctx.gateway().readLock(); - fut = fut0.internalFuture(); + try { + fut = ctx.closure().callAsync(BALANCE, task, prj.nodes()); } - catch (IgniteException e) { - // Should not be thrown since uses asynchronous execution. - fut = new GridFinishedFutureEx<>(e); + finally { + ctx.gateway().readUnlock(); } taskFuts.add(fut); @@ -495,23 +475,17 @@ public class GridExecutorService implements ExecutorService, Externalizable { Collection<IgniteInternalFuture<T>> taskFuts = new ArrayList<>(); - assert comp.isAsync(); - for (Callable<T> cmd : tasks) { // Execute task with predefined timeout. IgniteInternalFuture<T> fut; - try - { - comp.call(cmd); + ctx.gateway().readLock(); - IgniteFutureImpl<T> fut0 = (IgniteFutureImpl<T>)comp.future(); - - fut = fut0.internalFuture(); + try { + fut = ctx.closure().callAsync(BALANCE, cmd, prj.nodes()); } - catch (IgniteException e) { - // Should not be thrown since uses asynchronous execution. - fut = new GridFinishedFutureEx<>(e); + finally { + ctx.gateway().readUnlock(); } taskFuts.add(fut); @@ -582,18 +556,13 @@ public class GridExecutorService implements ExecutorService, Externalizable { checkShutdown(); - assert comp.isAsync(); + ctx.gateway().readLock(); try { - comp.run(cmd); - - IgniteFutureImpl<?> fut0 = (IgniteFutureImpl<?>)comp.future(); - - addFuture(fut0.internalFuture()); + addFuture(ctx.closure().runAsync(BALANCE, cmd, prj.nodes())); } - catch (IgniteException e) { - // Should not be thrown since uses asynchronous execution. 
- addFuture(new GridFinishedFutureEx(e)); + finally { + ctx.gateway().readUnlock(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 557de12..1cabc20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3558,6 +3558,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ + @Override public IgniteTxEx txStartEx(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { + IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions(); + + return txs.txStartEx(ctx, concurrency, isolation); + } + + /** {@inheritDoc} */ @Override public IgniteTx txStart(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation, long timeout, int txSize) throws IllegalStateException { IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions(); @@ -4034,7 +4041,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @return Transaction commit future. 
*/ @SuppressWarnings("unchecked") - public IgniteInternalFuture<IgniteTx> commitTxAsync(final IgniteTxEx tx) { + public IgniteInternalFuture<IgniteTxEx> commitTxAsync(final IgniteTxEx tx) { FutureHolder holder = lastFut.get(); holder.lock(); @@ -4043,9 +4050,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, IgniteInternalFuture fut = holder.future(); if (fut != null && !fut.isDone()) { - IgniteInternalFuture<IgniteTx> f = new GridEmbeddedFuture<>(fut, - new C2<Object, Exception, IgniteInternalFuture<IgniteTx>>() { - @Override public IgniteInternalFuture<IgniteTx> apply(Object o, Exception e) { + IgniteInternalFuture<IgniteTxEx> f = new GridEmbeddedFuture<>(fut, + new C2<Object, Exception, IgniteInternalFuture<IgniteTxEx>>() { + @Override public IgniteInternalFuture<IgniteTxEx> apply(Object o, Exception e) { return tx.commitAsync(); } }, ctx.kernalContext()); @@ -4055,7 +4062,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return f; } - IgniteInternalFuture<IgniteTx> f = tx.commitAsync(); + IgniteInternalFuture<IgniteTxEx> f = tx.commitAsync(); saveFuture(holder, f); @@ -4069,42 +4076,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** - * Synchronously commits transaction after all previous asynchronous operations are completed. - * - * @param tx Transaction to commit. - * @throws IgniteCheckedException If commit failed. - */ - void commitTx(IgniteTx tx) throws IgniteCheckedException { - awaitLastFut(); - - tx.commit(); - } - - /** - * Synchronously rolls back transaction after all previous asynchronous operations are completed. - * - * @param tx Transaction to commit. - * @throws IgniteCheckedException If commit failed. - */ - void rollbackTx(IgniteTx tx) throws IgniteCheckedException { - awaitLastFut(); - - tx.rollback(); - } - - /** - * Synchronously ends transaction after all previous asynchronous operations are completed. - * - * @param tx Transaction to commit. 
- * @throws IgniteCheckedException If commit failed. - */ - void endTx(IgniteTx tx) throws IgniteCheckedException { - awaitLastFut(); - - tx.close(); - } - - /** * Awaits for previous async operation to be completed. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index 3f1f8ba..25c07f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -75,7 +75,7 @@ public class GridCacheEventManager<K, V> extends GridCacheManagerAdapter<K, V> { * @param cloClsName Closure class name. * @param taskName Task name. 
*/ - public void addEvent(int part, K key, IgniteTx tx, @Nullable GridCacheMvccCandidate<K> owner, + public void addEvent(int part, K key, IgniteTxEx tx, @Nullable GridCacheMvccCandidate<K> owner, int type, @Nullable V newVal, boolean hasNewVal, @Nullable V oldVal, boolean hasOldVal, UUID subjId, String cloClsName, String taskName) { addEvent(part, key, locNodeId, tx, owner, type, newVal, hasNewVal, oldVal, hasOldVal, subjId, cloClsName, @@ -97,7 +97,7 @@ public class GridCacheEventManager<K, V> extends GridCacheManagerAdapter<K, V> { * @param cloClsName Closure class name. * @param taskName Task name. */ - public void addEvent(int part, K key, UUID nodeId, IgniteTx tx, GridCacheMvccCandidate<K> owner, + public void addEvent(int part, K key, UUID nodeId, IgniteTxEx tx, GridCacheMvccCandidate<K> owner, int type, V newVal, boolean hasNewVal, V oldVal, boolean hasOldVal, UUID subjId, String cloClsName, String taskName) { addEvent(part, key, nodeId, tx == null ? null : tx.xid(), owner == null ? null : owner.version(), type, @@ -121,7 +121,7 @@ public class GridCacheEventManager<K, V> extends GridCacheManagerAdapter<K, V> { public void addEvent(int part, K key, UUID evtNodeId, @Nullable GridCacheMvccCandidate<K> owner, int type, @Nullable V newVal, boolean hasNewVal, V oldVal, boolean hasOldVal, UUID subjId, String cloClsName, String taskName) { - IgniteTx tx = owner == null ? null : cctx.tm().tx(owner.version()); + IgniteTxEx tx = owner == null ? null : cctx.tm().tx(owner.version()); addEvent(part, key, evtNodeId, tx == null ? null : tx.xid(), owner == null ? 
null : owner.version(), type, newVal, hasNewVal, oldVal, hasOldVal, subjId, cloClsName, taskName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java index aa593f5..af5cc87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java @@ -103,8 +103,8 @@ public final class GridCacheMultiTxFuture<K, V> extends GridFutureAdapter<Boolea for (final IgniteTxEx<K, V> tx : txs) { if (!tx.done()) { - tx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteTx> t) { + tx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteTxEx>>() { + @Override public void apply(IgniteInternalFuture<IgniteTxEx> t) { remainingTxs.remove(tx); checkRemaining(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 72b2505..ebd6f81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -22,6 +22,7 @@ 
import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -1239,6 +1240,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ + @Override public IgniteTxEx txStartEx(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { + return cache.txStartEx(concurrency, isolation); + } + + /** {@inheritDoc} */ @Override public IgniteTx txStart(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { return cache.txStart(concurrency, isolation); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index a5b73dc..0dfea35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -24,6 +24,7 @@ import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; @@ -1128,6 +1129,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ + @Override 
public IgniteTxEx txStartEx(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.txStartEx(concurrency, isolation); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteTx txStart(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 956c503..c036de4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -461,7 +461,7 @@ public class GridCacheSharedContext<K, V> { * @param tx Transaction to commit. * @return Commit future. 
*/ - public IgniteInternalFuture<IgniteTx> commitTxAsync(IgniteTxEx<K, V> tx) { + public IgniteInternalFuture<IgniteTxEx> commitTxAsync(IgniteTxEx<K, V> tx) { Collection<Integer> cacheIds = tx.activeCacheIds(); if (cacheIds.isEmpty()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java index d89a670..9cb78d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; @@ -233,7 +234,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @throws IgniteCheckedException If data loading failed. */ @SuppressWarnings("unchecked") - @Nullable public V loadFromStore(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + @Nullable public V loadFromStore(@Nullable IgniteTxEx tx, K key) throws IgniteCheckedException { return (V)loadFromStore(tx, key, true); } @@ -246,7 +247,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Loaded value, possibly <tt>null</tt>. * @throws IgniteCheckedException If data loading failed. 
*/ - @Nullable private Object loadFromStore(@Nullable IgniteTx tx, + @Nullable private Object loadFromStore(@Nullable IgniteTxEx tx, K key, boolean convert) throws IgniteCheckedException { @@ -320,7 +321,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @param vis Closure to apply for loaded elements. * @throws IgniteCheckedException If data loading failed. */ - public void localStoreLoadAll(@Nullable IgniteTx tx, + public void localStoreLoadAll(@Nullable IgniteTxEx tx, Collection<? extends K> keys, final GridInClosure3<K, V, GridCacheVersion> vis) throws IgniteCheckedException { @@ -340,11 +341,11 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @throws IgniteCheckedException If data loading failed. */ @SuppressWarnings({"unchecked"}) - public boolean loadAllFromStore(@Nullable IgniteTx tx, + public boolean loadAllFromStore(@Nullable IgniteTxEx tx, Collection<? extends K> keys, final IgniteBiInClosure<K, V> vis) throws IgniteCheckedException { if (store != null) { - loadAllFromStore(null, keys, vis, null); + loadAllFromStore(tx, keys, vis, null); return true; } @@ -364,7 +365,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - private void loadAllFromStore(@Nullable IgniteTx tx, + private void loadAllFromStore(@Nullable IgniteTxEx tx, Collection<? extends K> keys, final @Nullable IgniteBiInClosure<K, V> vis, final @Nullable GridInClosure3<K, V, GridCacheVersion> verVis) @@ -518,7 +519,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return {@code true} If there is a persistent storage. * @throws IgniteCheckedException If storage failed. 
*/ - public boolean putToStore(@Nullable IgniteTx tx, K key, V val, GridCacheVersion ver) + public boolean putToStore(@Nullable IgniteTxEx tx, K key, V val, GridCacheVersion ver) throws IgniteCheckedException { if (store != null) { // Never persist internal keys. @@ -568,7 +569,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return {@code True} if there is a persistent storage. * @throws IgniteCheckedException If storage failed. */ - public boolean putAllToStore(@Nullable IgniteTx tx, Map<K, IgniteBiTuple<V, GridCacheVersion>> map) + public boolean putAllToStore(@Nullable IgniteTxEx tx, Map<K, IgniteBiTuple<V, GridCacheVersion>> map) throws IgniteCheckedException { if (F.isEmpty(map)) return true; @@ -628,7 +629,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return {@code True} if there is a persistent storage. * @throws IgniteCheckedException If storage failed. */ - public boolean removeFromStore(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + public boolean removeFromStore(@Nullable IgniteTxEx tx, K key) throws IgniteCheckedException { if (store != null) { // Never remove internal key from store as it is never persisted. if (key instanceof GridCacheInternal) @@ -674,7 +675,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @throws IgniteCheckedException If storage failed. */ @SuppressWarnings("unchecked") - public boolean removeAllFromStore(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException { + public boolean removeAllFromStore(@Nullable IgniteTxEx tx, Collection<?> keys) throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -741,7 +742,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @param commit Commit. * @throws IgniteCheckedException If failed. 
*/ - public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { + public void txEnd(IgniteTxEx tx, boolean commit) throws IgniteCheckedException { assert store != null; initSession(tx); @@ -775,7 +776,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { /** * @param tx Current transaction. */ - private void initSession(@Nullable IgniteTx tx) { + private void initSession(@Nullable IgniteTxEx<?, ?> tx) { SessionData ses; if (tx != null) { @@ -802,7 +803,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { private static class SessionData { /** */ @GridToStringExclude - private final IgniteTx tx; + private final IgniteTxEx tx; /** */ private String cacheName; @@ -815,7 +816,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @param tx Current transaction. * @param cacheName Cache name. */ - private SessionData(@Nullable IgniteTx tx, @Nullable String cacheName) { + private SessionData(@Nullable IgniteTxEx tx, @Nullable String cacheName) { this.tx = tx; this.cacheName = cacheName; } @@ -824,7 +825,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return Transaction. */ @Nullable private IgniteTx transaction() { - return tx; + return tx != null ? 
tx.proxy() : null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 7cbcd85..f23ff59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -166,17 +166,6 @@ public class GridCacheUtils { } }; - /** Converts transaction to XID. */ - private static final IgniteClosure<IgniteTx, IgniteUuid> tx2xid = new C1<IgniteTx, IgniteUuid>() { - @Override public IgniteUuid apply(IgniteTx tx) { - return tx.xid(); - } - - @Override public String toString() { - return "Transaction to XID converter."; - } - }; - /** Converts transaction to XID version. */ private static final IgniteClosure tx2xidVer = new C1<IgniteTxEx, GridCacheVersion>() { @Override public GridCacheVersion apply(IgniteTxEx tx) { @@ -806,13 +795,6 @@ public class GridCacheUtils { } /** - * @return Closure which converts transaction to xid. - */ - public static IgniteClosure<IgniteTx, IgniteUuid> tx2xid() { - return tx2xid; - } - - /** * @return Closure that converts entry to key. */ @SuppressWarnings({"unchecked"}) @@ -1210,21 +1192,21 @@ public class GridCacheUtils { * @param isolation Isolation. * @return New transaction. 
*/ - public static IgniteTx txStartInternal(GridCacheContext ctx, CacheProjection prj, + public static IgniteTxEx txStartInternal(GridCacheContext ctx, CacheProjection prj, IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) { assert ctx != null; assert prj != null; ctx.tm().txContextReset(); - return prj.txStart(concurrency, isolation); + return prj.txStartEx(concurrency, isolation); } /** * @param tx Transaction. * @return String view of all safe-to-print transaction properties. */ - public static String txString(@Nullable IgniteTx tx) { + public static String txString(@Nullable IgniteTxEx tx) { if (tx == null) return "null"; @@ -1627,7 +1609,7 @@ public class GridCacheUtils { public static <K, V> void inTx(CacheProjection<K, V> cache, IgniteTxConcurrency concurrency, IgniteTxIsolation isolation, IgniteInClosureX<CacheProjection<K ,V>> clo) throws IgniteCheckedException { - try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + try (IgniteTxEx tx = cache.txStartEx(concurrency, isolation);) { clo.applyx(cache); tx.commit(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java index 7ce5632..9511c86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java @@ -101,7 +101,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - T data = (T)cache.getAndRemove(key); + T 
data = (T)cache.remove(key, null); if (data != null) return data; @@ -110,7 +110,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { stop = U.currentTimeMillis() + RETRY_TIMEOUT; while (U.currentTimeMillis() < stop ) { - data = (T)cache.getAndRemove(key); + data = (T)cache.remove(key, null); if (data != null) return data; @@ -118,7 +118,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { break; } - catch (CachePartialUpdateException e) { + catch (CachePartialUpdateCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -133,7 +133,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { } } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw U.convertException(e); } } @@ -162,11 +162,11 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - cache.putAll(putMap); + cache.putAll(putMap, null); break; } - catch (CachePartialUpdateException e) { + catch (CachePartialUpdateCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -180,14 +180,14 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { return true; } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw U.convertException(e); } } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void removeItem(long rmvIdx) throws IgniteCheckedException { - Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)); + Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); if (idx != null) { checkRemoved(idx); @@ -200,20 +200,20 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - if (cache.remove(key)) + if (cache.removex(key, null)) return; if (stop == 0) stop = U.currentTimeMillis() + RETRY_TIMEOUT; while (U.currentTimeMillis() < stop ) { - if (cache.remove(key)) + if 
(cache.removex(key, null)) return; } break; } - catch (CachePartialUpdateException e) { + catch (CachePartialUpdateCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -240,9 +240,9 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { while (true) { try { - return (Long)cache.invoke(queueKey, c); + return (Long)cache.invoke(queueKey, c).get(); } - catch (CachePartialUpdateException e) { + catch (CachePartialUpdateCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicLongImpl.java index 686b38f..3157674 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicLongImpl.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.util.typedef.*; @@ -81,7 +82,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #incrementAndGet()}. 
*/ private final Callable<Long> incAndGetCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -112,7 +113,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #getAndIncrement()}. */ private final Callable<Long> getAndIncCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -143,7 +144,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #decrementAndGet()}. */ private final Callable<Long> decAndGetCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -174,7 +175,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #getAndDecrement()}. 
*/ private final Callable<Long> getAndDecCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -339,7 +340,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalAddAndGet(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -377,7 +378,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalGetAndAdd(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -415,7 +416,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalGetAndSet(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); @@ -455,7 +456,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Boolean> internalCompareAndSet(final long expVal, final long newVal) { return new Callable<Boolean>() { @Override public Boolean call() throws 
Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicLongValue val = atomicView.get(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicReferenceImpl.java index b5fa296..72504a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicReferenceImpl.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.util.typedef.*; @@ -193,8 +194,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef private Callable<Boolean> internalSet(final T val) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); @@ -233,7 +233,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef final 
IgniteClosure<T, T> newValClos) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java index 628d696..c8c46aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicSequenceImpl.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.util.typedef.*; @@ -400,7 +401,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc private Callable<Long> internalUpdate(final long l, final boolean updated) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicSequenceValue seq = 
seqView.get(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicStampedImpl.java index 7941faf..bf5861a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAtomicStampedImpl.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; @@ -219,8 +220,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt private Callable<Boolean> internalSet(final T val, final S stamp) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); @@ -262,7 +262,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt final IgniteClosure<S, S> newStampClos) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = 
CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheCountDownLatchImpl.java index 7ee45bf..018d8d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheCountDownLatchImpl.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -219,7 +219,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc internalLatch = CU.outTx( new Callable<CountDownLatch>() { @Override public CountDownLatch call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheCountDownLatchValue val = latchView.get(key); @@ -318,7 +318,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override 
public Integer call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ); + IgniteTxEx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ); try { GridCacheCountDownLatchValue latchVal = latchView.get(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheDataStructuresManager.java index d07da6e..cab9a2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheDataStructuresManager.java @@ -198,9 +198,8 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager return val; return CU.outTx(new Callable<CacheAtomicSequence>() { - @Override - public CacheAtomicSequence call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + @Override public CacheAtomicSequence call() throws Exception { + try (IgniteTxEx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); @@ -318,7 +317,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager return CU.outTx(new Callable<CacheAtomicLong>() { @Override public CacheAtomicLong call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTxEx tx = CU.txStartInternal(cctx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); @@ -414,7 +413,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager return CU.outTx(new Callable<CacheAtomicReference<T>>() { @Override public CacheAtomicReference<T> call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTxEx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue val = cast(dsView.get(key), GridCacheAtomicReferenceValue.class); @@ -513,7 +512,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager return CU.outTx(new Callable<CacheAtomicStamped<T, S>>() { @Override public CacheAtomicStamped<T, S> call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTxEx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue val = cast(dsView.get(key), GridCacheAtomicStampedValue.class); @@ -748,7 +747,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager if (hdr.empty()) return true; - GridCacheQueueAdapter.removeKeys(cctx.kernalContext().cache().jcache(cctx.cache().name()), + GridCacheQueueAdapter.removeKeys(cctx.cache(), hdr.id(), name, hdr.collocated(), @@ -793,7 +792,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager return CU.outTx(new Callable<CacheCountDownLatch>() { @Override public CacheCountDownLatch call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTxEx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); @@ -857,7 +856,7 @@ public final class GridCacheDataStructuresManager<K, V> extends 
GridCacheManager @Override public Boolean call() throws Exception { GridCacheInternal key = new GridCacheInternalKeyImpl(name); - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTxEx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); @@ -904,7 +903,7 @@ public final class GridCacheDataStructuresManager<K, V> extends GridCacheManager return CU.outTx( new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTxEx tx = CU.txStartInternal(cctx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. R val = cast(dsView.get(key), cls); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java index 344a9c9..870d9b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java @@ -57,7 +57,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp protected final GridCacheContext<?, ?> cctx; /** Cache. */ - protected final IgniteCache cache; + protected final GridCacheAdapter cache; /** Queue name. 
*/ protected final String queueName; @@ -98,7 +98,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp cap = hdr.capacity(); collocated = hdr.collocated(); queueKey = new GridCacheQueueHeaderKey(queueName); - cache = cctx.kernalContext().cache().jcache(cctx.name()); + cache = cctx.kernalContext().cache().internalCache(cctx.name()); log = cctx.logger(getClass()); @@ -137,24 +137,34 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public int size() { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + try { + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - return hdr.size(); + return hdr.size(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public T peek() throws IgniteException { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + try { + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); - checkRemoved(hdr); + checkRemoved(hdr); - if (hdr.empty()) - return null; + if (hdr.empty()) + return null; - return (T)cache.get(itemKey(hdr.head())); + return (T)cache.get(itemKey(hdr.head())); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @@ -381,7 +391,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") - static void removeKeys(IgniteCache cache, + static void removeKeys(GridCacheAdapter cache, IgniteUuid id, String name, boolean collocated, @@ -579,19 +589,24 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp if (next == null) throw new NoSuchElementException(); - cur = next; - curIdx = idx; + try { + cur = next; + curIdx = idx; - idx++; + idx++; - if (rmvIdxs != null) { - while (F.contains(rmvIdxs, idx) && idx < endIdx) - idx++; - } + if (rmvIdxs != null) { + while (F.contains(rmvIdxs, idx) && idx < endIdx) + idx++; + } - next = idx < endIdx ? (T)cache.get(itemKey(idx)) : null; + next = idx < endIdx ? (T)cache.get(itemKey(idx)) : null; - return cur; + return cur; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java index 257d8ae..9340380 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import 
org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -34,9 +34,6 @@ import static org.apache.ignite.transactions.IgniteTxIsolation.*; * {@link org.apache.ignite.cache.datastructures.CacheQueue} implementation using transactional cache. */ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { - /** */ - private final IgniteTransactions txs; - /** * @param queueName Queue name. * @param hdr Queue header. @@ -44,8 +41,6 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> */ public GridTransactionalCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) { super(queueName, hdr, cctx); - - txs = cctx.kernalContext().grid().transactions(); } /** {@inheritDoc} */ @@ -60,8 +55,8 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> while (true) { try { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)); + try (IgniteTxEx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get(); if (idx != null) { checkRemoved(idx); @@ -78,28 +73,24 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyCheckedException) + catch (ClusterTopologyCheckedException e) { + if (e instanceof ClusterGroupEmptyCheckedException) throw e; - if (e.getCause() instanceof ClusterTopologyCheckedException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to add item, will retry [err=" + e + ']'); - U.sleep(RETRY_DELAY); - } + U.sleep(RETRY_DELAY); } - else - throw e; } } return retVal; } catch (IgniteCheckedException e) { - throw new 
IgniteException(e); + throw U.convertException(e); } } @@ -112,13 +103,13 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> T retVal; while (true) { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)); + try (IgniteTxEx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); if (idx != null) { checkRemoved(idx); - retVal = (T)cache.getAndRemove(itemKey(idx)); + retVal = (T)cache.remove(itemKey(idx), null); assert retVal != null; } @@ -129,28 +120,24 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyCheckedException) + catch (ClusterTopologyCheckedException e) { + if (e instanceof ClusterGroupEmptyCheckedException) throw e; - if (e.getCause() instanceof ClusterTopologyCheckedException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to add item, will retry [err=" + e + ']'); - U.sleep(RETRY_DELAY); - } + U.sleep(RETRY_DELAY); } - else - throw e; } } return retVal; } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw U.convertException(e); } } @@ -165,8 +152,8 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> int cnt = 0; while (true) { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())); + try (IgniteTxEx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get(); if (idx != null) { checkRemoved(idx); @@ -179,7 +166,7 @@ public class GridTransactionalCacheQueueImpl<T> extends 
GridCacheQueueAdapter<T> idx++; } - cache.putAll(putMap); + cache.putAll(putMap, null); retVal = true; } @@ -190,21 +177,17 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyCheckedException) + catch (ClusterTopologyCheckedException e) { + if (e instanceof ClusterGroupEmptyCheckedException) throw e; - if (e.getCause() instanceof ClusterTopologyCheckedException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to add item, will retry [err=" + e + ']'); - U.sleep(RETRY_DELAY); - } + U.sleep(RETRY_DELAY); } - else - throw e; } } @@ -222,13 +205,13 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> int cnt = 0; while (true) { - try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)); + try (IgniteTxEx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); if (idx != null) { checkRemoved(idx); - boolean rmv = cache.remove(itemKey(idx)); + boolean rmv = cache.removex(itemKey(idx)); assert rmv; } @@ -237,26 +220,22 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> break; } - catch (CacheException e) { - if (e.getCause() instanceof ClusterGroupEmptyCheckedException) + catch (ClusterTopologyCheckedException e) { + if (e instanceof ClusterGroupEmptyCheckedException) throw e; - if (e.getCause() instanceof ClusterTopologyCheckedException) { - if (cnt++ == MAX_UPDATE_RETRIES) - throw e; - else { - U.warn(log, "Failed to add item, will retry [err=" + e + ']'); + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to add item, will retry [err=" + e + ']'); - 
U.sleep(RETRY_DELAY); - } + U.sleep(RETRY_DELAY); } - else - throw e; } } } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw U.convertException(e); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index c417e9d..8dae033 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -718,11 +718,11 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteTx> commitAsync() { + @Override public IgniteInternalFuture<IgniteTxEx> commitAsync() { try { commit(); - return new GridFinishedFutureEx<IgniteTx>(this); + return new GridFinishedFutureEx<IgniteTxEx>(this); } catch (IgniteCheckedException e) { return new GridFinishedFutureEx<>(e); @@ -749,10 +749,10 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteTx> rollbackAsync() { + @Override public IgniteInternalFuture<IgniteTxEx> rollbackAsync() { rollback(); - return new GridFinishedFutureEx<IgniteTx>(this); + return new GridFinishedFutureEx<IgniteTxEx>(this); } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d6afdaa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 466d43b..5b0335d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -868,8 +868,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert t.implicit(); return t.commitAsync().chain( - new C1<IgniteInternalFuture<IgniteTx>, GridNearLockResponse<K, V>>() { - @Override public GridNearLockResponse<K, V> apply(IgniteInternalFuture<IgniteTx> f) { + new C1<IgniteInternalFuture<IgniteTxEx>, GridNearLockResponse<K, V>>() { + @Override public GridNearLockResponse<K, V> apply(IgniteInternalFuture<IgniteTxEx> f) { try { // Check for error. f.get();