IGNITE-59 Support lock, lockAll. Fix 'interrupt' problem.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0379c59f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0379c59f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0379c59f

Branch: refs/heads/ignite-36
Commit: 0379c59f7bafb27b8722aa17cbf64aca36cbfd74
Parents: 9d4476d
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Mon Jan 19 16:20:08 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Mon Jan 19 16:20:08 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 55 ++++++++------
 .../processors/cache/GridCacheAdapter.java      | 20 ++++-
 .../cache/GridCacheBasicApiAbstractTest.java    | 79 ++++++++++++++++++++
 3 files changed, 129 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0379c59f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index bd5d6ee..64e86dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -295,30 +295,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
                 try {
                     delegate.lockAll(keys, 0);
                 }
-                catch (GridInterruptedException ignored) {
-
-                }
                 catch (IgniteCheckedException e) {
                     throw new CacheException(e.getMessage(), e);
                 }
             }
 
             @Override public void lockInterruptibly() throws InterruptedException {
-                if (Thread.interrupted())
-                    throw new InterruptedException();
-
-                try {
-                    delegate.lockAll(keys, 0);
-                }
-                catch (GridInterruptedException e) {
-                    if (e.getCause() instanceof InterruptedException)
-                        throw (InterruptedException)e.getCause();
-
-                    throw new InterruptedException();
-                }
-                catch (IgniteCheckedException e) {
-                    throw new CacheException(e.getMessage(), e);
-                }
+                tryLock(-1, TimeUnit.MILLISECONDS);
             }
 
             @Override public boolean tryLock() {
@@ -331,14 +314,38 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
             }
 
             @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+                if (Thread.interrupted())
+                    throw new InterruptedException();
+
                 try {
-                    return delegate.lockAll(keys, unit.toMillis(time));
-                }
-                catch (GridInterruptedException e) {
-                    if (e.getCause() instanceof InterruptedException)
-                        throw (InterruptedException)e.getCause();
+                    IgniteFuture<Boolean> fut = null;
 
-                    throw new InterruptedException();
+                    try {
+                        if (time <= 0)
+                            return delegate.lockAll(keys, -1);
+
+                        fut = delegate.lockAllAsync(keys, time <= 0 ? -1 : unit.toMillis(time));
+
+                        return fut.get();
+                    }
+                    catch (GridInterruptedException e) {
+                        if (fut != null) {
+                            if (!fut.cancel()) {
+                                if (fut.isDone()) {
+                                    Boolean res = fut.get();
+
+                                    Thread.currentThread().interrupt();
+
+                                    return res;
+                                }
+                            }
+                        }
+
+                        if (e.getCause() instanceof InterruptedException)
+                            throw (InterruptedException)e.getCause();
+
+                        throw new InterruptedException();
+                    }
                 }
                 catch (IgniteCheckedException e) {
                     throw new CacheException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0379c59f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 1a31edd..f6748fe 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -3208,7 +3208,25 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         if (keyCheck)
             validateCacheKeys(keys);
 
-        return lockAllAsync(keys, timeout, filter).get();
+        IgniteFuture<Boolean> fut = lockAllAsync(keys, timeout, filter);
+
+        boolean isInterrupted = false;
+
+        try {
+            while (true) {
+                try {
+                    return fut.get();
+                }
+                catch (GridInterruptedException ignored) {
+                    // Interrupted status of current thread was cleared, retry to get lock.
+                    isInterrupted = true;
+                }
+            }
+        }
+        finally {
+            if (isInterrupted)
+                Thread.currentThread().interrupt();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0379c59f/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java
index 62f64ca..0910d42 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheBasicApiAbstractTest.java
@@ -33,6 +33,7 @@ import org.jetbrains.annotations.*;
 import javax.cache.expiry.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
 import static java.util.concurrent.TimeUnit.*;
@@ -162,6 +163,84 @@ public abstract class GridCacheBasicApiAbstractTest extends GridCommonAbstractTe
     }
 
     /**
+     *
+     */
+    public void testInterruptLock() throws InterruptedException {
+        final IgniteCache<Integer, String> cache = ignite.jcache(null);
+
+//        cache.put(1, "a");
+
+        cache.lock(1).lock();
+
+        final AtomicBoolean isOk = new AtomicBoolean(false);
+
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+                cache.lock(1).lock();
+
+                try {
+                    assertTrue(cache.lock(1).isLockedByThread());
+                }
+                finally {
+                    cache.lock(1).unlock();
+                }
+
+                assertTrue(Thread.currentThread().isInterrupted());
+
+                isOk.set(true);
+            }
+        });
+
+        t.start();
+
+        t.interrupt();
+
+        cache.lock(1).unlock();
+
+        t.join();
+
+        assertTrue(isOk.get());
+    }
+
+    /**
+     *
+     */
+    public void _testInterruptLockWithTimeout() throws InterruptedException {
+        final IgniteCache<Integer, String> cache = ignite.jcache(null);
+
+//        cache.put(1, "a");
+//        cache.put(2, "b");
+
+        cache.lock(2).lock();
+
+        final AtomicBoolean isOk = new AtomicBoolean(false);
+
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    cache.lockAll(Arrays.asList(1, 2)).tryLock(5000, MILLISECONDS);
+                }
+                catch (InterruptedException ignored) {
+                    isOk.set(true);
+                }
+            }
+        });
+
+        t.start();
+
+        t.interrupt();
+
+        cache.lock(2).unlock();
+
+        t.join();
+
+        assertFalse(cache.lock(1).isLocked());
+        assertFalse(cache.lock(2).isLocked());
+
+        assertTrue(isOk.get());
+    }
+
+    /**
      * @throws IgniteCheckedException If test failed.
      */
     public void testManyLockReentries() throws IgniteCheckedException {

Reply via email to