IGNITE-264 - Process check committed finish response properly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e9fd4f7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e9fd4f7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e9fd4f7e Branch: refs/heads/ignite-264 Commit: e9fd4f7e36df7a1406af99040fd7b4a776bee2fa Parents: 30fef4a Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Feb 24 17:32:33 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Feb 24 17:32:33 2015 -0800 ---------------------------------------------------------------------- .../dht/GridDhtTxFinishResponse.java | 24 ++++++++++++ .../near/GridNearTxFinishFuture.java | 39 ++++++++++++++++---- .../cache/transactions/IgniteTxHandler.java | 37 +++++++++++++++---- 3 files changed, 85 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fd4f7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index 03b8227..1627334 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -46,6 +46,9 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon /** Serialized error. */ private byte[] errBytes; + /** Flag indicating if this is a check-committed response. */ + private boolean checkCommitted; + /** * Empty constructor required by {@link Externalizable}. */ @@ -80,6 +83,27 @@ public class GridDhtTxFinishResponse<K, V> extends GridDistributedTxFinishRespon return err; } + /** + * @param err Error for check committed backup requests. + */ + public void error(Throwable err) { + this.err = err; + } + + /** + * @return Check committed flag. + */ + public boolean checkCommitted() { + return checkCommitted; + } + + /** + * @param checkCommitted Check committed flag. + */ + public void checkCommitted(boolean checkCommitted) { + this.checkCommitted = checkCommitted; + } + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fd4f7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 0152b39..9b29ee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -213,6 +213,25 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } + /** + * @param nodeId Sender. + * @param res Result. + */ + public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) { + if (!isDone()) + for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(res); + } + } + } + } + /** {@inheritDoc} */ @Override public boolean onDone(IgniteInternalTx tx, Throwable err) { if ((initialized() || err != null)) { @@ -348,7 +367,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (!F.isEmpty(backups)) { assert backups.size() == 1; - UUID backup = F.first(backups); + UUID backupId = F.first(backups); + + ClusterNode backup = ctx.discovery().node(backupId); + + // Nothing to do if backup has left the grid. + if (backup == null) + return; MiniFuture mini = new MiniFuture(backup); @@ -512,7 +537,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu private GridDistributedTxMapping<K, V> m; /** Backup check flag. */ - private UUID backupId; + private ClusterNode backup; /** * Empty constructor required for {@link Externalizable}. @@ -531,12 +556,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * @param backupId Backup ID to check. + * @param backup Backup to check. */ - MiniFuture(UUID backupId) { + MiniFuture(ClusterNode backup) { super(cctx.kernalContext()); - this.backupId = backupId; + this.backup = backup; } /** @@ -586,7 +611,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param res Result callback. */ void onResult(GridNearTxFinishResponse<K, V> res) { - assert backupId == null; + assert backup == null; if (res.error() != null) onDone(res.error()); @@ -598,7 +623,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param res Response. */ void onResult(GridDhtTxFinishResponse<K, V> res) { - assert backupId != null; + assert backup != null; if (res.error() != null) onDone(res.error()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fd4f7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 880d6a5..0633964 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -409,17 +409,32 @@ public class IgniteTxHandler<K, V> { assert nodeId != null; assert res != null; - GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future(res.xid(), - res.futureId()); + if (res.checkCommitted()) { + GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future( + res.xid(), res.futureId()); - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Received response for unknown future (will ignore): " + res); - return; + return; + } + + fut.onResult(nodeId, res); } + else { + GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future( + res.xid(), res.futureId()); - fut.onResult(nodeId, res); + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Received response for unknown future (will ignore): " + res); + + return; + } + + fut.onResult(nodeId, res); + } } /** @@ -854,9 +869,15 @@ public class IgniteTxHandler<K, V> { */ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req, boolean committed) { if (req.replyRequired()) { - GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId()); + GridDhtTxFinishResponse<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId()); + if (req.checkCommitted()) { + res.checkCommitted(true); + if (!committed) + res.error(new IgniteTxRollbackCheckedException("Failed to commit transaction (transaction has been " + + "rolled back on backup node): " + req.version())); + } try { ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);