http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java index bf8cf0d..221bc39 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java @@ -18,18 +18,28 @@ package org.apache.ignite.cache.eviction.fifo; import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.jsr166.*; import org.jsr166.ConcurrentLinkedDeque8.*; import java.io.*; import java.util.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + /** * Eviction policy based on {@code First In First Out (FIFO)} algorithm and supports batch eviction. * <p> - * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size. + * The eviction starts in the following cases: + * <ul> + * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li> + * <li> + * The size of cache entries in bytes becomes greater than the maximum memory size. + * The size of cache entry calculates as sum of key size and value size. + * </li> + * </ul> + * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}). * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. 
* <p> * This implementation is very efficient since it does not create any additional @@ -41,11 +51,17 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + private volatile int max = DFLT_CACHE_SIZE; /** Batch size. */ private volatile int batchSize = 1; + /** Max memory size. */ + private volatile long maxMemSize; + + /** Memory size. */ + private final LongAdder8 memSize = new LongAdder8(); + /** FIFO queue. */ private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue = new ConcurrentLinkedDeque8<>(); @@ -63,7 +79,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * @param max Maximum allowed size of cache before entry will start getting evicted. */ public FifoEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -75,7 +91,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * @param batchSize Batch size. */ public FifoEvictionPolicy(int max, int batchSize) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); A.ensure(batchSize > 0, "batchSize > 0"); this.max = max; @@ -97,7 +113,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * @param max Maximum allowed size of cache before entry will start getting evicted. 
*/ @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -119,6 +135,23 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict return queue.size(); } + /** {@inheritDoc} */ + @Override public long getMaxMemorySize() { + return maxMemSize; + } + + /** {@inheritDoc} */ + @Override public void setMaxMemorySize(long maxMemSize) { + A.ensure(maxMemSize >= 0, "maxMemSize >= 0"); + + this.maxMemSize = maxMemSize; + } + + /** {@inheritDoc} */ + @Override public long getCurrentMemorySize() { + return memSize.longValue(); + } + /** * Gets read-only view on internal {@code FIFO} queue in proper order. * @@ -141,8 +174,11 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict else { Node<EvictableEntry<K, V>> node = entry.removeMeta(); - if (node != null) + if (node != null) { queue.unlinkx(node); + + memSize.add(-entry.size()); + } } } @@ -173,11 +209,18 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict return false; } + memSize.add(entry.size()); + return true; } // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. else if (!entry.removeMeta(node)) return false; + else { + memSize.add(-entry.size()); + + return true; + } } } @@ -189,38 +232,74 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * Shrinks FIFO queue to maximum allowed size. */ private void shrink() { + long maxMem = this.maxMemSize; + + if (maxMem > 0) { + long startMemSize = memSize.longValue(); + + if (startMemSize >= maxMem) + for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) { + int size = shrink0(); + + if (size == -1) + break; + + i += size; + } + } + int max = this.max; - int batchSize = this.batchSize; + if (max > 0) { + int startSize = queue.sizex(); + + // Shrink only if queue is full. + if (startSize >= max + (maxMem > 0 ? 
1 : this.batchSize)) + for (int i = max; i < startSize && queue.sizex() > max; i++) + if (shrink0() == -1) + break; + } + } - int startSize = queue.sizex(); + /** + * Tries to remove one item from queue. + * + * @return number of bytes that was free. {@code -1} if queue is empty. + */ + private int shrink0() { + EvictableEntry<K, V> entry = queue.poll(); - // Shrink only if queue is full. - if (startSize >= max + batchSize) { - for (int i = max; i < startSize && queue.sizex() > max; i++) { - EvictableEntry<K, V> entry = queue.poll(); + if (entry == null) + return -1; - if (entry == null) - break; + int size = 0; - Node<EvictableEntry<K, V>> meta = entry.removeMeta(); + Node<EvictableEntry<K, V>> meta = entry.removeMeta(); - if (meta != null && !entry.evict()) - touch(entry); - } + if (meta != null) { + size = entry.size(); + + memSize.add(-size); + + if (!entry.evict()) + touch(entry); } + + return size; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); out.writeInt(batchSize); + out.writeLong(maxMemSize); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); batchSize = in.readInt(); + maxMemSize = in.readLong(); } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java index 63a413e..793aa66 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java @@ -63,4 +63,26 @@ public interface FifoEvictionPolicyMBean { */ @MXBeanDescription("Current FIFO queue size.") public int getCurrentSize(); + + /** + * Gets maximum allowed cache size in bytes. + * + * @return maximum allowed cache size in bytes. + */ + @MXBeanDescription("Maximum allowed cache size in bytes.") + public long getMaxMemorySize(); + + /** + * Sets maximum allowed cache size in bytes. + */ + @MXBeanDescription("Set maximum allowed cache size in bytes.") + public void setMaxMemorySize(long maxMemSize); + + /** + * Gets current queue size in bytes. + * + * @return current queue size in bytes. 
+ */ + @MXBeanDescription("Current FIFO queue size in bytes.") + public long getCurrentMemorySize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java index 309d577..0be26c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java @@ -18,26 +18,48 @@ package org.apache.ignite.cache.eviction.lru; import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.jsr166.*; import org.jsr166.ConcurrentLinkedDeque8.*; import java.io.*; import java.util.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + /** - * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. This - * implementation is very efficient since it is lock-free and does not - * create any additional table-like data structures. The {@code LRU} ordering - * information is maintained by attaching ordering metadata to cache entries. + * Eviction policy based on {@code Least Recently Used (LRU)} algorithm and supports batch eviction. + * <p> + * The eviction starts in the following cases: + * <ul> + * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li> + * <li> + * The size of cache entries in bytes becomes greater than the maximum memory size. + * The size of cache entry calculates as sum of key size and value size. + * </li> + * </ul> + * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}). 
+ * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. + + * This implementation is very efficient since it is lock-free and does not create any additional table-like + * data structures. The {@code LRU} ordering information is maintained by attaching ordering metadata to cache entries. */ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictionPolicyMBean, Externalizable { /** */ private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + private volatile int max = DFLT_CACHE_SIZE; + + /** Batch size. */ + private volatile int batchSize = 1; + + /** Max memory size. */ + private volatile long maxMemSize; + + /** Memory size. */ + private final LongAdder8 memSize = new LongAdder8(); /** Queue. */ private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue = @@ -56,7 +78,7 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio * @param max Maximum allowed size of cache before entry will start getting evicted. */ public LruEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -76,16 +98,45 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio * @param max Maximum allowed size of cache before entry will start getting evicted. 
*/ @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } /** {@inheritDoc} */ + @Override public int getBatchSize() { + return batchSize; + } + + /** {@inheritDoc} */ + @Override public void setBatchSize(int batchSize) { + A.ensure(batchSize > 0, "batchSize > 0"); + + this.batchSize = batchSize; + } + + /** {@inheritDoc} */ @Override public int getCurrentSize() { return queue.size(); } + /** {@inheritDoc} */ + @Override public long getMaxMemorySize() { + return maxMemSize; + } + + /** {@inheritDoc} */ + @Override public void setMaxMemorySize(long maxMemSize) { + A.ensure(maxMemSize >= 0, "maxMemSize >= 0"); + + this.maxMemSize = maxMemSize; + } + + /** {@inheritDoc} */ + @Override public long getCurrentMemorySize() { + return memSize.longValue(); + } + /** * Gets read-only view on internal {@code FIFO} queue in proper order. * @@ -107,8 +158,11 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio else { Node<EvictableEntry<K, V>> node = entry.removeMeta(); - if (node != null) + if (node != null) { queue.unlinkx(node); + + memSize.add(-entry.size()); + } } } @@ -139,11 +193,18 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio return false; } + memSize.add(entry.size()); + return true; } // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. else if (!entry.removeMeta(node)) return false; + else { + memSize.add(-entry.size()); + + return true; + } } } else if (queue.unlinkx(node)) { @@ -163,31 +224,73 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio * Shrinks queue to maximum allowed size. 
*/ private void shrink() { + long maxMem = this.maxMemSize; + + if (maxMem > 0) { + long startMemSize = memSize.longValue(); + + if (startMemSize >= maxMem) + for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) { + int size = shrink0(); + + if (size == -1) + break; + + i += size; + } + } + int max = this.max; - int startSize = queue.sizex(); + if (max > 0) { + int startSize = queue.sizex(); - for (int i = 0; i < startSize && queue.sizex() > max; i++) { - EvictableEntry<K, V> entry = queue.poll(); + if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) + for (int i = max; i < startSize && queue.sizex() > max; i++) + if (shrink0() == -1) + break; + } + } - if (entry == null) - break; + /** + * Tries to remove one item from queue. + * + * @return number of bytes that was free. {@code -1} if queue is empty. + */ + private int shrink0() { + EvictableEntry<K, V> entry = queue.poll(); - Node<EvictableEntry<K, V>> meta = entry.removeMeta(); + if (entry == null) + return -1; - if (meta != null && !entry.evict()) + int size = 0; + + Node<EvictableEntry<K, V>> meta = entry.removeMeta(); + + if (meta != null) { + size = entry.size(); + + memSize.add(-size); + + if (!entry.evict()) touch(entry); } + + return size; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); + out.writeInt(batchSize); + out.writeLong(maxMemSize); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); + batchSize = in.readInt(); + maxMemSize = in.readLong(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java 
b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java index c243374..e17c057 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java @@ -41,10 +41,48 @@ public interface LruEvictionPolicyMBean { public void setMaxSize(int max); /** + * Gets batch size. + * + * @return batch size. + */ + @MXBeanDescription("Batch size.") + public int getBatchSize(); + + /** + * Sets batch size. + * + * @param batchSize Batch size. + */ + @MXBeanDescription("Set batch size.") + public void setBatchSize(int batchSize); + + /** * Gets current queue size. * * @return Current queue size. */ @MXBeanDescription("Current queue size.") public int getCurrentSize(); + + /** + * Gets maximum allowed cache size in bytes. + * + * @return maximum allowed cache size in bytes. + */ + @MXBeanDescription("Maximum allowed cache size in bytes.") + public long getMaxMemorySize(); + + /** + * Sets maximum allowed cache size in bytes. + */ + @MXBeanDescription("Set maximum allowed cache size in bytes.") + public void setMaxMemorySize(long maxMemSize); + + /** + * Gets current queue size in bytes. + * + * @return current queue size in bytes. 
+ */ + @MXBeanDescription("Current queue size in bytes.") + public long getCurrentMemorySize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java index c88b31d..00a912f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java @@ -18,20 +18,22 @@ package org.apache.ignite.cache.eviction.random; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; import javax.cache.*; import java.io.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + /** * Cache eviction policy which will select random cache entry for eviction if cache * size exceeds the {@link #getMaxSize()} parameter. This implementation is * extremely light weight, lock-free, and does not create any data structures to maintain * any order for eviction. * <p> - * Random eviction will provide the best performance over any key set in which every + * Random eviction will provide the best performance over any key queue in which every * key has the same probability of being accessed. */ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomEvictionPolicyMBean, Externalizable { @@ -39,7 +41,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE private static final long serialVersionUID = 0L; /** Maximum size. 
*/ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + private volatile int max = DFLT_CACHE_SIZE; /** * Constructs random eviction policy with all defaults. @@ -87,7 +89,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE IgniteCache<K, V> cache = entry.unwrap(IgniteCache.class); - int size = cache.size(); + int size = cache.localSize(CachePeekMode.ONHEAP); for (int i = max; i < size; i++) { Cache.Entry<K, V> e = cache.randomEntry(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java index 7965c97..b8b82fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java @@ -34,7 +34,15 @@ import static org.apache.ignite.configuration.CacheConfiguration.*; /** * Cache eviction policy which will select the minimum cache entry for eviction. * <p> - * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size. + * The eviction starts in the following cases: + * <ul> + * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li> + * <li> + * The size of cache entries in bytes becomes greater than the maximum memory size. + * The size of cache entry calculates as sum of key size and value size. + * </li> + * </ul> + * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}). * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. 
* <p> * Entries comparison based on {@link Comparator} instance if provided. @@ -48,18 +56,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max; + private volatile int max = DFLT_CACHE_SIZE; /** Batch size. */ private volatile int batchSize = 1; + /** Max memory size. */ + private volatile long maxMemSize; + + /** Memory size. */ + private final LongAdder8 memSize = new LongAdder8(); + /** Comparator. */ private Comparator<Holder<K, V>> comp; /** Order. */ private final AtomicLong orderCnt = new AtomicLong(); - /** Backed sorted set. */ + /** Backed sorted queue. */ private final GridConcurrentSkipListSetEx<K, V> set; /** @@ -96,7 +110,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE * @param comp Entries comparator. */ public SortedEvictionPolicy(int max, int batchSize, @Nullable Comparator<EvictableEntry<K, V>> comp) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); A.ensure(batchSize > 0, "batchSize > 0"); this.max = max; @@ -106,6 +120,16 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE } /** + * Constructs sorted eviction policy with given maximum size and given entry comparator. + * + * @param comp Entries comparator. + */ + public SortedEvictionPolicy(@Nullable Comparator<EvictableEntry<K, V>> comp) { + this.comp = comp == null ? new DefaultHolderComparator<K, V>() : new HolderComparator<>(comp); + this.set = new GridConcurrentSkipListSetEx<>(this.comp); + } + + /** * Gets maximum allowed size of cache before entry will start getting evicted. * * @return Maximum allowed size of cache before entry will start getting evicted. @@ -120,7 +144,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE * @param max Maximum allowed size of cache before entry will start getting evicted. 
*/ @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -142,12 +166,29 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE return set.sizex(); } + /** {@inheritDoc} */ + @Override public long getMaxMemorySize() { + return maxMemSize; + } + + /** {@inheritDoc} */ + @Override public void setMaxMemorySize(long maxMemSize) { + A.ensure(maxMemSize >= 0, "maxMemSize >= 0"); + + this.maxMemSize = maxMemSize; + } + + /** {@inheritDoc} */ + @Override public long getCurrentMemorySize() { + return memSize.longValue(); + } + /** - * Gets read-only view of backed set in proper order. + * Gets read-only view of backed queue in proper order. * - * @return Read-only view of backed set. + * @return Read-only view of backed queue. */ - public Collection<EvictableEntry<K, V>> set() { + public Collection<EvictableEntry<K, V>> queue() { Set<EvictableEntry<K, V>> cp = new LinkedHashSet<>(); for (Holder<K, V> holder : set) @@ -168,19 +209,22 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE else { Holder<K, V> holder = entry.removeMeta(); - if (holder != null) + if (holder != null) { removeHolder(holder); + + memSize.add(-entry.size()); + } } } /** * @param entry Entry to touch. - * @return {@code True} if backed set has been changed by this call. + * @return {@code True} if backed queue has been changed by this call. */ private boolean touch(EvictableEntry<K, V> entry) { Holder<K, V> holder = entry.meta(); - // Entry has not been add yet to backed set.. + // Entry has not been add yet to backed queue.. if (holder == null) { while (true) { holder = new Holder<>(entry, orderCnt.incrementAndGet()); @@ -188,7 +232,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE set.add(holder); if (entry.putMetaIfAbsent(holder) != null) { - // Was concurrently added, need to remove it from set. 
+ // Was concurrently added, need to remove it from queue. removeHolder(holder); // Set has not been changed. @@ -196,17 +240,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE } else if (holder.order > 0) { if (!entry.isCached()) { - // Was concurrently evicted, need to remove it from set. + // Was concurrently evicted, need to remove it from queue. removeHolder(holder); return false; } + memSize.add(entry.size()); + return true; } // If holder was removed by concurrent shrink() call, we must repeat the whole cycle. else if (!entry.removeMeta(holder)) return false; + else { + memSize.add(-entry.size()); + + return true; + } } } @@ -215,34 +266,71 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE } /** - * Shrinks backed set to maximum allowed size. + * Shrinks backed queue to maximum allowed size. */ private void shrink() { - int max = this.max; + long maxMem = this.maxMemSize; + + if (maxMem > 0) { + long startMemSize = memSize.longValue(); - int batchSize = this.batchSize; + if (startMemSize >= maxMem) + for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) { + int size = shrink0(); - int startSize = set.sizex(); + if (size == -1) + break; - if (startSize >= max + batchSize) { - for (int i = max; i < startSize && set.sizex() > max; i++) { - Holder<K, V> h = set.pollFirst(); + i += size; + } + } - if (h == null) - break; + int max = this.max; - EvictableEntry<K, V> entry = h.entry; + if (max > 0) { + int startSize = set.sizex(); - if (h.order > 0 && entry.removeMeta(h) && !entry.evict()) - touch(entry); + if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) { + for (int i = max; i < startSize && set.sizex() > max; i++) { + if (shrink0() == -1) + break; + } } } } + /** + * Tries to remove one item from queue. + * + * @return number of bytes that was free. {@code -1} if queue is empty. 
+ */ + private int shrink0() { + Holder<K, V> h = set.pollFirst(); + + if (h == null) + return -1; + + int size = 0; + + EvictableEntry<K, V> entry = h.entry; + + if (h.order > 0 && entry.removeMeta(h)) { + size = entry.size(); + + memSize.add(-size); + + if (!entry.evict()) + touch(entry); + } + + return size; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); out.writeInt(batchSize); + out.writeLong(maxMemSize); out.writeObject(comp); } @@ -251,11 +339,12 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); batchSize = in.readInt(); + maxMemSize = in.readLong(); comp = (Comparator<Holder<K, V>>)in.readObject(); } /** - * Removes holder from backed set and marks holder as removed. + * Removes holder from backed queue and marks holder as removed. * * @param holder Holder. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java index bc696ff..7283453 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java @@ -63,4 +63,26 @@ public interface SortedEvictionPolicyMBean { */ @MXBeanDescription("Current sorted key set size.") public int getCurrentSize(); + + /** + * Gets maximum allowed cache size in bytes. + * + * @return maximum allowed cache size in bytes. 
+ */ + @MXBeanDescription("Maximum allowed cache size in bytes.") + public long getMaxMemorySize(); + + /** + * Sets maximum allowed cache size in bytes. + */ + @MXBeanDescription("Set maximum allowed cache size in bytes.") + public void setMaxMemorySize(long maxMemSize); + + /** + * Gets current sorted entries queue size in bytes. + * + * @return current sorted entries queue size in bytes. + */ + @MXBeanDescription("Current sorted entries set size in bytes.") + public long getCurrentMemorySize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java index e66b32d..ef8fc49 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java @@ -20,9 +20,9 @@ package org.apache.ignite.cache.query; import org.apache.ignite.internal.processors.cache.query.*; /** - * Cache query metrics used to obtain statistics on query. You can get metrics for - * particular query via {@link CacheQuery#metrics()} method or accumulated metrics - * for all queries via {@link GridCacheQueryManager#metrics()}. + * Cache query metrics used to obtain statistics on query. Metrics for particular query + * can be obtained via {@link CacheQuery#metrics()} method or aggregated metrics for all queries + * via {@link CacheQuery#metrics()}. 
*/ public interface QueryMetrics { /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java index d018298..5bfdda1 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> { * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases * may bring cache transaction into {@link TransactionState#UNKNOWN} which will * consequently cause all transacted entries to be invalidated. + * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details). 
*/ + @Deprecated public void sessionEnd(boolean commit) throws CacheWriterException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java index 640d4a3..329e994 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java @@ -19,6 +19,7 @@ package org.apache.ignite.cache.store; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -52,6 +53,27 @@ public interface CacheStoreSession { public boolean isWithinTransaction(); /** + * Attaches the given object to this session. + * <p> + * An attached object may later be retrieved via the {@link #attachment()} + * method. Invoking this method causes any previous attachment to be + * discarded. To attach additional objects use {@link #properties()} map. + * <p> + * The current attachment may be discarded by attaching {@code null}. + * + * @param attachment The object to be attached (or {@code null} to discard current attachment). + * @return Previously attached object, if any. + */ + @Nullable public <T> T attach(@Nullable Object attachment); + + /** + * Retrieves the current attachment or {@code null} if there is no attachment. + * + * @return Currently attached object, if any. + */ + @Nullable public <T> T attachment(); + + /** * Gets current session properties. You can add properties directly to the * returned map. 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java new file mode 100644 index 0000000..1543bf9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.configuration.*; + +import javax.cache.configuration.*; +import javax.sql.*; + +/** + * Cache store session listener that allows to implement callbacks + * for session lifecycle. + * <p> + * The most common use case for session listeners is database + * connection and transaction management. Store can be invoked one + * or several times during one session, depending on whether it's + * executed within cache transaction or not. 
In any case, you have + * to create a connection when session is started and commit it or + * rollback when session is finished. + * <p> + * Cache store session listener allows to implement this and other + * scenarios providing two callback methods: + * <ul> + * <li> + * {@link #onSessionStart(CacheStoreSession)} - called + * before any store operation within a session is invoked. + * </li> + * <li> + * {@link #onSessionEnd(CacheStoreSession, boolean)} - called + * after all operations within a session are invoked. + * </li> + * </ul> + * <h2>Implementations</h2> + * Ignite provides several out-of-the-box implementations + * of session listener (refer to individual JavaDocs for more + * details): + * <ul> + * <li> + * {@link CacheJdbcStoreSessionListener} - JDBC-based session + * listener. For each session it gets a new JDBC connection from + * provided {@link DataSource} and commits (or rolls back) it + * when session ends. + * </li> + * <li> + * {@ignitelink org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} - + * session listener based on Spring transaction management. + * It starts a new DB transaction for each session and commits + * (or rolls back) it when session ends. If there is no ongoing + * cache transaction, this listener is no-op. + * </li> + * <li> + * {@ignitelink org.apache.ignite.cache.store.hibernate.CacheHibernateStoreSessionListener} - + * Hibernate-based session listener. It creates a new Hibernate + * session for each Ignite session. If there is an ongoing cache + * transaction, a corresponding Hibernate transaction is created + * as well. + * </li> + * </ul> + * <h2>Configuration</h2> + * There are two ways to configure a session listener: + * <ul> + * <li> + * Provide a global listener for all caches via + * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. This will be called for any store + * session, not depending on what caches participate in + * transaction. 
+ * </li> + * <li> + * Provide a listener for a particular cache via + * {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. This will be called only if the + * cache participates in transaction. + * </li> + * </ul> + * For example, here is how global {@link CacheJdbcStoreSessionListener} + * can be configured in Spring XML configuration file: + * <pre name="code" class="xml"> + * <bean class="org.apache.ignite.configuration.IgniteConfiguration"> + * ... + * + * <property name="CacheStoreSessionListenerFactories"> + * <list> + * <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory"> + * <constructor-arg> + * <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener"> + * <!-- Inject external data source. --> + * <property name="dataSource" ref="jdbc-data-source"/> + * </bean> + * </constructor-arg> + * </bean> + * </list> + * </property> + * </bean> + * </pre> + */ +public interface CacheStoreSessionListener { + /** + * On session start callback. + * <p> + * Called before any store operation within a session is invoked. + * + * @param ses Current session. + */ + public void onSessionStart(CacheStoreSession ses); + + /** + * On session end callback. + * <p> + * Called after all operations within a session are invoked. + * + * @param ses Current session. + * @param commit {@code True} if persistence store transaction + * should commit, {@code false} for rollback. 
+ */ + public void onSessionEnd(CacheStoreSession ses, boolean commit); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java new file mode 100644 index 0000000..a20e535 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; + +import javax.cache.*; +import javax.cache.integration.*; +import javax.sql.*; +import java.sql.*; + +/** + * Cache store session listener based on JDBC connection. 
+ * <p> + * For each session this listener gets a new JDBC connection + * from provided {@link DataSource} and commits (or rolls + * back) it when session ends. + * <p> + * The connection is saved as a store session + * {@link CacheStoreSession#attachment() attachment}. + * The listener guarantees that the connection will be + * available for any store operation. If there is an + * ongoing cache transaction, all operations within this + * transaction will be committed or rolled back only when + * session ends. + * <p> + * As an example, here is how the {@link CacheStore#write(Cache.Entry)} + * method can be implemented if {@link CacheJdbcStoreSessionListener} + * is configured: + * <pre name="code" class="java"> + * private static class Store extends CacheStoreAdapter<Integer, Integer> { + * @CacheStoreSessionResource + * private CacheStoreSession ses; + * + * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + * // Get connection from the current session. + * Connection conn = ses.attachment(); + * + * // Execute update SQL query. + * try { + * conn.createStatement().executeUpdate("..."); + * } + * catch (SQLException e) { + * throw new CacheWriterException("Failed to update the store.", e); + * } + * } + * } + * </pre> + * JDBC connection will be automatically created by the listener + * at the start of the session and closed when it ends. + */ +public class CacheJdbcStoreSessionListener implements CacheStoreSessionListener, LifecycleAware { + /** Data source. */ + private DataSource dataSrc; + + /** + * Sets data source. + * <p> + * This is a required parameter. If data source is not set, + * exception will be thrown on startup. + * + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Gets data source. + * + * @return Data source. 
+ */ + public DataSource getDataSource() { + return dataSrc; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (dataSrc == null) + throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.'); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + if (ses.attachment() == null) { + try { + Connection conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + ses.attach(conn); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Connection conn = ses.attach(null); + + if (conn != null) { + try { + if (commit) + conn.commit(); + else + conn.rollback(); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + finally { + U.closeQuiet(conn); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index 9cb5d3d..85fd08a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -18,7 +18,9 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.*; import org.jetbrains.annotations.*; import java.util.*; @@ -33,7 +35,7 @@ import java.util.*; * 
You can use cluster node attributes to provide static information about a node. * This information is initialized once within a cluster, during the node startup, and * remains the same throughout the lifetime of a node. Use - * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method to initialize your custom + * {@link IgniteConfiguration#getUserAttributes()} method to initialize your custom * node attributes at startup. Here is an example of how to assign an attribute to a node at startup: * <pre name="code" class="xml"> * <bean class="org.apache.ignite.configuration.IgniteConfiguration"> @@ -114,7 +116,7 @@ public interface ClusterNode { /** * Gets a node attribute. Attributes are assigned to nodes at startup - * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method. + * via {@link IgniteConfiguration#getUserAttributes()} method. * <p> * The system adds the following attributes automatically: * <ul> @@ -149,7 +151,7 @@ public interface ClusterNode { /** * Gets all node attributes. Attributes are assigned to nodes at startup - * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method. + * via {@link IgniteConfiguration#getUserAttributes()} method. * <p> * The system adds the following attributes automatically: * <ul> @@ -167,7 +169,7 @@ public interface ClusterNode { /** * Gets collection of addresses this node is known by. * <p> - * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that + * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that * address for all communications and returned collection will contain only that address. * If it is {@code null} then local wildcard address will be used, and Ignite * will make the best effort to supply all addresses of that node in returned collection. 
@@ -179,12 +181,12 @@ public interface ClusterNode { /** * Gets collection of host names this node is known by. * <p> - * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use + * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use * the host name of that resolved address for all communications and * returned collection will contain only that host name. * If that host name can not be resolved then ip address returned by method {@link #addresses()} is used. * <p> - * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used, + * If {@link IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used, * and this method returns host names of all addresses of that node. * * @return Collection of host names. @@ -238,9 +240,17 @@ public interface ClusterNode { public boolean isDaemon(); /** - * Tests whether or not this node is a client node. + * Tests whether or not this node is connected to cluster as a client. + * <p> + * Do not confuse client in terms of + * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache + * {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data, + * while topology clients connect to topology in a different way. * * @return {@code True} if this node is a client node, {@code false} otherwise. 
+ * @see IgniteConfiguration#isClientMode() + * @see Ignition#isClientMode() + * @see DiscoverySpi#isClientMode() */ public boolean isClient(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index df6b2ee..1aa4fd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.*; @@ -145,9 +144,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default value for 'readFromBackup' flag. */ public static final boolean DFLT_READ_FROM_BACKUP = true; - /** Filter that accepts only server nodes. */ - public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgniteServerNodePredicate(); - /** Filter that accepts all nodes. */ public static final IgnitePredicate<ClusterNode> ALL_NODES = new IgniteAllNodesPredicate(); @@ -316,6 +312,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Cache topology validator. */ private TopologyValidator topValidator; + /** Cache store session listeners. */ + private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. 
*/ @@ -389,6 +388,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize(); startSize = cc.getStartSize(); storeFactory = cc.getCacheStoreFactory(); + storeSesLsnrs = cc.getCacheStoreSessionListenerFactories(); swapEnabled = cc.isSwapEnabled(); tmLookupClsName = cc.getTransactionManagerLookupClassName(); topValidator = cc.getTopologyValidator(); @@ -1664,7 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0, "Number of indexed types is expected to be even. Refer to method javadoc for details."); - this.indexedTypes = indexedTypes; + if (indexedTypes != null) { + int len = indexedTypes.length; + + Class<?>[] newIndexedTypes = new Class<?>[len]; + + for (int i = 0; i < len; i++) + newIndexedTypes[i] = U.box(indexedTypes[i]); + + this.indexedTypes = newIndexedTypes; + } + else + this.indexedTypes = null; return this; } @@ -1734,30 +1745,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { return this; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheConfiguration.class, this); + /** + * Gets cache store session listener factories. + * + * @return Cache store session listener factories. + * @see CacheStoreSessionListener + */ + public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { + return storeSesLsnrs; } /** - * Filter that accepts only server nodes. + * Cache store session listener factories. + * <p> + * These listeners override global listeners provided in + * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. + * + * @param storeSesLsnrs Cache store session listener factories. + * @return {@code this} for chaining. 
+ * @see CacheStoreSessionListener */ - public static class IgniteServerNodePredicate implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - @Override public boolean apply(ClusterNode n) { - Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); - - return attr != null && !attr; - } + public CacheConfiguration setCacheStoreSessionListenerFactories( + Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) { + this.storeSesLsnrs = storeSesLsnrs; - @Override public boolean equals(Object obj) { - if (obj == null) - return false; + return this; + } - return obj.getClass().equals(this.getClass()); - } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheConfiguration.class, this); } /** @@ -1767,10 +1785,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** */ private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean apply(ClusterNode clusterNode) { return true; } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { if (obj == null) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index ebe2b8e..2d36c7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -18,6 +18,7 @@ package org.apache.ignite.configuration; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.events.*; 
@@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*; import org.apache.ignite.spi.swapspace.*; import org.apache.ignite.spi.swapspace.file.*; +import javax.cache.configuration.*; import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.integration.*; @@ -334,9 +336,6 @@ public class IgniteConfiguration { /** Cache configurations. */ private CacheConfiguration[] cacheCfg; - /** Client cache configurations. */ - private NearCacheConfiguration[] nearCacheCfg; - /** Client mode flag. */ private Boolean clientMode; @@ -398,6 +397,9 @@ public class IgniteConfiguration { /** User's class loader. */ private ClassLoader classLdr; + /** Cache store session listeners. */ + private Factory<CacheStoreSessionListener>[] storeSesLsnrs; + /** * Creates valid grid configuration with all default values. */ @@ -478,6 +480,7 @@ public class IgniteConfiguration { segResolvers = cfg.getSegmentationResolvers(); sndRetryCnt = cfg.getNetworkSendRetryCount(); sndRetryDelay = cfg.getNetworkSendRetryDelay(); + storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories(); svcCfgs = cfg.getServiceConfiguration(); sysPoolSize = cfg.getSystemThreadPoolSize(); timeSrvPortBase = cfg.getTimeServerPortBase(); @@ -1823,9 +1826,11 @@ public class IgniteConfiguration { } /** - * Gets client mode flag. + * Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use + * {@link DiscoverySpi} in client mode if this property is {@code true}. * * @return Client mode flag. + * @see TcpDiscoverySpi#setForceServerMode(boolean) */ public Boolean isClientMode() { return clientMode; @@ -2188,15 +2193,21 @@ public class IgniteConfiguration { } /** + * Gets plugin configurations. + * * @return Plugin configurations. + * @see PluginProvider */ public PluginConfiguration[] getPluginConfigurations() { return pluginCfgs; } /** + * Sets plugin configurations. + * * @param pluginCfgs Plugin configurations. * @return {@code this} for chaining. 
+ * @see PluginProvider */ public IgniteConfiguration setPluginConfigurations(PluginConfiguration... pluginCfgs) { this.pluginCfgs = pluginCfgs; @@ -2242,6 +2253,35 @@ public class IgniteConfiguration { return classLdr; } + /** + * Gets cache store session listener factories. + * + * @return Cache store session listener factories. + * @see CacheStoreSessionListener + */ + public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { + return storeSesLsnrs; + } + + /** + * Cache store session listener factories. + * <p> + * These are global store session listeners, so they are applied to + * all caches. If you need to override listeners for a + * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. + * + * @param storeSesLsnrs Cache store session listener factories. + * @return {@code this} for chaining. + * @see CacheStoreSessionListener + */ + public IgniteConfiguration setCacheStoreSessionListenerFactories( + Factory<CacheStoreSessionListener>... storeSesLsnrs) { + this.storeSesLsnrs = storeSesLsnrs; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java new file mode 100644 index 0000000..5a65bdb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; + +/** + * Provides ability to execute IGFS code in a context of a specific user. + */ +public abstract class IgfsUserContext { + /** Thread local to hold the current user context. */ + private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>(); + + /** + * Executes given callable in the given user context. + * The main contract of this method is that {@link #currentUser()} method invoked + * inside closure always returns 'user' this callable executed with. + * @param user the user name to invoke closure on behalf of. + * @param clo the closure to execute + * @param <T> The type of closure result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. 
+ */ + public static <T> T doAs(String user, final IgniteOutClosure<T> clo) { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clo.apply(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clo.apply(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Same contract as {@link #doAs(String, IgniteOutClosure)}, but accepts + * a callable that may throw a checked exception. + * The exception is never wrapped. + * If your callable throws specific checked exceptions, the recommended usage pattern is: + * <pre name="code" class="java"> + * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 { + * try { + * return IgfsUserContext.doAs(user, new Callable<Foo>() { + * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 { + * return makeSomeFoo(); // do the job + * } + * }); + * } + * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) { + * throw e; + * } + * catch (Exception e) { + * throw new AssertionError("Must never go there."); + * } + * } + * </pre> + * @param user the user name to invoke closure on behalf of. + * @param clbl the Callable to execute + * @param <T> The type of callable result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. 
+ */ + public static <T> T doAs(String user, final Callable<T> clbl) throws Exception { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clbl.call(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clbl.call(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Gets the current context user. + * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will + * return null. Otherwise it will return the user name set in the innermost + * {@link #doAs(String, IgniteOutClosure)} call on the call stack. + * @return The current user, may be null. + */ + @Nullable public static String currentUser() { + return userStackThreadLocal.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index 9026eac..cb69352 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem { * @return Map of properties. */ public Map<String,String> properties(); + + + /** + * Closes the secondary file system. + * @throws IgniteException in case of an error. 
+ */ + public void close() throws IgniteException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java index 4d5d146..6da45ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java @@ -1247,6 +1247,20 @@ public class ClusterMetricsSnapshot implements ClusterMetrics { /** * Serializes node metrics into byte array. * + * @param metrics Node metrics to serialize. + * @return Byte array containing the serialized metrics. + */ + public static byte[] serialize(ClusterMetrics metrics) { + byte[] buf = new byte[METRICS_SIZE]; + + serialize(buf, 0, metrics); + + return buf; + } + + /** + * Serializes node metrics into byte array. + * + * @param data Byte array. * @param off Offset into byte array. * @param metrics Node metrics to serialize. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 505204d..f33fa39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -20,9 +20,9 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.interop.*; import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; @@ -131,41 +131,91 @@ class GridEventConsumeHandler implements GridContinuousHandler { final boolean loc = nodeId.equals(ctx.localNodeId()); lsnr = new GridLocalEventListener() { + /** node ID, routine ID, event */ + private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>(); + + private boolean notificationInProgress; + @Override public void onEvent(Event evt) { - if (filter == null || filter.apply(evt)) { - if (loc) { - if (!cb.apply(nodeId, evt)) - ctx.continuous().stopRoutine(routineId); - } - else { - GridDiscoveryManager disco = ctx.discovery(); + if (filter != null && !filter.apply(evt)) + return; + + if (loc) { + if (!cb.apply(nodeId, evt)) + ctx.continuous().stopRoutine(routineId); + } + else { + if (ctx.discovery().node(nodeId) == null) + return; + + synchronized (notificationQueue) { + notificationQueue.add(new 
T3<>(nodeId, routineId, evt)); + + if (!notificationInProgress) { + ctx.getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + if (!ctx.continuous().lockStopping()) + return; - ClusterNode node = disco.node(nodeId); + try { + while (true) { + T3<UUID, UUID, Event> t3; - if (node != null) { - try { - EventWrapper wrapper = new EventWrapper(evt); + synchronized (notificationQueue) { + t3 = notificationQueue.poll(); - if (evt instanceof CacheEvent) { - String cacheName = ((CacheEvent)evt).cacheName(); + if (t3 == null) { + notificationInProgress = false; - if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) { - wrapper.p2pMarshal(ctx.config().getMarshaller()); + return; + } + } - wrapper.cacheName = cacheName; + try { + Event evt = t3.get3(); - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + EventWrapper wrapper = new EventWrapper(evt); - depMgr.prepare(wrapper); + if (evt instanceof CacheEvent) { + String cacheName = ((CacheEvent)evt).cacheName(); + + ClusterNode node = ctx.discovery().node(t3.get1()); + + if (node == null) + continue; + + if (ctx.config().isPeerClassLoadingEnabled() + && ctx.discovery().cacheNode(node, cacheName)) { + wrapper.p2pMarshal(ctx.config().getMarshaller()); + + wrapper.cacheName = cacheName; + + GridCacheDeploymentManager depMgr = ctx.cache() + .internalCache(cacheName).context().deploy(); + + depMgr.prepare(wrapper); + } + } + + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false, + false); + } + catch (ClusterTopologyCheckedException ignored) { + // No-op. 
+ } + catch (Throwable e) { + U.error(ctx.log(GridEventConsumeHandler.class), + "Failed to send event notification to node: " + nodeId, e); + } + } + } + finally { + ctx.continuous().unlockStopping(); } } + }); - ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false); - } - catch (IgniteCheckedException e) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e); - } + notificationInProgress = true; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index ad7d562..d6542f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return Marshaller context. */ public MarshallerContextImpl marshallerContext(); + + /** + * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). 
+ */ + public boolean clientNode(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 1ff483e..f921d49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public boolean clientNode() { + return cfg.isClientMode(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c4b93b8..4f5e365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.job.*; import org.apache.ignite.internal.processors.jobmetrics.*; +import org.apache.ignite.internal.processors.nodevalidation.*; import org.apache.ignite.internal.processors.offheap.*; import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.port.*; @@ -56,7 
+57,6 @@ import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.processors.segmentation.*; import org.apache.ignite.internal.processors.service.*; import org.apache.ignite.internal.processors.session.*; -import org.apache.ignite.internal.processors.nodevalidation.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -169,11 +169,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** */ @GridToStringExclude - private Timer starveTimer; + private GridTimeoutProcessor.CancelableTask starveTask; /** */ @GridToStringExclude - private Timer metricsLogTimer; + private GridTimeoutProcessor.CancelableTask metricsLogTask; /** Indicate error on grid stop. */ @GridToStringExclude @@ -867,13 +867,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (starveCheck) { final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr); - starveTimer = new Timer("ignite-starvation-checker"); - - starveTimer.scheduleAtFixedRate(new GridTimerTask() { + starveTask = ctx.timeout().schedule(new Runnable() { /** Last completed task count. 
*/ private long lastCompletedCnt; - @Override protected void safeRun() { + @Override public void run() { if (!(execSvc instanceof ThreadPoolExecutor)) return; @@ -896,13 +894,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { long metricsLogFreq = cfg.getMetricsLogFrequency(); if (metricsLogFreq > 0) { - metricsLogTimer = new Timer("ignite-metrics-logger"); - - metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() { - /** */ + metricsLogTask = ctx.timeout().schedule(new Runnable() { private final DecimalFormat dblFmt = new DecimalFormat("#.##"); - @Override protected void safeRun() { + @Override public void run() { if (log.isInfoEnabled()) { ClusterMetrics m = cluster().localNode().metrics(); @@ -963,8 +958,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { sysPoolQSize = exec.getQueue().size(); } + String id = U.id8(localNode().id()); + String msg = NL + "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + + " ^-- Node [id=" + id + ", name=" + name() + "]" + NL + " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL + @@ -1165,6 +1163,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_CLIENT_MODE, cfg.isClientMode()); + add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)); + // Build a string from JVM arguments, because parameters with spaces are split. 
SB jvmArgs = new SB(512); @@ -1550,7 +1550,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ">>> Grid name: " + gridName + NL + ">>> Local node [" + "ID=" + locNode.id().toString().toUpperCase() + - ", order=" + locNode.order() + + ", order=" + locNode.order() + ", clientMode=" + ctx.clientNode() + "]" + NL + ">>> Local node addresses: " + U.addressesAsString(locNode) + NL + ">>> Local ports: " + sb + NL; @@ -1713,12 +1713,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (updateNtfTimer != null) updateNtfTimer.cancel(); - if (starveTimer != null) - starveTimer.cancel(); + if (starveTask != null) + starveTask.close(); - // Cancel metrics log timer. - if (metricsLogTimer != null) - metricsLogTimer.cancel(); + if (metricsLogTask != null) + metricsLogTask.close(); boolean interrupted = false; @@ -2370,7 +2369,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get(); - return ctx.cache().publicJCache(cacheName); + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); + + checkNearCacheStarted(cache); + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2397,7 +2400,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get(); } - return ctx.cache().publicJCache(cacheName); + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); + + checkNearCacheStarted(cache); + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2407,6 +2414,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** + * @param cache Cache. 
+ */ + private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) { + if (!cache.context().isNear()) + throw new IgniteException("Failed to start near cache " + + "(a cache with the same name without near cache is already started)"); + } + /** {@inheritDoc} */ @Override public void destroyCache(String cacheName) { guard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 98cc3a7..928db5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -126,9 +126,12 @@ public final class IgniteNodeAttributes { /** Security subject for authenticated node. */ public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject"; - /** Cache interceptors. */ + /** Client mode flag. */ public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client"; + /** Configuration consistency check disabled flag. */ + public static final String ATTR_CONSISTENCY_CHECK_SKIPPED = ATTR_PREFIX + ".consistency.check.skipped"; + /** * Enforces singleton. */