Repository: incubator-ignite
Updated Branches:
  refs/heads/master ebb5d4ad9 -> 0399ccd83


IGNITE-1265 - Properly handle invalid partitions in DHT prepare response.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7a43dde7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7a43dde7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7a43dde7

Branch: refs/heads/master
Commit: 7a43dde77b47478e6b02bbab9d81ad70a2299c51
Parents: 5faffb9
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Aug 18 10:35:59 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Aug 18 10:35:59 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  1 -
 .../processors/cache/GridCacheUtils.java        | 20 +++++++++++++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 23 +++++++++++++++++++-
 .../dht/GridDhtTxPrepareResponse.java           | 17 +++++++++++++++
 .../near/GridNearTxPrepareResponse.java         |  3 ---
 .../cache/transactions/IgniteInternalTx.java    |  2 +-
 .../cache/transactions/IgniteTxAdapter.java     | 19 +++++++++++-----
 .../cache/transactions/IgniteTxHandler.java     |  5 ++---
 .../ignite/internal/util/lang/GridFunc.java     | 14 ++++++++++++
 9 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 29e3551..c128aa6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -493,7 +493,6 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
                     req.version(),
                     null,
                     null,
-                    null,
                     null);
 
                 res.error(req.classError());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 41e3896..1e3cd67 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1681,6 +1681,26 @@ public class GridCacheUtils {
     }
 
     /**
+     * @param partsMap Cache ID to partition IDs collection map.
+     * @return Cache ID to partition ID array map.
+     */
+    public static Map<Integer, Integer[]> 
convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+        Map<Integer, Integer[]> res = new HashMap<>(partsMap.size());
+
+        for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
+            Set<Integer> parts = entry.getValue();
+
+            Integer[] partsArray = new Integer[parts.size()];
+
+            partsArray = parts.toArray(partsArray);
+
+            res.put(entry.getKey(), partsArray);
+        }
+
+        return res;
+    }
+
+    /**
      * Stops store session listeners.
      *
      * @param ctx Kernal context.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index af0fbdf..27de8cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -574,13 +574,14 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
         // Send reply back to originating near node.
         Throwable prepErr = err.get();
 
+        assert F.isEmpty(tx.invalidPartitions());
+
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
             tx.nearXidVersion(),
             tx.colocated() ? tx.xid() : tx.nearFutureId(),
             nearMiniId == null ? tx.xid() : nearMiniId,
             tx.xidVersion(),
             tx.writeVersion(),
-            tx.invalidPartitions(),
             ret,
             prepErr,
             null);
@@ -1194,6 +1195,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
                 }
 
                 // Process invalid partitions (no need to remap).
+                // Keep this loop for backward compatibility.
                 if (!F.isEmpty(res.invalidPartitions())) {
                     for (Iterator<IgniteTxEntry> it = 
dhtMapping.entries().iterator(); it.hasNext();) {
                         IgniteTxEntry entry  = it.next();
@@ -1206,6 +1208,25 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
                                     ", tx=" + tx + ", dhtMapping=" + 
dhtMapping + ']');
                         }
                     }
+                }
+
+                // Process invalid partitions (no need to remap).
+                if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
+                    Map<Integer, Integer[]> invalidPartsMap = 
res.invalidPartitionsByCacheId();
+
+                    for (Iterator<IgniteTxEntry> it = 
dhtMapping.entries().iterator(); it.hasNext();) {
+                        IgniteTxEntry entry  = it.next();
+
+                        Integer[] invalidParts = 
invalidPartsMap.get(entry.cacheId());
+
+                        if (F.contains(invalidParts, 
entry.cached().partition())) {
+                            it.remove();
+
+                            if (log.isDebugEnabled())
+                                log.debug("Removed mapping for entry from dht 
mapping [key=" + entry.key() +
+                                    ", tx=" + tx + ", dhtMapping=" + 
dhtMapping + ']');
+                        }
+                    }
 
                     if (dhtMapping.empty()) {
                         dhtMap.remove(nodeId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 753c117..cc85628 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -55,6 +55,9 @@ public class GridDhtTxPrepareResponse extends 
GridDistributedTxPrepareResponse {
     @GridDirectCollection(int.class)
     private Collection<Integer> invalidParts;
 
+    /** Invalid partitions by cache ID. */
+    private Map<Integer, Integer[]> invalidPartsByCacheId;
+
     /** Preload entries. */
     @GridDirectCollection(GridCacheEntryInfo.class)
     private List<GridCacheEntryInfo> preloadEntries;
@@ -140,6 +143,20 @@ public class GridDhtTxPrepareResponse extends 
GridDistributedTxPrepareResponse {
     }
 
     /**
+     * @return Map from cacheId to an array of invalid partitions.
+     */
+    public Map<Integer, Integer[]> invalidPartitionsByCacheId() {
+        return invalidPartsByCacheId;
+    }
+
+    /**
+     * @param invalidPartsByCacheId Map from cache ID to an array of invalid 
partitions.
+     */
+    public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> 
invalidPartsByCacheId) {
+        this.invalidPartsByCacheId = 
CU.convertInvalidPartitions(invalidPartsByCacheId);
+    }
+
+    /**
      * Gets preload entries found on backup node.
      *
      * @return Collection of entry infos need to be preloaded.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index b418500..b24c62d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends 
GridDistributedTxPrepareResponse
      * @param miniId Mini future ID.
      * @param dhtVer DHT version.
      * @param writeVer Write version.
-     * @param invalidParts Invalid partitions.
      * @param retVal Return value.
      * @param err Error.
      * @param clientRemapVer Not {@code null} if client node should remap 
transaction.
@@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends 
GridDistributedTxPrepareResponse
         IgniteUuid miniId,
         GridCacheVersion dhtVer,
         GridCacheVersion writeVer,
-        Collection<Integer> invalidParts,
         GridCacheReturn retVal,
         Throwable err,
         AffinityTopologyVersion clientRemapVer
@@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends 
GridDistributedTxPrepareResponse
         this.miniId = miniId;
         this.dhtVer = dhtVer;
         this.writeVer = writeVer;
-        this.invalidParts = invalidParts;
         this.retVal = retVal;
         this.clientRemapVer = clientRemapVer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index b16e950..f2f20dd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, 
GridTimeoutObject {
     /**
      * @return Invalid partitions.
      */
-    public Set<Integer> invalidPartitions();
+    public Map<Integer, Set<Integer>> invalidPartitions();
 
     /**
      * Gets owned version for near remote transaction.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 709c208..4fc6f0c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
     private AtomicBoolean preparing = new AtomicBoolean();
 
     /** */
-    private Set<Integer> invalidParts = new GridLeanSet<>();
+    private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
 
     /**
      * Transaction state. Note that state is not protected, as we want to
@@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public Set<Integer> invalidPartitions() {
+    @Override public Map<Integer, Set<Integer>> invalidPartitions() {
         return invalidParts;
     }
 
     /** {@inheritDoc} */
     @Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, 
int part) {
-        invalidParts.add(part);
+        Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
+
+        if (parts == null) {
+            parts = new GridLeanSet<>();
+
+            invalidParts.put(cacheCtx.cacheId(), parts);
+        }
+
+        parts.add(part);
 
         if (log.isDebugEnabled())
-            log.debug("Added invalid partition for transaction [part=" + part 
+ ", tx=" + this + ']');
+            log.debug("Added invalid partition for transaction [cache=" + 
cacheCtx.name() + ", part=" + part +
+                ", tx=" + this + ']');
     }
 
     /** {@inheritDoc} */
@@ -1765,7 +1774,7 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public Set<Integer> invalidPartitions() {
+        @Override public Map<Integer, Set<Integer>> invalidPartitions() {
             throw new IllegalStateException("Deserialized transaction can only 
be used as read-only.");
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e481e25..227cb34 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -290,7 +290,6 @@ public class IgniteTxHandler {
                         req.version(),
                         null,
                         null,
-                        null,
                         top.topologyVersion());
 
                     try {
@@ -803,7 +802,7 @@ public class IgniteTxHandler {
                 res.nearEvicted(nearTx.evicted());
 
             if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
-                res.invalidPartitions(dhtTx.invalidPartitions());
+                res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
 
             if (req.onePhaseCommit()) {
                 assert req.last();
@@ -1154,7 +1153,7 @@ public class IgniteTxHandler {
             if (req.last())
                 tx.state(PREPARED);
 
-            res.invalidPartitions(tx.invalidPartitions());
+            res.invalidPartitionsByCacheId(tx.invalidPartitions());
 
             if (tx.empty() && req.last()) {
                 tx.rollback();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..8a354ad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -4083,6 +4083,20 @@ public class GridFunc {
      * @param val Value to find.
      * @return {@code True} if array contains given value.
      */
+    public static boolean contains(Integer[] arr, int val) {
+        for (Integer el : arr) {
+            if (el == val)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param arr Array.
+     * @param val Value to find.
+     * @return {@code True} if array contains given value.
+     */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public static boolean contains(long[] arr, long val) {
         for (int i = 0; i < arr.length; i++) {

Reply via email to