Repository: incubator-ignite Updated Branches: refs/heads/master ebb5d4ad9 -> 0399ccd83
IGNITE-1265 - Properly handle invalid partitions in DHT prepare response. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7a43dde7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7a43dde7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7a43dde7 Branch: refs/heads/master Commit: 7a43dde77b47478e6b02bbab9d81ad70a2299c51 Parents: 5faffb9 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Aug 18 10:35:59 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Aug 18 10:35:59 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 1 - .../processors/cache/GridCacheUtils.java | 20 +++++++++++++++++ .../distributed/dht/GridDhtTxPrepareFuture.java | 23 +++++++++++++++++++- .../dht/GridDhtTxPrepareResponse.java | 17 +++++++++++++++ .../near/GridNearTxPrepareResponse.java | 3 --- .../cache/transactions/IgniteInternalTx.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 19 +++++++++++----- .../cache/transactions/IgniteTxHandler.java | 5 ++--- .../ignite/internal/util/lang/GridFunc.java | 14 ++++++++++++ 9 files changed, 90 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 29e3551..c128aa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -493,7 +493,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.version(), null, null, - null, null); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 41e3896..1e3cd67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1681,6 +1681,26 @@ public class GridCacheUtils { } /** + * @param partsMap Cache ID to partition IDs collection map. + * @return Cache ID to partition ID array map. + */ + public static Map<Integer, Integer[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) { + Map<Integer, Integer[]> res = new HashMap<>(partsMap.size()); + + for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) { + Set<Integer> parts = entry.getValue(); + + Integer[] partsArray = new Integer[parts.size()]; + + partsArray = parts.toArray(partsArray); + + res.put(entry.getKey(), partsArray); + } + + return res; + } + + /** * Stops store session listeners. * * @param ctx Kernal context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index af0fbdf..27de8cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -574,13 +574,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter // Send reply back to originating near node. Throwable prepErr = err.get(); + assert F.isEmpty(tx.invalidPartitions()); + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( tx.nearXidVersion(), tx.colocated() ? tx.xid() : tx.nearFutureId(), nearMiniId == null ? tx.xid() : nearMiniId, tx.xidVersion(), tx.writeVersion(), - tx.invalidPartitions(), ret, prepErr, null); @@ -1194,6 +1195,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } // Process invalid partitions (no need to remap). + // Keep this loop for backward compatibility. if (!F.isEmpty(res.invalidPartitions())) { for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { IgniteTxEntry entry = it.next(); @@ -1206,6 +1208,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); } } + } + + // Process invalid partitions (no need to remap). + if (!F.isEmpty(res.invalidPartitionsByCacheId())) { + Map<Integer, Integer[]> invalidPartsMap = res.invalidPartitionsByCacheId(); + + for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + Integer[] invalidParts = invalidPartsMap.get(entry.cacheId()); + + if (F.contains(invalidParts, entry.cached().partition())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + + ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); + } + } if (dhtMapping.empty()) { dhtMap.remove(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java index 753c117..cc85628 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java @@ -55,6 +55,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { @GridDirectCollection(int.class) private Collection<Integer> invalidParts; + /** Invalid partitions by cache ID. */ + private Map<Integer, Integer[]> invalidPartsByCacheId; + /** Preload entries. */ @GridDirectCollection(GridCacheEntryInfo.class) private List<GridCacheEntryInfo> preloadEntries; @@ -140,6 +143,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { } /** + * @return Map from cacheId to an array of invalid partitions. + */ + public Map<Integer, Integer[]> invalidPartitionsByCacheId() { + return invalidPartsByCacheId; + } + + /** + * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions. + */ + public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) { + this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId); + } + + /** * Gets preload entries found on backup node. * * @return Collection of entry infos need to be preloaded. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index b418500..b24c62d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse * @param miniId Mini future ID. * @param dhtVer DHT version. * @param writeVer Write version. - * @param invalidParts Invalid partitions. * @param retVal Return value. * @param err Error. * @param clientRemapVer Not {@code null} if client node should remap transaction. @@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse IgniteUuid miniId, GridCacheVersion dhtVer, GridCacheVersion writeVer, - Collection<Integer> invalidParts, GridCacheReturn retVal, Throwable err, AffinityTopologyVersion clientRemapVer @@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse this.miniId = miniId; this.dhtVer = dhtVer; this.writeVer = writeVer; - this.invalidParts = invalidParts; this.retVal = retVal; this.clientRemapVer = clientRemapVer; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index b16e950..f2f20dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * @return Invalid partitions. */ - public Set<Integer> invalidPartitions(); + public Map<Integer, Set<Integer>> invalidPartitions(); /** * Gets owned version for near remote transaction. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 709c208..4fc6f0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter private AtomicBoolean preparing = new AtomicBoolean(); /** */ - private Set<Integer> invalidParts = new GridLeanSet<>(); + private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3); /** * Transaction state. Note that state is not protected, as we want to @@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public Set<Integer> invalidPartitions() { + @Override public Map<Integer, Set<Integer>> invalidPartitions() { return invalidParts; } /** {@inheritDoc} */ @Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) { - invalidParts.add(part); + Set<Integer> parts = invalidParts.get(cacheCtx.cacheId()); + + if (parts == null) { + parts = new GridLeanSet<>(); + + invalidParts.put(cacheCtx.cacheId(), parts); + } + + parts.add(part); if (log.isDebugEnabled()) - log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']'); + log.debug("Added invalid partition for transaction [cache=" + cacheCtx.name() + ", part=" + part + + ", tx=" + this + ']'); } /** {@inheritDoc} */ @@ -1765,7 +1774,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public Set<Integer> invalidPartitions() { + @Override public Map<Integer, Set<Integer>> invalidPartitions() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index e481e25..227cb34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -290,7 +290,6 @@ public class IgniteTxHandler { req.version(), null, null, - null, top.topologyVersion()); try { @@ -803,7 +802,7 @@ public class IgniteTxHandler { res.nearEvicted(nearTx.evicted()); if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions())) - res.invalidPartitions(dhtTx.invalidPartitions()); + res.invalidPartitionsByCacheId(dhtTx.invalidPartitions()); if (req.onePhaseCommit()) { assert req.last(); @@ -1154,7 +1153,7 @@ public class IgniteTxHandler { if (req.last()) tx.state(PREPARED); - res.invalidPartitions(tx.invalidPartitions()); + res.invalidPartitionsByCacheId(tx.invalidPartitions()); if (tx.empty() && req.last()) { tx.rollback(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 6f544e0..8a354ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -4083,6 +4083,20 @@ public class GridFunc { * @param val Value to find. * @return {@code True} if array contains given value. */ + public static boolean contains(Integer[] arr, int val) { + for (Integer el : arr) { + if (el == val) + return true; + } + + return false; + } + + /** + * @param arr Array. + * @param val Value to find. + * @return {@code True} if array contains given value. + */ @SuppressWarnings("ForLoopReplaceableByForEach") public static boolean contains(long[] arr, long val) { for (int i = 0; i < arr.length; i++) {