IGNITE-59 Support lock, lockAll. Fix 'interrupt' problem.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0379c59f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0379c59f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0379c59f Branch: refs/heads/ignite-35 Commit: 0379c59f7bafb27b8722aa17cbf64aca36cbfd74 Parents: 9d4476d Author: sevdokimov <sevdoki...@gridgain.com> Authored: Mon Jan 19 16:20:08 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Mon Jan 19 16:20:08 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 55 ++++++++------ .../processors/cache/GridCacheAdapter.java | 20 ++++- .../cache/GridCacheBasicApiAbstractTest.java | 79 ++++++++++++++++++++ 3 files changed, 129 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0379c59f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index bd5d6ee..64e86dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -295,30 +295,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements try { delegate.lockAll(keys, 0); } - catch (GridInterruptedException ignored) { - - } catch (IgniteCheckedException e) { throw new CacheException(e.getMessage(), e); } } @Override public void lockInterruptibly() throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - - try { - delegate.lockAll(keys, 0); - } - catch (GridInterruptedException e) { - if (e.getCause() instanceof InterruptedException) - throw (InterruptedException)e.getCause(); - - throw new InterruptedException(); - } - catch (IgniteCheckedException e) { - throw new CacheException(e.getMessage(), e); - } + tryLock(-1, TimeUnit.MILLISECONDS); } @Override public boolean tryLock() { @@ -331,14 +314,38 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + try { - return delegate.lockAll(keys, unit.toMillis(time)); - } - catch (GridInterruptedException e) { - if (e.getCause() instanceof InterruptedException) - throw (InterruptedException)e.getCause(); + IgniteFuture<Boolean> fut = null; - throw new InterruptedException(); + try { + if (time <= 0) + return delegate.lockAll(keys, -1); + + fut = delegate.lockAllAsync(keys, time <= 0 ? -1 : unit.toMillis(time)); + + return fut.get(); + } + catch (GridInterruptedException e) { + if (fut != null) { + if (!fut.cancel()) { + if (fut.isDone()) { + Boolean res = fut.get(); + + Thread.currentThread().interrupt(); + + return res; + } + } + } + + if (e.getCause() instanceof InterruptedException) + throw (InterruptedException)e.getCause(); + + throw new InterruptedException(); + } } catch (IgniteCheckedException e) { throw new CacheException(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0379c59f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 1a31edd..f6748fe 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -3208,7 +3208,25 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im if (keyCheck) validateCacheKeys(keys); - return lockAllAsync(keys, timeout, filter).get(); + IgniteFuture<Boolean> fut = lockAllAsync(keys, timeout, filter); + + boolean isInterrupted = false; + + try { + while (true) { + try { + return fut.get(); + } + catch (GridInterruptedException ignored) { + // Interrupted status of current thread was cleared, retry to get lock. + isInterrupted = true; + } + } + } + finally { + if (isInterrupted) + Thread.currentThread().interrupt(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0379c59f/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java index 62f64ca..0910d42 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java @@ -33,6 +33,7 @@ import org.jetbrains.annotations.*; import javax.cache.expiry.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import static java.util.concurrent.TimeUnit.*; @@ -162,6 +163,84 @@ public abstract class GridCacheBasicApiAbstractTest extends GridCommonAbstractTe } /** + * + */ + public void testInterruptLock() throws InterruptedException { + final IgniteCache<Integer, String> cache = ignite.jcache(null); + +// cache.put(1, "a"); + + cache.lock(1).lock(); + + final AtomicBoolean isOk = new AtomicBoolean(false); + + Thread t = new Thread(new Runnable() { + @Override public void run() { + cache.lock(1).lock(); + + try { + assertTrue(cache.lock(1).isLockedByThread()); + } + finally { + cache.lock(1).unlock(); + } + + assertTrue(Thread.currentThread().isInterrupted()); + + isOk.set(true); + } + }); + + t.start(); + + t.interrupt(); + + cache.lock(1).unlock(); + + t.join(); + + assertTrue(isOk.get()); + } + + /** + * + */ + public void _testInterruptLockWithTimeout() throws InterruptedException { + final IgniteCache<Integer, String> cache = ignite.jcache(null); + +// cache.put(1, "a"); +// cache.put(2, "b"); + + cache.lock(2).lock(); + + final AtomicBoolean isOk = new AtomicBoolean(false); + + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + cache.lockAll(Arrays.asList(1, 2)).tryLock(5000, MILLISECONDS); + } + catch (InterruptedException ignored) { + isOk.set(true); + } + } + }); + + t.start(); + + t.interrupt(); + + cache.lock(2).unlock(); + + t.join(); + + assertFalse(cache.lock(1).isLocked()); + assertFalse(cache.lock(2).isLocked()); + + assertTrue(isOk.get()); + } + + /** * @throws IgniteCheckedException If test failed. */ public void testManyLockReentries() throws IgniteCheckedException {