http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b73b0d1,dccb2c3..0edaffe --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -26,17 -26,10 +26,18 @@@ import org.apache.ignite.cluster.* import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.datastructures.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.mxbean.*; +import org.apache.ignite.plugin.security.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; + import org.apache.ignite.internal.cluster.*; + import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.cache.affinity.*; -import org.apache.ignite.internal.processors.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.query.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 797cfe3,b1abc8e..b5c91a3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -30,12 -30,6 +30,13 @@@ import org.apache.ignite.fs.* import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.datastructures.*; +import org.apache.ignite.internal.processors.datastructures.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.spi.*; ++import org.apache.ignite.internal.processors.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 14b2673,b20628e..52c6c32 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@@ -23,15 -23,18 +23,17 @@@ import org.apache.ignite.cache.affinity import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; - import org.apache.ignite.internal.processors.cache.version.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.mxbean.*; - import org.apache.ignite.transactions.*; + import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.affinity.*; -import org.apache.ignite.internal.processors.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.query.*; + import org.apache.ignite.internal.processors.cache.transactions.*; + import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.mxbean.*; + import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java index a2bc582,0000000..131fa3c mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java @@@ -1,256 -1,0 +1,255 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; - import org.apache.ignite.cache.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.util.*; + +/** + * {@link org.apache.ignite.IgniteQueue} implementation using atomic cache. + */ +public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { + /** */ + private static final long RETRY_TIMEOUT = 3000; + + /** + * @param queueName Queue name. + * @param hdr Queue header. + * @param cctx Cache context. + */ + public GridAtomicCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) { + super(queueName, hdr, cctx); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean offer(T item) throws IgniteException { + try { + Long idx = transformHeader(new AddProcessor(id, 1)); + + if (idx == null) + return false; + + checkRemoved(idx); + + int cnt = 0; + + GridCacheQueueItemKey key = itemKey(idx); + + while (true) { + try { + cache.put(key, item); + + break; + } - catch (CachePartialUpdateException e) { ++ catch (CachePartialUpdateCheckedException e) { + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']'); + + U.sleep(RETRY_DELAY); + } + } + } + + return true; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public T poll() throws IgniteException { + try { + while (true) { + Long idx = transformHeader(new PollProcessor(id)); + + if (idx == null) + return null; + + checkRemoved(idx); + + GridCacheQueueItemKey key = itemKey(idx); + + int cnt = 0; + + long stop = 0; + + while (true) { + try { - T data = (T)cache.getAndRemove(key); ++ T data = (T)cache.remove(key, null); + + if (data != null) + return data; + + if (stop == 0) + stop = U.currentTimeMillis() + RETRY_TIMEOUT; + + while (U.currentTimeMillis() < stop ) { - data = (T)cache.getAndRemove(key); ++ data = (T)cache.remove(key, null); + + if (data != null) + return data; + } + + break; + } - catch (CachePartialUpdateException e) { ++ catch (CachePartialUpdateCheckedException e) { + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']'); + + U.sleep(RETRY_DELAY); + } + } + } + + U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']'); + } + } + catch (IgniteCheckedException e) { - throw new IgniteException(e); ++ throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean addAll(Collection<? extends T> items) { + A.notNull(items, "items"); + + try { + Long idx = transformHeader(new AddProcessor(id, items.size())); + + if (idx == null) + return false; + + checkRemoved(idx); + + Map<GridCacheQueueItemKey, T> putMap = new HashMap<>(); + + for (T item : items) { + putMap.put(itemKey(idx), item); + + idx++; + } + + int cnt = 0; + + while (true) { + try { - cache.putAll(putMap); ++ cache.putAll(putMap, null); + + break; + } - catch (CachePartialUpdateException e) { ++ catch (CachePartialUpdateCheckedException e) { + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to add items, will retry [err=" + e + ']'); + + U.sleep(RETRY_DELAY); + } + } + } + + return true; + } + catch (IgniteCheckedException e) { - throw new IgniteException(e); ++ throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void removeItem(long rmvIdx) throws IgniteCheckedException { - Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)); ++ Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); + + if (idx != null) { + checkRemoved(idx); + + GridCacheQueueItemKey key = itemKey(idx); + + int cnt = 0; + + long stop = 0; + + while (true) { + try { - if (cache.remove(key)) ++ if (cache.removex(key, null)) + return; + + if (stop == 0) + stop = U.currentTimeMillis() + RETRY_TIMEOUT; + + while (U.currentTimeMillis() < stop ) { - if (cache.remove(key)) ++ if (cache.removex(key, null)) + return; + } + + break; + } - catch (CachePartialUpdateException e) { ++ catch (CachePartialUpdateCheckedException e) { + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to add items, will retry [err=" + e + ']'); + + U.sleep(RETRY_DELAY); + } + } + } + + U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']'); + } + } + + /** + * @param c EntryProcessor to be applied for queue header. + * @return Value computed by the entry processor. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + @Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c) + throws IgniteCheckedException { + int cnt = 0; + + while (true) { + try { - return (Long)cache.invoke(queueKey, c); ++ return (Long)cache.invoke(queueKey, c).get(); + } - catch (CachePartialUpdateException e) { ++ catch (CachePartialUpdateCheckedException e) { + if (cnt++ == MAX_UPDATE_RETRIES) + throw e; + else { + U.warn(log, "Failed to update queue header, will retry [err=" + e + ']'); + + U.sleep(RETRY_DELAY); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java index 13d09fc,0000000..7da8aa7 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java @@@ -1,240 -1,0 +1,240 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; ++import org.apache.ignite.lang.*; + +import java.lang.annotation.*; +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Utility class for getting annotated values from classes. + * Contains local cache of annotated methods and fields by classes for best performance. + */ +public class GridCacheAnnotationHelper<A extends Annotation> { + /** Number of entries to keep in annotation cache. */ + private static final int DFLT_CLASS_CACHE_SIZE = 1000; + + /** Field cache. */ + private final GridBoundedLinkedHashMap<Class<?>, List<Field>> fieldCache; + + /** Method cache. */ + private final GridBoundedLinkedHashMap<Class<?>, List<Method>> mtdCache; + + /** Annotation class. */ + private final Class<A> annCls; + + /** Mutex. */ + private final Object mux = new Object(); + + /** + * Constructor. + * + * @param annCls Annotation class. + */ + public GridCacheAnnotationHelper(Class<A> annCls) { + this(annCls, DFLT_CLASS_CACHE_SIZE); + } + + /** + * Constructor. + * + * @param annCls Annotation class. + * @param capacity Capacity of local caches. + */ + public GridCacheAnnotationHelper(Class<A> annCls, int capacity) { + assert annCls != null : "Annotated class mustn't be null."; + assert capacity > 0 : "Capacity must be more then zero."; + + this.annCls = annCls; + + fieldCache = new GridBoundedLinkedHashMap<>(capacity); + + mtdCache = new GridBoundedLinkedHashMap<>(capacity); + } + + /** + * Returns annotated value. + * + * @param target Object to find a value in. + * @return Value of annotated field or method. + * @throws IgniteCheckedException If failed to find. + */ + public Object annotatedValue(Object target) throws IgniteCheckedException { + IgniteBiTuple<Object, Boolean> res = annotatedValue(target, new HashSet<>(), false); + + assert res != null; + + return res.get1(); + } + + /** + * Returns annotated value. + * + * @param target Object to find a value in. + * @param visited Set of visited objects to avoid cycling. + * @param annFound Flag indicating if value has already been found. + * @return Value of annotated field or method. + * @throws IgniteCheckedException If failed to find. + */ + private IgniteBiTuple<Object, Boolean> annotatedValue(Object target, Set<Object> visited, boolean annFound) + throws IgniteCheckedException { + assert target != null; + + // To avoid infinite recursion. + if (visited.contains(target)) + return F.t(null, annFound); + + visited.add(target); + + Object val = null; + + for (Class<?> cls = target.getClass(); !cls.equals(Object.class); cls = cls.getSuperclass()) { + // Fields. + for (Field f : fieldsWithAnnotation(cls)) { + f.setAccessible(true); + + Object fieldVal; + + try { + fieldVal = f.get(target); + } + catch (IllegalAccessException e) { + throw new IgniteCheckedException("Failed to get annotated field value [cls=" + cls.getName() + + ", ann=" + annCls.getSimpleName()+']', e); + } + + if (needsRecursion(f)) { + if (fieldVal != null) { + // Recursion. + IgniteBiTuple<Object, Boolean> tup = annotatedValue(fieldVal, visited, annFound); + + if (!annFound && tup.get2()) + // Update value only if annotation was found in recursive call. + val = tup.get1(); + + annFound = tup.get2(); + } + } + else { + if (annFound) + throw new IgniteCheckedException("Multiple annotations has been found [cls=" + cls.getName() + + ", ann=" + annCls.getSimpleName() + ']'); + + val = fieldVal; + + annFound = true; + } + } + + // Methods. + for (Method m : methodsWithAnnotation(cls)) { + if (annFound) + throw new IgniteCheckedException("Multiple annotations has been found [cls=" + cls.getName() + + ", ann=" + annCls.getSimpleName() + ']'); + + m.setAccessible(true); + + try { + val = m.invoke(target); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to get annotated method value [cls=" + cls.getName() + + ", ann=" + annCls.getSimpleName()+']', e); + } + + annFound = true; + } + } + + return F.t(val, annFound); + } + + /** + * @param f Field. + * @return {@code true} if recursive inspection is required. + */ + private boolean needsRecursion(Field f) { + assert f != null; + + // Need to inspect anonymous classes, callable and runnable instances. + return f.getName().startsWith("this$") || f.getName().startsWith("val$") || + Callable.class.isAssignableFrom(f.getType()) || Runnable.class.isAssignableFrom(f.getType()); + } + + /** + * Gets all entries from the specified class or its super-classes that have + * been annotated with annotation provided. + * + * @param cls Class in which search for fields. + * @return Set of entries with given annotations. + */ + private Iterable<Field> fieldsWithAnnotation(Class<?> cls) { + synchronized (mux) { + List<Field> fields = fieldCache.get(cls); + if (fields == null) { + fields = new ArrayList<>(); + + for (Field field : cls.getDeclaredFields()) { + Annotation ann = field.getAnnotation(annCls); + + if (ann != null || needsRecursion(field)) + fields.add(field); + } + + if (!fields.isEmpty()) + fieldCache.put(cls, fields); + } + + return fields; + } + } + + /** + * Gets set of methods with given annotation. + * + * @param cls Class in which search for methods. + * @return Set of methods with given annotations. + */ + private Iterable<Method> methodsWithAnnotation(Class<?> cls) { + synchronized (mux) { + List<Method> mtds = mtdCache.get(cls); + + if (mtds == null) { + mtds = new ArrayList<>(); + + for (Method mtd : cls.getDeclaredMethods()) { + Annotation ann = mtd.getAnnotation(annCls); + + if (ann != null) + mtds.add(mtd); + } + + if (!mtds.isEmpty()) + mtdCache.put(cls, mtds); + } + + return mtds; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index 9bb71b2,0000000..10e8e6b mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@@ -1,510 -1,0 +1,510 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.transactions.*; ++import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Cache atomic long implementation. + */ +public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. */ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Logger. */ + private IgniteLogger log; + + /** Atomic long name. */ + private String name; + + /** Removed flag.*/ + private volatile boolean rmvd; + + /** Atomic long key. */ + private GridCacheInternalKey key; + + /** Atomic long projection. */ + private CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Callable for {@link #get()}. */ + private final Callable<Long> getCall = new Callable<Long>() { + @Override public Long call() throws Exception { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + return val.get(); + } + }; + + /** Callable for {@link #incrementAndGet()}. */ + private final Callable<Long> incAndGetCall = new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get() + 1; + + val.set(retVal); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to increment and get: " + this, e); + + throw e; + } + } + }; + + /** Callable for {@link #getAndIncrement()}. */ + private final Callable<Long> getAndIncCall = new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get(); + + val.set(retVal + 1); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to get and increment: " + this, e); + + throw e; + } + } + }; + + /** Callable for {@link #decrementAndGet()}. */ + private final Callable<Long> decAndGetCall = new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get() - 1; + + val.set(retVal); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to decrement and get: " + this, e); + + throw e; + } + } + }; + + /** Callable for {@link #getAndDecrement()}. */ + private final Callable<Long> getAndDecCall = new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get(); + + val.set(retVal - 1); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to get and decrement and get: " + this, e); + + throw e; + } + } + }; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheAtomicLongImpl() { + // No-op. + } + + /** + * Default constructor. + * + * @param name Atomic long name. + * @param key Atomic long key. + * @param atomicView Atomic projection. + * @param ctx CacheContext. + */ + public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key, + CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, GridCacheContext ctx) { + assert key != null; + assert atomicView != null; + assert ctx != null; + assert name != null; + + this.ctx = ctx; + this.key = key; + this.atomicView = atomicView; + this.name = name; + + log = ctx.gridConfig().getGridLogger().getLogger(getClass()); + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public long get() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(getCall, ctx); + } + + /** {@inheritDoc} */ + @Override public long incrementAndGet() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(incAndGetCall, ctx); + } + + /** {@inheritDoc} */ + @Override public long getAndIncrement() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(getAndIncCall, ctx); + } + + /** {@inheritDoc} */ + @Override public long addAndGet(long l) throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(internalAddAndGet(l), ctx); + } + + /** {@inheritDoc} */ + @Override public long getAndAdd(long l) throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(internalGetAndAdd(l), ctx); + } + + /** {@inheritDoc} */ + @Override public long decrementAndGet() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(decAndGetCall, ctx); + } + + /** {@inheritDoc} */ + @Override public long getAndDecrement() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(getAndDecCall, ctx); + } + + /** {@inheritDoc} */ + @Override public long getAndSet(long l) throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(internalGetAndSet(l), ctx); + } + + /** {@inheritDoc} */ + @Override public boolean compareAndSet(long expVal, long newVal) + throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(internalCompareAndSet(expVal, newVal), ctx); + } + + /** + * Check removed flag. + * + * @throws IgniteCheckedException If removed. + */ + private void checkRemoved() throws IgniteCheckedException { + if (rmvd) + throw new DataStructureRemovedException("Atomic long was removed from cache: " + name); + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void onInvalid(@Nullable Exception err) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeAtomicLong(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode. + * + * @param l Value will be added to atomic long. + * @return Callable for execution in async and sync mode. + */ + private Callable<Long> internalAddAndGet(final long l) { + return new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get() + l; + + val.set(retVal); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to add and get: " + this, e); + + throw e; + } + } + }; + } + + /** + * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode. + * + * @param l Value will be added to atomic long. + * @return Callable for execution in async and sync mode. + */ + private Callable<Long> internalGetAndAdd(final long l) { + return new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get(); + + val.set(retVal + l); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to get and add: " + this, e); + + throw e; + } + } + }; + } + + /** + * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode. + * + * @param l Value will be added to atomic long. + * @return Callable for execution in async and sync mode. + */ + private Callable<Long> internalGetAndSet(final long l) { + return new Callable<Long>() { + @Override public Long call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + long retVal = val.get(); + + val.set(l); + + atomicView.put(key, val); + + tx.commit(); + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to get and set: " + this, e); + + throw e; + } + } + }; + } + + /** + * Method returns callable for execution {@link #compareAndSet(long, long)} + * operation in async and sync mode. + * + * @param expVal Expected atomic long value. + * @param newVal New atomic long value. + * @return Callable for execution in async and sync mode. + */ + private Callable<Boolean> internalCompareAndSet(final long expVal, final long newVal) { + return new Callable<Boolean>() { + @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + + boolean retVal = val.get() == expVal; + + if (retVal) { + val.set(newVal); + + atomicView.put(key, val); + + tx.commit(); + } + + return retVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to compare and set: " + this, e); + + throw e; + } + } + }; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + return t.get1().dataStructures().atomicLong(t.get2(), 0L, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheAtomicLongImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index c945a06,0000000..efdaac2 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@@ -1,325 -1,0 +1,325 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.transactions.*; ++import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Cache atomic reference implementation. + */ +public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicReferenceEx<T>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. */ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Logger. */ + private IgniteLogger log; + + /** Atomic reference name. */ + private String name; + + /** Status.*/ + private volatile boolean rmvd; + + /** Atomic reference key. */ + private GridCacheInternalKey key; + + /** Atomic reference projection. */ + private CacheProjection<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Callable for {@link #get} operation */ + private final Callable<T> getCall = new Callable<T>() { + @Override public T call() throws Exception { + GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + + if (ref == null) + throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); + + return ref.get(); + } + }; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheAtomicReferenceImpl() { + // No-op. + } + + /** + * Default constructor. + * + * @param name Atomic reference name. + * @param key Atomic reference key. + * @param atomicView Atomic projection. + * @param ctx Cache context. + */ + public GridCacheAtomicReferenceImpl(String name, + GridCacheInternalKey key, + CacheProjection<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView, + GridCacheContext ctx) { + assert key != null; + assert atomicView != null; + assert ctx != null; + assert name != null; + + this.ctx = ctx; + this.key = key; + this.atomicView = atomicView; + this.name = name; + + log = ctx.gridConfig().getGridLogger().getLogger(getClass()); + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public T get() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(getCall, ctx); + } + + /** {@inheritDoc} */ + @Override public void set(T val) throws IgniteCheckedException { + checkRemoved(); + + CU.outTx(internalSet(val), ctx); + } + + /** {@inheritDoc} */ + @Override public boolean compareAndSet(T expVal, T newVal) throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(internalCompareAndSet(wrapperPredicate(expVal), wrapperClosure(newVal)), ctx); + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void onInvalid(@Nullable Exception err) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeAtomicReference(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Method make wrapper predicate for existing value. + * + * @param val Value. + * @return Predicate. + */ + private IgnitePredicate<T> wrapperPredicate(final T val) { + return new IgnitePredicate<T>() { + @Override public boolean apply(T e) { + return val != null && val.equals(e); + } + }; + } + + /** + * Method make wrapper closure for existing value. + * + * @param val Value. + * @return Closure. + */ + private IgniteClosure<T, T> wrapperClosure(final T val) { + return new IgniteClosure<T, T>() { + @Override public T apply(T e) { + return val; + } + }; + } + + /** + * Method returns callable for execution {@link #set(Object)} operation in async and sync mode. + * + * @param val Value will be set in reference . + * @return Callable for execution in async and sync mode. + */ + private Callable<Boolean> internalSet(final T val) { + return new Callable<Boolean>() { + @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + + if (ref == null) + throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); + + ref.set(val); + + atomicView.put(key, ref); + + tx.commit(); + + return true; + } + catch (Error | Exception e) { + U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e); + + throw e; + } + } + }; + } + + /** + * Conditionally sets the new value. It will be set if {@code expValPred} is + * evaluate to {@code true}. + * + * @param expValPred Predicate which should evaluate to {@code true} for value to be set. + * @param newValClos Closure which generates new value. + * @return Callable for execution in async and sync mode. + */ + private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred, + final IgniteClosure<T, T> newValClos) { + return new Callable<Boolean>() { + @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + + if (ref == null) + throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); + + if (!expValPred.apply(ref.get())) { + tx.setRollbackOnly(); + + return false; + } + else { + ref.set(newValClos.apply(ref.get())); + + atomicView.put(key, ref); + + tx.commit(); + + return true; + } + } + catch (Error | Exception e) { + U.error(log, "Failed to compare and value [expValPred=" + expValPred + ", newValClos" + + newValClos + ", atomicReference" + this + ']', e); + + throw e; + } + } + }; + } + + /** + * Check removed status. + * + * @throws IgniteCheckedException If removed. + */ + private void checkRemoved() throws IgniteCheckedException { + if (rmvd) + throw new DataStructureRemovedException("Atomic reference was removed from cache: " + name); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + @SuppressWarnings("unchecked") + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + return t.get1().dataStructures().atomicReference(t.get2(), null, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheAtomicReferenceImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 2468289,0000000..c66a890 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@@ -1,536 -1,0 +1,536 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.transactions.*; ++import org.apache.ignite.internal.processors.cache.transactions.*; ++import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; - import org.apache.ignite.internal.util.future.*; ++import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Cache sequence implementation. + */ +public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenceEx, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** De-serialization stash. */ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Logger. */ + private IgniteLogger log; + + /** Sequence name. */ + private String name; + + /** Removed flag. */ + private volatile boolean rmvd; + + /** Sequence key. */ + private GridCacheInternalKey key; + + /** Sequence projection. */ + private CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView; + + /** Cache context. */ + private volatile GridCacheContext ctx; + + /** Local value of sequence. */ + private long locVal; + + /** Upper bound of local counter. */ + private long upBound; + + /** Sequence batch size */ + private volatile int batchSize; + + /** Synchronization lock. */ + private final Lock lock = new ReentrantLock(); + + /** Await condition. */ + private Condition cond = lock.newCondition(); + + /** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */ + private final Callable<Long> incAndGetCall = internalUpdate(1, true); + + /** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */ + private final Callable<Long> getAndIncCall = internalUpdate(1, false); + + /** Add and get cache call guard. */ + private final AtomicBoolean updateGuard = new AtomicBoolean(); + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheAtomicSequenceImpl() { + // No-op. + } + + /** + * Default constructor. + * + * @param name Sequence name. + * @param key Sequence key. + * @param seqView Sequence projection. + * @param ctx CacheContext. + * @param batchSize Sequence batch size. + * @param locVal Local counter. + * @param upBound Upper bound. + */ + public GridCacheAtomicSequenceImpl(String name, + GridCacheInternalKey key, + CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView, + GridCacheContext ctx, + int batchSize, + long locVal, + long upBound) + { + assert key != null; + assert seqView != null; + assert ctx != null; + assert locVal <= upBound; + + this.batchSize = batchSize; + this.ctx = ctx; + this.key = key; + this.seqView = seqView; + this.upBound = upBound; + this.locVal = locVal; + this.name = name; + + log = ctx.gridConfig().getGridLogger().getLogger(getClass()); + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public long get() throws IgniteCheckedException { + checkRemoved(); + + lock.lock(); + + try { + return locVal; + } + finally { + lock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override public long incrementAndGet() throws IgniteCheckedException { + return internalUpdate(1, incAndGetCall, true); + } + + /** {@inheritDoc} */ + @Override public long getAndIncrement() throws IgniteCheckedException { + return internalUpdate(1, getAndIncCall, false); + } + + /** {@inheritDoc} */ + @Override public long addAndGet(long l) throws IgniteCheckedException { + A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); + + return internalUpdate(l, null, true); + } + + /** {@inheritDoc} */ + @Override public long getAndAdd(long l) throws IgniteCheckedException { + A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); + + return internalUpdate(l, null, false); + } + + /** + * Synchronous sequence update operation. Will add given amount to the sequence value. + * + * @param l Increment amount. + * @param updateCall Cache call that will update sequence reservation count in accordance with l. + * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value + * prior to update. + * @return Sequence value. + * @throws IgniteCheckedException If update failed. + */ + @SuppressWarnings("SignalWithoutCorrespondingAwait") + private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException { + checkRemoved(); + + assert l > 0; + + lock.lock(); + + try { + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; + + locVal += l; + + return updated ? locVal : curVal; + } + } + finally { + lock.unlock(); + } + + if (updateCall == null) + updateCall = internalUpdate(l, updated); + + while (true) { + if (updateGuard.compareAndSet(false, true)) { + try { + // This call must be outside lock. + return CU.outTx(updateCall, ctx); + } + finally { + lock.lock(); + + try { + updateGuard.set(false); + + cond.signalAll(); + } + finally { + lock.unlock(); + } + } + } + else { + lock.lock(); + + try { + while (locVal >= upBound && updateGuard.get()) + U.await(cond, 500, MILLISECONDS); + + checkRemoved(); + + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; + + locVal += l; + + return updated ? locVal : curVal; + } + } + finally { + lock.unlock(); + } + } + } + } + + /** + * Asynchronous sequence update operation. Will add given amount to the sequence value. + * + * @param l Increment amount. + * @param updateCall Cache call that will update sequence reservation count in accordance with l. + * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value + * prior to update. + * @return Future indicating sequence value. + * @throws IgniteCheckedException If update failed. + */ + @SuppressWarnings("SignalWithoutCorrespondingAwait") - private IgniteFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated) ++ private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated) + throws IgniteCheckedException { + checkRemoved(); + + A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); + + lock.lock(); + + try { + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; + + locVal += l; + + return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal); + } + } + finally { + lock.unlock(); + } + + if (updateCall == null) + updateCall = internalUpdate(l, updated); + + while (true) { + if (updateGuard.compareAndSet(false, true)) { + try { + // This call must be outside lock. + return ctx.closures().callLocalSafe(updateCall, true); + } + finally { + lock.lock(); + + try { + updateGuard.set(false); + + cond.signalAll(); + } + finally { + lock.unlock(); + } + } + } + else { + lock.lock(); + + try { + while (locVal >= upBound && updateGuard.get()) + U.await(cond, 500, MILLISECONDS); + + checkRemoved(); + + // If reserved range isn't exhausted. + if (locVal + l <= upBound) { + long curVal = locVal; + + locVal += l; + + return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal); + } + } + finally { + lock.unlock(); + } + } + } + } + + /** Get local batch size for this sequences. + * + * @return Sequence batch size. + */ + @Override public int batchSize() { + return batchSize; + } + + /** + * Set local batch size for this sequences. + * + * @param size Sequence batch size. Must be more then 0. + */ + @Override public void batchSize(int size) { + A.ensure(size > 0, " Batch size can't be less then 0: " + size); + + lock.lock(); + + try { + batchSize = size; + } + finally { + lock.unlock(); + } + } + + /** + * Check removed status. + * + * @throws IgniteCheckedException If removed. + */ + private void checkRemoved() throws IgniteCheckedException { + if (rmvd) + throw new DataStructureRemovedException("Sequence was removed from cache: " + name); + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void onInvalid(@Nullable Exception err) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public void close() { + try { + if (rmvd) + return; + + ctx.kernalContext().dataStructures().removeSequence(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Method returns callable for execution all update operations in async and sync mode. + * + * @param l Value will be added to sequence. + * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value. + * @return Callable for execution in async and sync mode. + */ + @SuppressWarnings("TooBroadScope") + private Callable<Long> internalUpdate(final long l, final boolean updated) { + return new Callable<Long>() { + @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ); ++ IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ); + + try { + GridCacheAtomicSequenceValue seq = seqView.get(key); + + checkRemoved(); + + assert seq != null; + + long curLocVal; + + long newUpBound; + + lock.lock(); + + try { + curLocVal = locVal; + + // If local range was already reserved in another thread. + if (locVal + l <= upBound) { + long retVal = locVal; + + locVal += l; + + return updated ? locVal : retVal; + } + + long curGlobalVal = seq.get(); + + long newLocVal; + + /* We should use offset because we already reserved left side of range.*/ + long off = batchSize > 1 ? batchSize - 1 : 1; + + // Calculate new values for local counter, global counter and upper bound. + if (curLocVal + l >= curGlobalVal) { + newLocVal = curLocVal + l; + + newUpBound = newLocVal + off; + } + else { + newLocVal = curGlobalVal; + + newUpBound = newLocVal + off; + } + + locVal = newLocVal; + upBound = newUpBound; + + if (updated) + curLocVal = newLocVal; + } + finally { + lock.unlock(); + } + + // Global counter must be more than reserved upper bound. + seq.set(newUpBound + 1); + + seqView.put(key, seq); + + tx.commit(); + + return curLocVal; + } + catch (Error | Exception e) { + U.error(log, "Failed to get and add: " + this, e); + + throw e; + } finally { + tx.close(); + } + } + }; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + return t.get1().dataStructures().sequence(t.get2(), 0L, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheAtomicSequenceImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 9605381,0000000..db73ebb mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@@ -1,355 -1,0 +1,355 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; ++import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; - import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Cache atomic stamped implementation. + */ +public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicStampedEx<T, S>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. */ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Logger. */ + private IgniteLogger log; + + /** Atomic stamped name. */ + private String name; + + /** Removed flag.*/ + private volatile boolean rmvd; + + /** Atomic stamped key. */ + private GridCacheInternalKey key; + + /** Atomic stamped projection. */ + private CacheProjection<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Callable for {@link #get()} operation */ + private final Callable<IgniteBiTuple<T, S>> getCall = new Callable<IgniteBiTuple<T, S>>() { + @Override public IgniteBiTuple<T, S> call() throws Exception { + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + return stmp.get(); + } + }; + + /** Callable for {@link #value()} operation */ + private final Callable<T> valCall = new Callable<T>() { + @Override public T call() throws Exception { + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + return stmp.value(); + } + }; + + /** Callable for {@link #stamp()} operation */ + private final Callable<S> stampCall = new Callable<S>() { + @Override public S call() throws Exception { + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + return stmp.stamp(); + } + }; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheAtomicStampedImpl() { + // No-op. + } + + /** + * Default constructor. + * + * @param name Atomic stamped name. + * @param key Atomic stamped key. + * @param atomicView Atomic projection. + * @param ctx Cache context. + */ + public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, CacheProjection<GridCacheInternalKey, + GridCacheAtomicStampedValue<T, S>> atomicView, GridCacheContext ctx) { + assert key != null; + assert atomicView != null; + assert ctx != null; + assert name != null; + + this.ctx = ctx; + this.key = key; + this.atomicView = atomicView; + this.name = name; + + log = ctx.gridConfig().getGridLogger().getLogger(getClass()); + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public IgniteBiTuple<T, S> get() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(getCall, ctx); + } + + /** {@inheritDoc} */ + @Override public void set(T val, S stamp) throws IgniteCheckedException { + checkRemoved(); + + CU.outTx(internalSet(val, stamp), ctx); + } + + /** {@inheritDoc} */ + @Override public boolean compareAndSet(T expVal, T newVal, S expStamp, S newStamp) throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(internalCompareAndSet(F0.equalTo(expVal), wrapperClosure(newVal), + F0.equalTo(expStamp), wrapperClosure(newStamp)), ctx); + } + + /** {@inheritDoc} */ + @Override public S stamp() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(stampCall, ctx); + } + + /** {@inheritDoc} */ + @Override public T value() throws IgniteCheckedException { + checkRemoved(); + + return CU.outTx(valCall, ctx); + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void onInvalid(@Nullable Exception err) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeAtomicStamped(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Method make wrapper closure for existing value. + * + * @param val Value. + * @return Closure. + */ + private <N> IgniteClosure<N, N> wrapperClosure(final N val) { + return new IgniteClosure<N, N>() { + @Override public N apply(N e) { + return val; + } + }; + } + + /** + * Method returns callable for execution {@link #set(Object,Object)}} operation in async and sync mode. + * + * @param val Value will be set in the atomic stamped. + * @param stamp Stamp will be set in the atomic stamped. + * @return Callable for execution in async and sync mode. + */ + private Callable<Boolean> internalSet(final T val, final S stamp) { + return new Callable<Boolean>() { + @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + stmp.set(val, stamp); + + atomicView.put(key, stmp); + + tx.commit(); + + return true; + } + catch (Error | Exception e) { + U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" + this + ']', e); + + throw e; + } + } + }; + } + + /** + * Conditionally asynchronously sets the new value and new stamp. They will be set if + * {@code expValPred} and {@code expStampPred} both evaluate to {@code true}. + * + * @param expValPred Predicate which should evaluate to {@code true} for value to be set + * @param newValClos Closure generates new value. + * @param expStampPred Predicate which should evaluate to {@code true} for value to be set + * @param newStampClos Closure generates new stamp value. + * @return Callable for execution in async and sync mode. + */ + private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred, + final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred, + final IgniteClosure<S, S> newStampClos) { + return new Callable<Boolean>() { + @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { ++ try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + if (!(expValPred.apply(stmp.value()) && expStampPred.apply(stmp.stamp()))) { + tx.setRollbackOnly(); + + return false; + } + else { + stmp.set(newValClos.apply(stmp.value()), newStampClos.apply(stmp.stamp())); + + atomicView.put(key, stmp); + + tx.commit(); + + return true; + } + } + catch (Error | Exception e) { + U.error(log, "Failed to compare and set [expValPred=" + expValPred + ", newValClos=" + + newValClos + ", expStampPred=" + expStampPred + ", newStampClos=" + newStampClos + + ", atomicStamped=" + this + ']', e); + + throw e; + } + } + }; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + @SuppressWarnings("unchecked") + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + return t.get1().dataStructures().atomicStamped(t.get2(), null, null, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** + * Check removed status. + * + * @throws IgniteCheckedException If removed. + */ + private void checkRemoved() throws IgniteCheckedException { + if (rmvd) + throw new DataStructureRemovedException("Atomic stamped was removed from cache: " + name); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(GridCacheAtomicStampedImpl.class, this); + } +}