ignite-341 - fixed

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5066aead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5066aead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5066aead

Branch: refs/heads/ignite-443
Commit: 5066aeadbe942e66f59869370ba28143229a143a
Parents: d4236d0
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Thu Mar 26 19:06:06 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Thu Mar 26 19:06:06 2015 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2AbstractKeyValueRow.java | 97 +++++++++++---------
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  | 57 +++++++-----
 .../processors/query/h2/opt/GridH2Table.java    |  4 +-
 3 files changed, 89 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5066aead/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 2770a10..4a0809a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -92,6 +92,8 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @throws IgniteSpiException If failed.
      */
     public static Value wrap(Object obj, int type) throws IgniteSpiException {
+        assert obj != null;
+
         switch (type) {
             case Value.BOOLEAN:
                 return ValueBoolean.get((Boolean)obj);
@@ -169,61 +171,74 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     }
 
     /**
-     * @param val Value.
-     * @throws IgniteCheckedException
-     */
-    public synchronized void unswapBeforeRemove(Object val) throws IgniteCheckedException {
-        assert val != null;
-
-        Value oldVal = super.getValue(VAL_COL);
-
-        if (oldVal == null || oldVal instanceof WeakValue)
-            onUnswap(val);
-        // Else we would assert that val.equals(oldVal.getObject()) but value is not necessarily implements equals() correctly.
-    }
-
-    /**
      * Should be called when entry getting unswapped.
      *
      * @param val Value.
+     * @param beforeRmv If this is unswap before remove.
      * @throws IgniteCheckedException If failed.
      */
-    public synchronized void onUnswap(Object val) throws IgniteCheckedException {
+    public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
         setValue(VAL_COL, wrap(val, desc.valueType()));
+
+        notifyAll();
     }
 
     /**
      * Atomically updates weak value.
      *
-     * @param exp Expected value.
      * @param upd New value.
-     * @return Expected value if update succeeded, unexpected value otherwise.
+     * @return {@code null} If update succeeded, unexpected value otherwise.
      */
-    protected synchronized Value updateWeakValue(Value exp, Value upd) {
-        Value res = super.getValue(VAL_COL);
+    protected synchronized Value updateWeakValue(Value upd) {
+        Value res = peekValue(VAL_COL);
 
-        if (res != exp && !(res instanceof WeakValue))
+        if (res != null && !(res instanceof WeakValue))
             return res;
 
         setValue(VAL_COL, new WeakValue(upd));
 
-        return exp;
+        notifyAll();
+
+        return null;
     }
 
     /**
+     * @param attempt Attempt.
      * @return Synchronized value.
      */
-    protected synchronized Value syncValue() {
-        return super.getValue(VAL_COL);
+    protected synchronized Value syncValue(int attempt) {
+        Value v = peekValue(VAL_COL);
+
+        if (v == null && attempt != 0) {
+            try {
+                wait(attempt);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedException(e);
+            }
+
+            v = peekValue(VAL_COL);
+        }
+
+        return v;
+    }
+
+    /**
+     * @param col Column index.
+     * @return Value if exists.
+     */
+    protected final Value peekValue(int col) {
+        return getValueList()[col];
     }
 
     /** {@inheritDoc} */
     @Override public Value getValue(int col) {
         if (col < DEFAULT_COLUMNS_COUNT) {
-            Value v = super.getValue(col);
+            Value v = peekValue(col);
 
             if (col == VAL_COL) {
-                int loops = 0;
+                long start = 0;
+                int attempt = 0;
 
                 while ((v = WeakValue.unwrap(v)) == null) {
                     v = getOffheapValue(VAL_COL);
@@ -231,7 +246,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
                     if (v != null) {
                         setValue(VAL_COL, v);
 
-                        if (super.getValue(KEY_COL) == null)
+                        if (peekValue(KEY_COL) == null)
                             cache();
 
                         return v;
@@ -245,28 +260,26 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
                         if (valObj != null) {
                             Value upd = wrap(valObj, desc.valueType());
 
-                            Value res = updateWeakValue(null, upd);
-
-                            if (res == null) {
-                                if (super.getValue(KEY_COL) == null)
-                                    cache();
-
-                                return upd;
-                            }
+                            v = updateWeakValue(upd);
 
-                            v = res;
+                            return v == null ? upd : v;
                         }
                         else {
                             // If nothing found in swap then we should be already unswapped.
-                            v = syncValue();
+                            v = syncValue(attempt);
                         }
-
-                        if (++loops == 1000_000)
-                            throw new IllegalStateException("Failed to get value for key: " + k);
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
                     }
+
+                    attempt++;
+
+                    if (start == 0)
+                        start = U.currentTimeMillis();
+                    else if (U.currentTimeMillis() - start > 15_000) // Loop for at most 15 seconds.
+                        throw new IgniteException("Failed to get value for key: " + k +
+                            ". This can happen due to a long GC pause.");
                 }
             }
 
@@ -279,7 +292,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
 
                 setValue(KEY_COL, v);
 
-                if (super.getValue(VAL_COL) == null)
+                if (peekValue(VAL_COL) == null)
                     cache();
             }
 
@@ -337,10 +350,10 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
 
         addOffheapRowId(sb);
 
-        Value v = super.getValue(KEY_COL);
+        Value v = peekValue(KEY_COL);
         sb.a("[ key: ").a(v == null ? "nil" : v.getString());
 
-        v = WeakValue.unwrap(super.getValue(VAL_COL));
+        v = WeakValue.unwrap(peekValue(VAL_COL));
         sb.a(", val: ").a(v == null ? "nil" : v.getString());
 
         sb.a(" ][ ");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5066aead/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index 9c1d90a..9c2c1b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -120,6 +120,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
     private static Lock lock(long ptr) {
+        assert ptr > 0 : ptr;
         assert (ptr & 7) == 0 : ptr; // Unsafe allocated pointers aligned.
 
         Lock l = lock.getLock(ptr >>> 3);
@@ -198,13 +199,14 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
 
             final long valPtr = mem.readLongVolatile(p);
 
-            if (valPtr == 0)
-                return; // Nothing to swap.
+            if (valPtr <= 0)
+                throw new IllegalStateException("Already swapped: " + ptr);
+
+            if (!mem.casLong(p, valPtr, 0))
+                throw new IllegalStateException("Concurrent unswap: " + ptr);
 
             desc.guard().finalizeLater(new Runnable() {
                 @Override public void run() {
-                    mem.casLong(p, valPtr, 0); // If it was unswapped concurrently we will not update.
-
                     mem.release(valPtr, mem.readInt(valPtr) + OFFSET_VALUE);
                 }
             });
@@ -215,37 +217,40 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void unswapBeforeRemove(Object val) throws IgniteCheckedException {
-        assert val != null;
-
-        onUnswap(val);
-    }
-
-    /** {@inheritDoc} */
     @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-    @Override protected Value updateWeakValue(Value exp, Value upd) {
+    @Override protected synchronized Value updateWeakValue(Value upd) {
         setValue(VAL_COL, upd);
 
-        return exp;
+        notifyAll();
+
+        return null;
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void onUnswap(Object val) throws IgniteCheckedException {
-        super.onUnswap(val);
+    @Override public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
+        assert val != null;
 
-        long p = ptr;
-
-        assert p > 0 : p;
+        final long p = ptr;
 
         Lock l = lock(p);
 
         try {
             GridUnsafeMemory mem = desc.memory();
 
-            if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0)
-                return; // The offheap value is in its place, nothing to do here.
+            if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) {
+                if (beforeRmv)
+                    return; // The offheap value is in its place, nothing to do here.
+                else
+                    throw new IllegalStateException("Unswap without swap: " + p);
+            }
 
-            Value v = getValue(VAL_COL); // We just set the value above, so it will be returned right away.
+            Value v = peekValue(VAL_COL);
+
+            if (v == null) {
+                setValue(VAL_COL, wrap(val, desc.valueType()));
+
+                v = peekValue(VAL_COL);
+            }
 
             byte[] bytes = new byte[SIZE_CALCULATOR.getValueLen(v)];
 
@@ -263,11 +268,13 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
         finally {
             l.unlock();
         }
+
+        notifyAll();
     }
 
     /** {@inheritDoc} */
-    @Override protected synchronized Value syncValue() {
-        Value v = super.syncValue();
+    @Override protected Value syncValue(int attempt) {
+        Value v = super.syncValue(attempt);
 
         if (v != null)
             return v;
@@ -283,8 +290,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
         GridUnsafeMemory mem = desc.memory();
 
         if (p == 0) { // Serialize data to offheap memory.
-            Value key = getValue(KEY_COL);
-            Value val = getValue(VAL_COL);
+            Value key = peekValue(KEY_COL);
+            Value val = peekValue(VAL_COL);
 
             assert key != null;
             assert val != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5066aead/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index e686c5e..62b3b5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -157,7 +157,7 @@ public class GridH2Table extends TableBase {
             if (val == null)
                 row.onSwap();
             else
-                row.onUnswap(val);
+                row.onUnswap(val, false);
 
             return true;
         }
@@ -382,7 +382,7 @@ public class GridH2Table extends TableBase {
                     Value v = row.getValue(GridH2AbstractKeyValueRow.VAL_COL);
 
                     if (v != null)
-                        ((GridH2AbstractKeyValueRow)old).unswapBeforeRemove(v.getObject());
+                        ((GridH2AbstractKeyValueRow)old).onUnswap(v.getObject(), true);
                 }
 
                 if (old != null) {

Reply via email to