IGNITE-264 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f286bb42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f286bb42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f286bb42 Branch: refs/heads/ignite-264 Commit: f286bb42f0d200d4adccd83d15e0577fa37b893a Parents: 6356e5e Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Feb 24 17:55:30 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Feb 24 17:55:30 2015 -0800 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxFinishRequest.java | 7 ++++++ .../dht/GridDhtTxFinishResponse.java | 26 ++++++++++---------- .../near/GridNearTxFinishFuture.java | 10 +++++--- .../cache/transactions/IgniteTxHandler.java | 8 ++++-- 4 files changed, 33 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f286bb42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index d9e8b18..60211e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -192,6 +192,13 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest } /** + * @param checkCommitted Check committed flag. + */ + public void checkCommitted(boolean checkCommitted) { + this.checkCommitted = checkCommitted; + } + + /** * @return Topology version. */ @Override public long topologyVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f286bb42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index 9f3c38e..45b1f8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -41,10 +41,10 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon /** Error. */ @GridDirectTransient - private Throwable err; + private Throwable checkCommittedErr; /** Serialized error. */ - private byte[] errBytes; + private byte[] checkCommittedErrBytes; /** Flag indicating if this is a check-committed response. */ private boolean checkCommitted; @@ -79,15 +79,15 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon /** * @return Error for check committed backup requests. */ - public Throwable error() { - return err; + public Throwable checkCommittedError() { + return checkCommittedErr; } /** - * @param err Error for check committed backup requests. + * @param checkCommittedErr Error for check committed backup requests. */ - public void error(Throwable err) { - this.err = err; + public void checkCommittedError(Throwable checkCommittedErr) { + this.checkCommittedErr = checkCommittedErr; } /** @@ -108,8 +108,8 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (err != null) - errBytes = ctx.marshaller().marshal(err); + if (checkCommittedErr != null) + checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr); } /** {@inheritDoc} */ @@ -117,8 +117,8 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (errBytes != null) - err = ctx.marshaller().unmarshal(errBytes, ldr); + if (checkCommittedErrBytes != null) + checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr); } /** {@inheritDoc} */ @@ -148,7 +148,7 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon writer.incrementState(); case 6: - if (!writer.writeByteArray("errBytes", errBytes)) + if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes)) return false; writer.incrementState(); @@ -184,7 +184,7 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon reader.incrementState(); case 6: - errBytes = reader.readByteArray("errBytes"); + checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f286bb42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 9b29ee3..e6930d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -398,6 +398,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu null, 0); + finishReq.checkCommitted(true); + try { cctx.io().send(backup, finishReq, tx.ioPolicy()); } @@ -575,7 +577,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @return Node ID. */ public ClusterNode node() { - return m.node(); + assert m != null || backup != null; + + return m != null ? m.node() : backup; } /** @@ -625,8 +629,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu void onResult(GridDhtTxFinishResponse<K, V> res) { assert backup != null; - if (res.error() != null) - onDone(res.error()); + if (res.checkCommittedError() != null) + onDone(res.checkCommittedError()); else onDone(tx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f286bb42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 0633964..3680f0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -736,6 +736,8 @@ public class IgniteTxHandler<K, V> { nearTx = null; if (req.checkCommitted()) { + assert dhtTx == null || dhtTx.onePhaseCommit() : "Invalid transaction: " + dhtTx; + boolean committed = true; if (dhtTx == null) { @@ -744,6 +746,8 @@ public class IgniteTxHandler<K, V> { } sendReply(nodeId, req, committed); + + return; } else finish(nodeId, dhtTx, req); @@ -875,8 +879,8 @@ public class IgniteTxHandler<K, V> { res.checkCommitted(true); if (!committed) - res.error(new IgniteTxRollbackCheckedException("Failed to commit transaction (transaction has been " + - "rolled back on backup node): " + req.version())); + res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " + + "(transaction has been rolled back on backup node): " + req.version())); } try {