#ingite-9655 - Manual merge of changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ebcbb95d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ebcbb95d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ebcbb95d Branch: refs/heads/ingite-9655-merge Commit: ebcbb95dc9299dd0e05015b2157e10231d47813f Parents: 0354a41 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Jan 29 12:01:30 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Jan 29 12:01:30 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/CacheMetricsImpl.java | 8 +- .../distributed/GridDistributedLockRequest.java | 74 ----------- .../GridDistributedTxFinishRequest.java | 126 ------------------- .../dht/GridDhtTransactionalCacheAdapter.java | 1 - .../distributed/dht/GridDhtTxFinishRequest.java | 71 ----------- .../distributed/dht/GridDhtTxPrepareFuture.java | 3 +- .../near/GridNearTxPrepareFuture.java | 28 +++-- .../near/GridNearTxPrepareResponse.java | 8 +- .../GridTcpCommunicationMessageFactory.java | 29 +++-- 9 files changed, 42 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 60b2287..0de039b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -184,12 +184,12 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxCommittedVersionsSize() { - return cctx.tm().committedVersionsSize(); + return cctx.tm().completedVersionsSize(); } /** {@inheritDoc} */ @Override public int getTxRolledbackVersionsSize() { - return cctx.tm().rolledbackVersionsSize(); + return cctx.tm().completedVersionsSize(); } /** {@inheritDoc} */ @@ -219,12 +219,12 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxDhtCommittedVersionsSize() { - return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().committedVersionsSize() : -1; + return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().completedVersionsSize() : -1; } /** {@inheritDoc} */ @Override public int getTxDhtRolledbackVersionsSize() { - return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().rolledbackVersionsSize() : -1; + return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().completedVersionsSize() : -1; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index da6ca72..52967ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -396,33 +396,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage } switch (commState.idx) { - case 8: - if (drVersByIdx != null) { - if (commState.it == null) { - if (!commState.putInt(drVersByIdx.length)) - return false; - - commState.it = arrayIterator(drVersByIdx); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - case 9: if (!commState.putGridUuid(futId)) return false; @@ -527,13 +500,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage return false; commState.idx++; - - case 23: - if (!commState.putByteArray(writeEntriesBytes)) - return false; - - commState.idx++; - } return true; @@ -548,35 +514,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage return false; switch (commState.idx) { - case 8: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (drVersByIdx == null) - drVersByIdx = new GridCacheVersion[commState.readSize]; - - for (int i = commState.readItems; i < commState.readSize; i++) { - GridCacheVersion _val = commState.getCacheVersion(); - - if (_val == CACHE_VER_NOT_READ) - return false; - - drVersByIdx[i] = (GridCacheVersion)_val; - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - case 9: IgniteUuid futId0 = commState.getGridUuid(); @@ -721,17 +658,6 @@ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage txSize = commState.getInt(); commState.idx++; - - case 23: - byte[] writeEntriesBytes0 = commState.getByteArray(); - - if (writeEntriesBytes0 == BYTE_ARR_NOT_READ) - return false; - - writeEntriesBytes = writeEntriesBytes0; - - commState.idx++; - } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index 389cf30..6966987 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -195,20 +195,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes } /** - * @return Write entries. - */ - public Collection<IgniteTxEntry<K, V>> writes() { - return writeEntries; - } - - /** - * @return Recover entries. - */ - public Collection<IgniteTxEntry<K, V>> recoveryWrites() { - return recoveryWrites; - } - - /** * @return Expected tx size. */ public int txSize() { @@ -351,33 +337,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes commState.idx++; - case 16: - if (recoveryWritesBytes != null) { - if (commState.it == null) { - if (!commState.putInt(recoveryWritesBytes.size())) - return false; - - commState.it = recoveryWritesBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - case 17: if (!commState.putLong(threadId)) return false; @@ -390,33 +349,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes commState.idx++; - case 19: - if (writeEntriesBytes != null) { - if (commState.it == null) { - if (!commState.putInt(writeEntriesBytes.size())) - return false; - - commState.it = writeEntriesBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - case 20: if (!commState.putBoolean(sys)) return false; @@ -508,35 +440,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes commState.idx++; - case 16: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (recoveryWritesBytes == null) - recoveryWritesBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - recoveryWritesBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - case 17: if (buf.remaining() < 8) return false; @@ -553,35 +456,6 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes commState.idx++; - case 19: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (writeEntriesBytes == null) - writeEntriesBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - writeEntriesBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - case 20: if (buf.remaining() < 1) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 4e06bc7..2642769 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -826,7 +826,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach entries, req.onePhaseCommit(), req.messageId(), - req.implicitTx(), req.txRead(), req.accessTtl()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 1d311d4..323a2de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -346,39 +345,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 24: - if (nearWritesBytes != null) { - if (commState.it == null) { - if (!commState.putInt(nearWritesBytes.size())) - return false; - - commState.it = nearWritesBytes.iterator(); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - - case 25: - if (!commState.putBoolean(onePhaseCommit)) - return false; - - commState.idx++; - case 26: if (pendingVers != null) { if (commState.it == null) { @@ -492,43 +458,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 24: - if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; - - commState.readSize = commState.getInt(); - } - - if (commState.readSize >= 0) { - if (nearWritesBytes == null) - nearWritesBytes = new ArrayList<>(commState.readSize); - - for (int i = commState.readItems; i < commState.readSize; i++) { - byte[] _val = commState.getByteArray(); - - if (_val == BYTE_ARR_NOT_READ) - return false; - - nearWritesBytes.add((byte[])_val); - - commState.readItems++; - } - } - - commState.readSize = -1; - commState.readItems = 0; - - commState.idx++; - - case 25: - if (buf.remaining() < 1) - return false; - - onePhaseCommit = commState.getBoolean(); - - commState.idx++; - case 26: if (commState.readSize == -1) { if (buf.remaining() < 4) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e2bb7e7..0e616d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -539,7 +539,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } /** - * @throws GridException If failed to send response. + * @throws IgniteCheckedException If failed to send response. */ private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException { if (!tx.nearNodeId().equals(cctx.localNodeId())) @@ -798,7 +798,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu tx.nearXidVersion(), true, tx.onePhaseCommit(), - lastBackup(n.id()), tx.subjectId(), tx.taskNameHash()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index e6dc3e9..521c0e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -603,10 +603,9 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut tx, tx.optimistic() && tx.serializable() ? m.reads() : null, m.writes(), - /*grp lock key*/null, - /*part lock*/false, - tx.syncCommit(), - tx.syncRollback(), + /*grp lock key*/null, + /*part lock*/false, + m.near(), txMapping.transactionNodes(), true, txMapping.transactionNodes().get(node.id()), @@ -652,8 +651,15 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key); - if (dhtTxEntry.op() == NOOP) - tx.entry(key).op(NOOP); + assert dhtTxEntry != null; + + if (dhtTxEntry.op() == NOOP) { + IgniteTxEntry<K, V> txEntry = tx.entry(key); + + assert txEntry != null; + + txEntry.op(NOOP); + } } tx.addDhtVersion(m.node().id(), dhtTx.xidVersion()); @@ -662,7 +668,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut GridCacheVersion min = dhtTx.minVersion(); - IgniteTxManager<K, V> tm = cctx.near().dht().context().tm(); + IgniteTxManager<K, V> tm = cctx.tm(); tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), tm.committedVersions(min), tm.rolledbackVersions(min)); @@ -700,13 +706,17 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * One-phase commit can be done if transaction maps to one primary node and not more than one backup. */ private void checkOnePhase() { - if (cctx.isStoreEnabled()) + if (tx.storeUsed()) return; Map<UUID, Collection<UUID>> map = txMapping.transactionNodes(); if (map.size() == 1) { - Collection<UUID> backups = F.firstEntry(map).getValue(); + Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map); + + assert entry != null; + + Collection<UUID> backups = entry.getValue(); if (backups.size() <= 1) tx.onePhaseCommit(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 8cd1f12..4321f83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -78,7 +78,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes /** Filter failed keys. */ @GridDirectTransient - private Collection<K> filterFailedKeys; + private Collection<IgniteTxKey<K>> filterFailedKeys; /** Filter failed key bytes. */ private byte[] filterFailedKeyBytes; @@ -195,15 +195,15 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes /** * @param filterFailedKeys Collection of keys that did not pass the filter. */ - public void filterFailedKeys(Collection<K> filterFailedKeys) { + public void filterFailedKeys(Collection<IgniteTxKey<K>> filterFailedKeys) { this.filterFailedKeys = filterFailedKeys; } /** * @return Collection of keys that did not pass the filter. */ - public Collection<K> filterFailedKeys() { - return filterFailedKeys == null ? Collections.<K>emptyList() : filterFailedKeys; + public Collection<IgniteTxKey<K>> filterFailedKeys() { + return filterFailedKeys == null ? Collections.<IgniteTxKey<K>>emptyList() : filterFailedKeys; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ebcbb95d/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java index 9ef1b10..690582e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java @@ -110,12 +110,6 @@ public class GridTcpCommunicationMessageFactory { case 19: return new GridCacheOptimisticCheckPreparedTxResponse(); - case 20: - return new GridCachePessimisticCheckCommittedTxRequest(); - - case 21: - return new GridCachePessimisticCheckCommittedTxResponse(); - case 22: return new GridDistributedLockRequest(); @@ -279,7 +273,7 @@ public class GridTcpCommunicationMessageFactory { } } }, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, - 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, + 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, /* 65-72 - GGFS messages. */ 73, 74, 75, 76, 77, 78, 79, 80, 81, 82); @@ -290,14 +284,19 @@ public class GridTcpCommunicationMessageFactory { * @return New message. */ public static GridTcpCommunicationMessageAdapter create(byte type) { - if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE) - return new TcpCommunicationSpi.NodeIdMessage(); - else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE) - return new TcpCommunicationSpi.RecoveryLastReceivedMessage(); - else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE) - return new TcpCommunicationSpi.HandshakeMessage(); - else - return create0(type); + switch (type) { + case TcpCommunicationSpi.NODE_ID_MSG_TYPE: + return new TcpCommunicationSpi.NodeIdMessage(); + + case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE: + return new TcpCommunicationSpi.RecoveryLastReceivedMessage(); + + case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE: + return new TcpCommunicationSpi.HandshakeMessage(); + + default: + return create0(type); + } } /**