# IGNITE-59 Support lock, lockAll. Move lock implementation to top-level class CacheLockImpl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cd9a5ac9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cd9a5ac9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cd9a5ac9 Branch: refs/heads/ignite-86 Commit: cd9a5ac92a07a753e9e95ff99c92d9e0410d3589 Parents: 3f7dc7d Author: sevdokimov <sevdoki...@gridgain.com> Authored: Mon Jan 19 20:03:03 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Mon Jan 19 20:03:03 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheLockImpl.java | 160 +++++++++++++++++++ .../processors/cache/IgniteCacheProxy.java | 104 +----------- 2 files changed, 161 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd9a5ac9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java new file mode 100644 index 0000000..f60b157 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLockImpl.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * + * @param <K> + */ +class CacheLockImpl<K> implements CacheLock { + /** */ + private final GridCacheProjectionEx<K, ?> delegate; + + /** */ + private final Collection<? extends K> keys; + + /** + * @param delegate Delegate. + * @param keys Keys. + */ + CacheLockImpl(GridCacheProjectionEx<K, ?> delegate, Collection<? 
extends K> keys) { + this.delegate = delegate; + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public boolean isLocked() { + for (K key : keys) { + if (!delegate.isLocked(key)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isLockedByThread() { + for (K key : keys) { + if (!delegate.isLockedByThread(key)) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> lockAsync() { + return delegate.lockAllAsync(keys, 0); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> lockAsync(long timeout, TimeUnit unit) { + return delegate.lockAllAsync(keys, unit.toMillis(timeout)); + } + + /** {@inheritDoc} */ + @Override public void lock() { + try { + delegate.lockAll(keys, 0); + } + catch (IgniteCheckedException e) { + throw new CacheException(e.getMessage(), e); + } + } + + /** {@inheritDoc} */ + @Override public void lockInterruptibly() throws InterruptedException { + tryLock(-1, TimeUnit.MILLISECONDS); + } + + /** {@inheritDoc} */ + @Override public boolean tryLock() { + try { + return delegate.lockAll(keys, -1); + } + catch (IgniteCheckedException e) { + throw new CacheException(e.getMessage(), e); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + + try { + if (time <= 0) + return delegate.lockAll(keys, -1); + + IgniteFuture<Boolean> fut = null; + + try { + fut = delegate.lockAllAsync(keys, time <= 0 ? 
-1 : unit.toMillis(time)); + + return fut.get(); + } + catch (GridInterruptedException e) { + if (fut != null) { + if (!fut.cancel()) { + if (fut.isDone()) { + Boolean res = fut.get(); + + Thread.currentThread().interrupt(); + + return res; + } + } + } + + if (e.getCause() instanceof InterruptedException) + throw (InterruptedException)e.getCause(); + + throw new InterruptedException(); + } + } + catch (IgniteCheckedException e) { + throw new CacheException(e.getMessage(), e); + } + } + + /** {@inheritDoc} */ + @Override public void unlock() { + try { + delegate.unlockAll(keys); + } + catch (IgniteCheckedException e) { + throw new CacheException(e.getMessage(), e); + } + } + + /** {@inheritDoc} */ + @NotNull @Override public Condition newCondition() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd9a5ac9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 0506c5c..ca41043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -23,7 +23,6 @@ import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -40,7 +39,6 @@ import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.locks.*; /** * Cache proxy. 
@@ -264,107 +262,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public CacheLock lockAll(final Collection<? extends K> keys) { - return new CacheLock() { - @Override public boolean isLocked() { - for (K key : keys) { - if (!delegate.isLocked(key)) - return false; - } - - return true; - } - - @Override public boolean isLockedByThread() { - for (K key : keys) { - if (!delegate.isLockedByThread(key)) - return false; - } - - return true; - } - - @Override public IgniteFuture<Boolean> lockAsync() { - return delegate.lockAllAsync(keys, 0); - } - - @Override public IgniteFuture<Boolean> lockAsync(long timeout, TimeUnit unit) { - return delegate.lockAllAsync(keys, unit.toMillis(timeout)); - } - - @Override public void lock() { - try { - delegate.lockAll(keys, 0); - } - catch (IgniteCheckedException e) { - throw new CacheException(e.getMessage(), e); - } - } - - @Override public void lockInterruptibly() throws InterruptedException { - tryLock(-1, TimeUnit.MILLISECONDS); - } - - @Override public boolean tryLock() { - try { - return delegate.lockAll(keys, -1); - } - catch (IgniteCheckedException e) { - throw new CacheException(e.getMessage(), e); - } - } - - @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - - try { - if (time <= 0) - return delegate.lockAll(keys, -1); - - IgniteFuture<Boolean> fut = null; - - try { - fut = delegate.lockAllAsync(keys, time <= 0 ? 
-1 : unit.toMillis(time)); - - return fut.get(); - } - catch (GridInterruptedException e) { - if (fut != null) { - if (!fut.cancel()) { - if (fut.isDone()) { - Boolean res = fut.get(); - - Thread.currentThread().interrupt(); - - return res; - } - } - } - - if (e.getCause() instanceof InterruptedException) - throw (InterruptedException)e.getCause(); - - throw new InterruptedException(); - } - } - catch (IgniteCheckedException e) { - throw new CacheException(e.getMessage(), e); - } - } - - @Override public void unlock() { - try { - delegate.unlockAll(keys); - } - catch (IgniteCheckedException e) { - throw new CacheException(e.getMessage(), e); - } - } - - @NotNull @Override public Condition newCondition() { - throw new UnsupportedOperationException(); - } - }; + return new CacheLockImpl<K>(delegate, keys); } /** {@inheritDoc} */