ignite-341 - fixed
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5066aead Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5066aead Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5066aead Branch: refs/heads/ignite-443 Commit: 5066aeadbe942e66f59869370ba28143229a143a Parents: d4236d0 Author: S.Vladykin <svlady...@gridgain.com> Authored: Thu Mar 26 19:06:06 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Thu Mar 26 19:06:06 2015 +0300 ---------------------------------------------------------------------- .../query/h2/opt/GridH2AbstractKeyValueRow.java | 97 +++++++++++--------- .../query/h2/opt/GridH2KeyValueRowOffheap.java | 57 +++++++----- .../processors/query/h2/opt/GridH2Table.java | 4 +- 3 files changed, 89 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5066aead/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java index 2770a10..4a0809a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java @@ -92,6 +92,8 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { * @throws IgniteSpiException If failed. */ public static Value wrap(Object obj, int type) throws IgniteSpiException { + assert obj != null; + switch (type) { case Value.BOOLEAN: return ValueBoolean.get((Boolean)obj); @@ -169,61 +171,74 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { } /** - * @param val Value. - * @throws IgniteCheckedException - */ - public synchronized void unswapBeforeRemove(Object val) throws IgniteCheckedException { - assert val != null; - - Value oldVal = super.getValue(VAL_COL); - - if (oldVal == null || oldVal instanceof WeakValue) - onUnswap(val); - // Else we would assert that val.equals(oldVal.getObject()) but value is not necessarily implements equals() correctly. - } - - /** * Should be called when entry getting unswapped. * * @param val Value. + * @param beforeRmv If this is unswap before remove. * @throws IgniteCheckedException If failed. */ - public synchronized void onUnswap(Object val) throws IgniteCheckedException { + public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException { setValue(VAL_COL, wrap(val, desc.valueType())); + + notifyAll(); } /** * Atomically updates weak value. * - * @param exp Expected value. * @param upd New value. - * @return Expected value if update succeeded, unexpected value otherwise. + * @return {@code null} If update succeeded, unexpected value otherwise. */ - protected synchronized Value updateWeakValue(Value exp, Value upd) { - Value res = super.getValue(VAL_COL); + protected synchronized Value updateWeakValue(Value upd) { + Value res = peekValue(VAL_COL); - if (res != exp && !(res instanceof WeakValue)) + if (res != null && !(res instanceof WeakValue)) return res; setValue(VAL_COL, new WeakValue(upd)); - return exp; + notifyAll(); + + return null; } /** + * @param attempt Attempt. * @return Synchronized value. */ - protected synchronized Value syncValue() { - return super.getValue(VAL_COL); + protected synchronized Value syncValue(int attempt) { + Value v = peekValue(VAL_COL); + + if (v == null && attempt != 0) { + try { + wait(attempt); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + + v = peekValue(VAL_COL); + } + + return v; + } + + /** + * @param col Column index. + * @return Value if exists. + */ + protected final Value peekValue(int col) { + return getValueList()[col]; } /** {@inheritDoc} */ @Override public Value getValue(int col) { if (col < DEFAULT_COLUMNS_COUNT) { - Value v = super.getValue(col); + Value v = peekValue(col); if (col == VAL_COL) { - int loops = 0; + long start = 0; + int attempt = 0; while ((v = WeakValue.unwrap(v)) == null) { v = getOffheapValue(VAL_COL); @@ -231,7 +246,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { if (v != null) { setValue(VAL_COL, v); - if (super.getValue(KEY_COL) == null) + if (peekValue(KEY_COL) == null) cache(); return v; @@ -245,28 +260,26 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { if (valObj != null) { Value upd = wrap(valObj, desc.valueType()); - Value res = updateWeakValue(null, upd); - - if (res == null) { - if (super.getValue(KEY_COL) == null) - cache(); - - return upd; - } + v = updateWeakValue(upd); - v = res; + return v == null ? upd : v; } else { // If nothing found in swap then we should be already unswapped. - v = syncValue(); + v = syncValue(attempt); } - - if (++loops == 1000_000) - throw new IllegalStateException("Failed to get value for key: " + k); } catch (IgniteCheckedException e) { throw new IgniteException(e); } + + attempt++; + + if (start == 0) + start = U.currentTimeMillis(); + else if (U.currentTimeMillis() - start > 15_000) // Loop for at most 15 seconds. + throw new IgniteException("Failed to get value for key: " + k + + ". This can happen due to a long GC pause."); } } @@ -279,7 +292,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { setValue(KEY_COL, v); - if (super.getValue(VAL_COL) == null) + if (peekValue(VAL_COL) == null) cache(); } @@ -337,10 +350,10 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { addOffheapRowId(sb); - Value v = super.getValue(KEY_COL); + Value v = peekValue(KEY_COL); sb.a("[ key: ").a(v == null ? "nil" : v.getString()); - v = WeakValue.unwrap(super.getValue(VAL_COL)); + v = WeakValue.unwrap(peekValue(VAL_COL)); sb.a(", val: ").a(v == null ? "nil" : v.getString()); sb.a(" ][ "); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5066aead/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java index 9c1d90a..9c2c1b2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java @@ -120,6 +120,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { */ @SuppressWarnings("LockAcquiredButNotSafelyReleased") private static Lock lock(long ptr) { + assert ptr > 0 : ptr; assert (ptr & 7) == 0 : ptr; // Unsafe allocated pointers aligned. Lock l = lock.getLock(ptr >>> 3); @@ -198,13 +199,14 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { final long valPtr = mem.readLongVolatile(p); - if (valPtr == 0) - return; // Nothing to swap. + if (valPtr <= 0) + throw new IllegalStateException("Already swapped: " + ptr); + + if (!mem.casLong(p, valPtr, 0)) + throw new IllegalStateException("Concurrent unswap: " + ptr); desc.guard().finalizeLater(new Runnable() { @Override public void run() { - mem.casLong(p, valPtr, 0); // If it was unswapped concurrently we will not update. - mem.release(valPtr, mem.readInt(valPtr) + OFFSET_VALUE); } }); @@ -215,37 +217,40 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { } /** {@inheritDoc} */ - @Override public synchronized void unswapBeforeRemove(Object val) throws IgniteCheckedException { - assert val != null; - - onUnswap(val); - } - - /** {@inheritDoc} */ @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod") - @Override protected Value updateWeakValue(Value exp, Value upd) { + @Override protected synchronized Value updateWeakValue(Value upd) { setValue(VAL_COL, upd); - return exp; + notifyAll(); + + return null; } /** {@inheritDoc} */ - @Override public synchronized void onUnswap(Object val) throws IgniteCheckedException { - super.onUnswap(val); + @Override public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException { + assert val != null; - long p = ptr; - - assert p > 0 : p; + final long p = ptr; Lock l = lock(p); try { GridUnsafeMemory mem = desc.memory(); - if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) - return; // The offheap value is in its place, nothing to do here. + if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) { + if (beforeRmv) + return; // The offheap value is in its place, nothing to do here. + else + throw new IllegalStateException("Unswap without swap: " + p); + } - Value v = getValue(VAL_COL); // We just set the value above, so it will be returned right away. + Value v = peekValue(VAL_COL); + + if (v == null) { + setValue(VAL_COL, wrap(val, desc.valueType())); + + v = peekValue(VAL_COL); + } byte[] bytes = new byte[SIZE_CALCULATOR.getValueLen(v)]; @@ -263,11 +268,13 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { finally { l.unlock(); } + + notifyAll(); } /** {@inheritDoc} */ - @Override protected synchronized Value syncValue() { - Value v = super.syncValue(); + @Override protected Value syncValue(int attempt) { + Value v = super.syncValue(attempt); if (v != null) return v; @@ -283,8 +290,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { GridUnsafeMemory mem = desc.memory(); if (p == 0) { // Serialize data to offheap memory. - Value key = getValue(KEY_COL); - Value val = getValue(VAL_COL); + Value key = peekValue(KEY_COL); + Value val = peekValue(VAL_COL); assert key != null; assert val != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5066aead/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index e686c5e..62b3b5e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -157,7 +157,7 @@ public class GridH2Table extends TableBase { if (val == null) row.onSwap(); else - row.onUnswap(val); + row.onUnswap(val, false); return true; } @@ -382,7 +382,7 @@ public class GridH2Table extends TableBase { Value v = row.getValue(GridH2AbstractKeyValueRow.VAL_COL); if (v != null) - ((GridH2AbstractKeyValueRow)old).unswapBeforeRemove(v.getObject()); + ((GridH2AbstractKeyValueRow)old).onUnswap(v.getObject(), true); } if (old != null) {