http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index ef63038,7e7d526..205b10f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@@ -1732,9 -1734,9 +1734,9 @@@ public class GridCacheEvictionManager<K log.debug("Sending eviction request [node=" + nodeId + ", req=" + req + ']'); try { - cctx.io().send(nodeId, req); + cctx.io().send(nodeId, req, cctx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { // Node left the topology. onNodeLeft(nodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 359236d,69d151e..d78cadc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@@ -334,9 -335,20 +335,9 @@@ public class GridCacheIoManager<K, V> e * @param node Node to send the message to. * @param msg Message to send. * @throws IgniteCheckedException If sending failed. - * @throws ClusterTopologyException If receiver left. + * @throws ClusterTopologyCheckedException If receiver left. */ - public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException { - send(node, msg, SYSTEM_POOL); - } - - /** - * Sends communication message. - * - * @param node Node to send the message to. - * @param msg Message to send. - * @throws IgniteCheckedException If sending failed. - * @throws ClusterTopologyCheckedException If receiver left. - */ + @SuppressWarnings("unchecked") public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException { assert !node.isLocal(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java index 567fd37,6a828a3..7047780 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java @@@ -157,9 -158,9 +158,9 @@@ public class GridCacheOptimisticCheckPr nodeTransactions(id), futureId(), fut.futureId()); try { - cctx.io().send(id, req); + cctx.io().send(id, req, tx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { fut.onNodeLeft(); } catch (IgniteCheckedException e) { @@@ -178,9 -179,9 +179,9 @@@ nodeTransactions(nodeId), futureId(), fut.futureId()); try { - cctx.io().send(nodeId, req); + cctx.io().send(nodeId, req, tx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { fut.onNodeLeft(); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java index 56cf33b,61d1407..b65b79b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java @@@ -148,9 -149,9 +149,9 @@@ public class GridCachePessimisticCheckC add(fut); try { - cctx.io().send(rmtNode.id(), req); + cctx.io().send(rmtNode.id(), req, tx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { fut.onNodeLeft(); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 06bebd6,a1b7655..6cd3fe3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@@ -945,9 -947,9 +946,9 @@@ public final class GridDhtLockFuture<K log.debug("Sending DHT lock request to near node [node=" + n.id() + ", req=" + req + ']'); - cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(n, req, cctx.ioPolicy()); } - catch (ClusterTopologyException e) { + catch (ClusterTopologyCheckedException e) { fut.onResult(e); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index f72f62a,4d92302..5674793 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@@ -437,9 -440,9 +440,9 @@@ public abstract class GridDhtTransactio if (res != null) { try { // Reply back to sender. - ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + ctx.io().send(nodeId, res, ctx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId); fail = true; @@@ -1427,9 -1430,9 +1430,9 @@@ req.completedVersions(committed, rolledback); - ctx.io().send(n, req); + ctx.io().send(n, req, ctx.ioPolicy()); } - catch (ClusterTopologyException ignore) { + catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) log.debug("Node left while sending unlock request: " + n); } @@@ -1455,9 -1458,9 +1458,9 @@@ req.completedVersions(committed, rolledback); - ctx.io().send(n, req); + ctx.io().send(n, req, ctx.ioPolicy()); } - catch (ClusterTopologyException ignore) { + catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) log.debug("Node left while sending unlock request: " + n); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 78e0c45,83a1274..6f5284a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@@ -632,9 -634,9 +634,9 @@@ public class GridDhtTxLocal<K, V> exten nearFinMiniId, err); try { - cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(nearNodeId, res, ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) log.debug("Node left before sending finish response (transaction was committed) [node=" + nearNodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d918449,8d0a854..b8072ca --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -38,8 -39,9 +39,10 @@@ import java.io.* import java.util.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.transactions.IgniteTxState.*; import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + import static org.apache.ignite.transactions.IgniteTxState.*; /** * @@@ -689,9 -691,9 +692,9 @@@ public final class GridDhtTxPrepareFutu //noinspection TryWithIdenticalCatches try { - cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(n, req, tx.ioPolicy()); } - catch (ClusterTopologyException e) { + catch (ClusterTopologyCheckedException e) { fut.onResult(e); } catch (IgniteCheckedException e) { @@@ -743,9 -745,9 +746,9 @@@ //noinspection TryWithIdenticalCatches try { - cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); } - catch (ClusterTopologyException e) { + catch (ClusterTopologyCheckedException e) { fut.onResult(e); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index efd952b,414d64b..0da58be --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@@ -2639,9 -2643,9 +2643,9 @@@ public class GridDhtAtomicCache<K, V> e */ private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse<K, V> res) { try { - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, ctx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send near update reply to node because it left grid: " + nodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 336a9d5,4a42c36..a4bda8e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@@ -352,9 -353,9 +353,9 @@@ public class GridDhtAtomicUpdateFuture< if (log.isDebugEnabled()) log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send update request to backup node because it left grid: " + req.nodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index c76fb47,370ffcd..b3a74a9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@@ -869,9 -871,9 +870,9 @@@ public final class GridDhtColocatedLock if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(node, req, cctx.ioPolicy()); } - catch (ClusterTopologyException ex) { + catch (ClusterTopologyCheckedException ex) { assert fut != null; fut.onResult(ex); @@@ -884,9 -886,9 +885,9 @@@ if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(node, req, cctx.ioPolicy()); } - catch (ClusterTopologyException ex) { + catch (ClusterTopologyCheckedException ex) { assert fut != null; fut.onResult(ex); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 97e0ce8,17908e7..4d95419 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@@ -379,9 -380,9 +380,9 @@@ public class GridDhtPreloader<K, V> ext if (log.isDebugEnabled()) log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']'); - cctx.io().send(node, res); + cctx.io().send(node, res, cctx.ioPolicy()); } - catch (ClusterTopologyException ignore) { + catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + ", req=" + msg + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 5e370aa,4a0f687..9c56540 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@@ -1119,9 -1121,9 +1120,9 @@@ public final class GridNearLockFuture<K if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(node, req, cctx.ioPolicy()); } - catch (ClusterTopologyException ex) { + catch (ClusterTopologyCheckedException ex) { assert fut != null; fut.onResult(ex); @@@ -1134,9 -1136,9 +1135,9 @@@ if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(node, req, cctx.ioPolicy()); } - catch (ClusterTopologyException ex) { + catch (ClusterTopologyCheckedException ex) { assert fut != null; fut.onResult(ex); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 0000000,a349731..79015dd mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@@ -1,0 -1,706 +1,712 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.transactions; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.internal.*; ++import org.apache.ignite.internal.managers.communication.*; + import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.processors.timeout.*; + import org.apache.ignite.internal.transactions.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.transactions.*; + import org.jetbrains.annotations.*; + + import java.util.*; + + /** + * Transaction managed by cache ({@code 'Ex'} stands for external). + */ + public interface IgniteInternalTx<K, V> extends AutoCloseable, GridTimeoutObject { + /** + * + */ + @SuppressWarnings("PublicInnerClass") + public enum FinalizationStatus { + /** Transaction was not finalized yet. */ + NONE, + + /** Transaction is being finalized by user. */ + USER_FINISH, + + /** Recovery request is received, user finish requests should be ignored. */ + RECOVERY_WAIT, + + /** Transaction is being finalized by recovery procedure. */ + RECOVERY_FINISH + } + + /** + * Gets unique identifier for this transaction. + * + * @return Transaction UID. + */ + public IgniteUuid xid(); + + /** + * ID of the node on which this transaction started. + * + * @return Originating node ID. + */ + public UUID nodeId(); + + /** + * ID of the thread in which this transaction started. + * + * @return Thread ID. + */ + public long threadId(); + + /** + * Start time of this transaction. + * + * @return Start time of this transaction on this node. + */ + public long startTime(); + + /** + * Cache transaction isolation level. + * + * @return Isolation level. + */ + public IgniteTxIsolation isolation(); + + /** + * Cache transaction concurrency mode. + * + * @return Concurrency mode. + */ + public IgniteTxConcurrency concurrency(); + + /** + * Flag indicating whether transaction was started automatically by the + * system or not. System will start transactions implicitly whenever + * any cache {@code put(..)} or {@code remove(..)} operation is invoked + * outside of transaction. + * + * @return {@code True} if transaction was started implicitly. + */ + public boolean implicit(); + + /** + * Get invalidation flag for this transaction. If set to {@code true}, then + * remote values will be {@code invalidated} (set to {@code null}) instead + * of updated. + * <p> + * Invalidation messages don't carry new values, so they are a lot lighter + * than update messages. However, when a value is accessed on a node after + * it's been invalidated, it must be loaded from persistent store. + * + * @return Invalidation flag. + */ + public boolean isInvalidate(); + + /** + * Gets current transaction state value. + * + * @return Current transaction state. + */ + public IgniteTxState state(); + + /** + * Gets timeout value in milliseconds for this transaction. If transaction times + * out prior to it's completion, {@link org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException} will be thrown. + * + * @return Transaction timeout value. + */ + public long timeout(); + + /** + * Sets transaction timeout value. This value can be set only before a first operation + * on transaction has been performed. + * + * @param timeout Transaction timeout value. + * @return Previous timeout. + */ + public long timeout(long timeout); + + /** + * Modify the transaction associated with the current thread such that the + * only possible outcome of the transaction is to roll back the + * transaction. + * + * @return {@code True} if rollback-only flag was set as a result of this operation, + * {@code false} if it was already set prior to this call or could not be set + * because transaction is already finishing up committing or rolling back. + */ + public boolean setRollbackOnly(); + + /** + * If transaction was marked as rollback-only. + * + * @return {@code True} if transaction can only be rolled back. + */ + public boolean isRollbackOnly(); + + /** + * Commits this transaction by initiating {@code two-phase-commit} process. + * + * @throws IgniteCheckedException If commit failed. + */ + @IgniteAsyncSupported + public void commit() throws IgniteCheckedException; + + /** + * Ends the transaction. Transaction will be rolled back if it has not been committed. + * + * @throws IgniteCheckedException If transaction could not be gracefully ended. + */ + @Override public void close() throws IgniteCheckedException; + + /** + * Rolls back this transaction. + * + * @throws IgniteCheckedException If rollback failed. + */ + @IgniteAsyncSupported + public void rollback() throws IgniteCheckedException; + + /** + * Removes metadata by name. + * + * @param name Name of the metadata to remove. + * @param <T> Type of the value. + * @return Value of removed metadata or {@code null}. + */ + @Nullable public <T> T removeMeta(String name); + + /** + * Gets metadata by name. + * + * @param name Metadata name. + * @param <T> Type of the value. + * @return Metadata value or {@code null}. + */ + @Nullable public <T> T meta(String name); + + /** + * Adds a new metadata. + * + * @param name Metadata name. + * @param val Metadata value. + * @param <T> Type of the value. + * @return Metadata previously associated with given name, or + * {@code null} if there was none. + */ + @Nullable public <T> T addMeta(String name, T val); + + /** + * @return Size of the transaction. + */ + public int size(); + + /** + * @return {@code True} if transaction is allowed to use store. + */ + public boolean storeEnabled(); + + /** + * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with + * store enabled. + */ + public boolean storeUsed(); + + /** + * Checks if this is system cache transaction. System transactions are isolated from user transactions + * because some of the public API methods may be invoked inside user transactions and internally start + * system cache transactions. + * + * @return {@code True} if transaction is started for system cache. + */ + public boolean system(); + + /** ++ * @return Pool where message for the given transaction must be processed. ++ */ ++ public GridIoPolicy ioPolicy(); ++ ++ /** + * @return Last recorded topology version. + */ + public long topologyVersion(); + + /** + * @return Flag indicating whether transaction is implicit with only one key. + */ + public boolean implicitSingle(); + + /** + * @return Collection of cache IDs involved in this transaction. + */ + public Collection<Integer> activeCacheIds(); + + /** + * Attempts to set topology version and returns the current value. + * If topology version was previously set, then it's value will + * be returned (but not updated). + * + * @param topVer Topology version. + * @return Recorded topology version. + */ + public long topologyVersion(long topVer); + + /** + * @return {@code True} if transaction is empty. + */ + public boolean empty(); + + /** + * @return {@code True} if transaction group-locked. + */ + public boolean groupLock(); + + /** + * @return Group lock key if {@link #groupLock()} is {@code true}. + */ + @Nullable public IgniteTxKey groupLockKey(); + + /** + * @return {@code True} if preparing flag was set with this call. + */ + public boolean markPreparing(); + + /** + * @param status Finalization status to set. + * @return {@code True} if could mark was set. + */ + public boolean markFinalizing(FinalizationStatus status); + + /** + * @param cacheCtx Cache context. + * @param part Invalid partition. + */ + public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part); + + /** + * @return Invalid partitions. + */ + public Set<Integer> invalidPartitions(); + + /** + * Gets owned version for near remote transaction. + * + * @param key Key to get version for. + * @return Owned version, if any. + */ + @Nullable public GridCacheVersion ownedVersion(IgniteTxKey<K> key); + + /** + * Gets ID of additional node involved. For example, in DHT case, other node is + * near node ID. + * + * @return Parent node IDs. + */ + @Nullable public UUID otherNodeId(); + + /** + * @return Event node ID. + */ + public UUID eventNodeId(); + + /** + * Gets node ID which directly started this transaction. In case of DHT local transaction it will be + * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote + * transaction it will be starter node ID. + * + * @return Originating node ID. + */ + public UUID originatingNodeId(); + + /** + * @return Master node IDs. + */ + public Collection<UUID> masterNodeIds(); + + /** + * @return Near transaction ID. + */ + @Nullable public GridCacheVersion nearXidVersion(); + + /** + * @return Transaction nodes mapping (primary node -> related backup nodes). + */ + @Nullable public Map<UUID, Collection<UUID>> transactionNodes(); + + /** + * @param entry Entry to check. + * @return {@code True} if lock is owned. + * @throws GridCacheEntryRemovedException If entry has been removed. + */ + public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException; + + /** + * @param entry Entry to check. + * @return {@code True} if lock is owned. + */ + public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry); + + /** + * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR + * transactions because serializability of transaction is enforced on primary node. All + * other transaction types must enforce it. + * + * @return Enforce serializable flag. + */ + public boolean enforceSerializable(); + + /** + * @return {@code True} if near transaction. + */ + public boolean near(); + + /** + * @return {@code True} if DHT transaction. + */ + public boolean dht(); + + /** + * @return {@code True} if dht colocated transaction. + */ + public boolean colocated(); + + /** + * @return {@code True} if transaction is local, {@code false} if it's remote. + */ + public boolean local(); + + /** + * @return {@code True} if transaction is replicated. + */ + public boolean replicated(); + + /** + * @return Subject ID initiated this transaction. + */ + public UUID subjectId(); + + /** + * Task name hash in case if transaction was initiated within task execution. + * + * @return Task name hash. + */ + public int taskNameHash(); + + /** + * @return {@code True} if transaction is user transaction, which means: + * <ul> + * <li>Explicit</li> + * <li>Local</li> + * <li>Not DHT</li> + * </ul> + */ + public boolean user(); + + /** + * @return {@code True} if transaction is configured with synchronous commit flag. + */ + public boolean syncCommit(); + + /** + * @return {@code True} if transaction is configured with synchronous rollback flag. + */ + public boolean syncRollback(); + + /** + * @param key Key to check. + * @return {@code True} if key is present. + */ + public boolean hasWriteKey(IgniteTxKey<K> key); + + /** + * @return Read set. + */ + public Set<IgniteTxKey<K>> readSet(); + + /** + * @return Write set. + */ + public Set<IgniteTxKey<K>> writeSet(); + + /** + * @return All transaction entries. + */ + public Collection<IgniteTxEntry<K, V>> allEntries(); + + /** + * @return Write entries. + */ + public Collection<IgniteTxEntry<K, V>> writeEntries(); + + /** + * @return Read entries. + */ + public Collection<IgniteTxEntry<K, V>> readEntries(); + + /** + * @return Transaction write map. + */ + public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap(); + + /** + * @return Transaction read map. + */ + public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap(); + + /** + * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests. + * + * @return Collection of recovery writes. + */ + public Collection<IgniteTxEntry<K, V>> recoveryWrites(); + + /** + * Gets a list of entries that needs to be locked on the next step of prepare stage of + * optimistic transaction. + * + * @return List of tx entries for optimistic locking. + */ + public Collection<IgniteTxEntry<K, V>> optimisticLockEntries(); + + /** + * Seals transaction for updates. + */ + public void seal(); + + /** + * @param key Key for the entry. + * @return Entry for the key (either from write set or read set). + */ + @Nullable public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key); + + /** + * @param ctx Cache context. + * @param failFast Fail-fast flag. + * @param key Key to look up. + * @param filter Filter to check. + * @return Current value for the key within transaction. + * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}. + */ + @Nullable public GridTuple<V> peek( + GridCacheContext<K, V> ctx, + boolean failFast, + K key, + @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) throws GridCacheFilterFailedException; + + /** + * @return Start version. + */ + public GridCacheVersion startVersion(); + + /** + * @return Transaction version. + */ + public GridCacheVersion xidVersion(); + + /** + * @return Version created at commit time. + */ + public GridCacheVersion commitVersion(); + + /** + * @param commitVer Commit version. + * @return {@code True} if version was set. + */ + public boolean commitVersion(GridCacheVersion commitVer); + + /** + * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) + * assigned to this transaction at the end of write phase. + */ + public GridCacheVersion endVersion(); + + /** + * Prepare state. + * + * @throws IgniteCheckedException If failed. + */ + public void prepare() throws IgniteCheckedException; + + /** + * Prepare stage. + * + * @return Future for prepare step. + */ + public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync(); + + /** + * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) + * assigned to this transaction at the end of write phase. + */ + public void endVersion(GridCacheVersion endVer); + + /** + * @return Transaction write version. For all transactions except DHT transactions, will be equal to + * {@link #xidVersion()}. + */ + public GridCacheVersion writeVersion(); + + /** + * Sets write version. + * + * @param ver Write version. + */ + public void writeVersion(GridCacheVersion ver); + + /** + * @return Future for transaction completion. + */ + public IgniteInternalFuture<IgniteInternalTx> finishFuture(); + + /** + * @param state Transaction state. + * @return {@code True} if transition was valid, {@code false} otherwise. + */ + public boolean state(IgniteTxState state); + + /** + * @param invalidate Invalidate flag. + */ + public void invalidate(boolean invalidate); + + /** + * @param sysInvalidate System invalidate flag. + */ + public void systemInvalidate(boolean sysInvalidate); + + /** + * @return System invalidate flag. + */ + public boolean isSystemInvalidate(); + + /** + * Asynchronously rollback this transaction. + * + * @return Rollback future. + */ + public IgniteInternalFuture<IgniteInternalTx> rollbackAsync(); + + /** + * Asynchronously commits this transaction by initiating {@code two-phase-commit} process. + * + * @return Future for commit operation. + */ + public IgniteInternalFuture<IgniteInternalTx> commitAsync(); + + /** + * Callback invoked whenever there is a lock that has been acquired + * by this transaction for any of the participating entries. + * + * @param entry Cache entry. + * @param owner Lock candidate that won ownership of the lock. + * @return {@code True} if transaction cared about notification. + */ + public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner); + + /** + * @return {@code True} if transaction timed out. + */ + public boolean timedOut(); + + /** + * @return {@code True} if transaction had completed successfully or unsuccessfully. + */ + public boolean done(); + + /** + * @return {@code True} for OPTIMISTIC transactions. + */ + public boolean optimistic(); + + /** + * @return {@code True} for PESSIMISTIC transactions. + */ + public boolean pessimistic(); + + /** + * @return {@code True} if read-committed. + */ + public boolean readCommitted(); + + /** + * @return {@code True} if repeatable-read. + */ + public boolean repeatableRead(); + + /** + * @return {@code True} if serializable. + */ + public boolean serializable(); + + /** + * Checks whether given key has been removed within transaction. + * + * @param key Key to check. + * @return {@code True} if key has been removed. + */ + public boolean removed(IgniteTxKey<K> key); + + /** + * Gets allowed remaining time for this transaction. + * + * @return Remaining time. + * @throws IgniteTxTimeoutCheckedException If transaction timed out. + */ + public long remainingTime() throws IgniteTxTimeoutCheckedException; + + /** + * @return Alternate transaction versions. + */ + public Collection<GridCacheVersion> alternateVersions(); + + /** + * @return {@code True} if transaction needs completed versions for processing. + */ + public boolean needsCompletedVersions(); + + /** + * @param base Base for committed versions. + * @param committed Committed transactions relative to base. + * @param rolledback Rolled back transactions relative to base. + */ + public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed, + Collection<GridCacheVersion> rolledback); + + /** + * @return {@code True} if transaction has at least one internal entry. + */ + public boolean internal(); + + /** + * @return {@code True} if transaction is a one-phase-commit transaction. + */ + public boolean onePhaseCommit(); + + /** + * @return {@code True} if transaction has transform entries. This flag will be only set for local + * transactions. + */ + public boolean hasTransforms(); + + /** + * @return Public API proxy. + */ + public IgniteTxProxy proxy(); + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 9400636,7007d61..07c506e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@@ -20,15 -20,11 +20,12 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; + import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.transactions.*; - import org.apache.ignite.internal.processors.cache.distributed.near.*; - import org.apache.ignite.internal.util.typedef.*; - import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; @@@ -41,7 -41,7 +42,8 @@@ import java.util.concurrent.atomic.* import java.util.concurrent.locks.*; import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*; import static org.apache.ignite.transactions.IgniteTxConcurrency.*; import static org.apache.ignite.transactions.IgniteTxIsolation.*; @@@ -498,7 -496,7 +503,8 @@@ public abstract class IgniteTxAdapter<K } /** {@inheritDoc} */ -- @Override public boolean markPreparing() { ++ @Override ++ public boolean markPreparing() { return preparing.compareAndSet(false, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5851544a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 8c8da1a,a154957..bcf2018 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@@ -1406,9 -1407,9 +1407,9 @@@ public class IgniteTxHandler<K, V> if (log.isDebugEnabled()) log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, SYSTEM_POOL); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" + nodeId + ", res=" + res + ']'); @@@ -1506,9 -1507,9 +1507,9 @@@ if (log.isDebugEnabled()) log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, SYSTEM_POOL); } - catch (ClusterTopologyException ignored) { + catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" + nodeId + ", res=" + res + ']');