http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 0d87326..799aace 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -30,21 +30,21 @@ public interface CacheMetrics { /** * The number of get requests that were satisfied by the cache. * - * @return the number of hits + * @return The number of hits. */ public long getCacheHits(); /** * This is a measure of cache efficiency. * - * @return the percentage of successful hits, as a decimal e.g 75. + * @return The percentage of successful hits, as a decimal e.g 75. */ public float getCacheHitPercentage(); /** * A miss is a get request that is not satisfied. * - * @return the number of misses + * @return The number of misses. */ public long getCacheMisses(); @@ -52,7 +52,7 @@ public interface CacheMetrics { * Returns the percentage of cache accesses that did not find a requested entry * in the cache. * - * @return the percentage of accesses that failed to find anything + * @return The percentage of accesses that failed to find anything. */ public float getCacheMissPercentage(); @@ -60,14 +60,14 @@ public interface CacheMetrics { * The total number of requests to the cache. This will be equal to the sum of * the hits and misses. * - * @return the number of gets + * @return The number of gets. */ public long getCacheGets(); /** * The total number of puts to the cache. * - * @return the number of puts + * @return The number of puts. */ public long getCachePuts(); @@ -75,7 +75,7 @@ public interface CacheMetrics { * The total number of removals from the cache. This does not include evictions, * where the cache itself initiates the removal to make space. * - * @return the number of removals + * @return The number of removals. */ public long getCacheRemovals(); @@ -84,28 +84,28 @@ public interface CacheMetrics { * initiated by the cache itself to free up space. An eviction is not treated as * a removal and does not appear in the removal counts. * - * @return the number of evictions + * @return The number of evictions. */ public long getCacheEvictions(); /** * The mean time to execute gets. * - * @return the time in µs + * @return The time in µs. */ public float getAverageGetTime(); /** * The mean time to execute puts. * - * @return the time in µs + * @return The time in µs. */ public float getAveragePutTime(); /** * The mean time to execute removes. * - * @return the time in µs + * @return The time in µs. */ public float getAverageRemoveTime(); @@ -113,7 +113,7 @@ public interface CacheMetrics { /** * The mean time to execute tx commit. * - * @return the time in µs + * @return The time in µs. */ public float getAverageTxCommitTime(); @@ -124,7 +124,6 @@ public interface CacheMetrics { */ public float getAverageTxRollbackTime(); - /** * Gets total number of transaction commits. * @@ -154,6 +153,62 @@ public interface CacheMetrics { public long getOverflowSize(); /** + * The total number of get requests to the off-heap memory. + * + * @return The number of gets. + */ + public long getOffHeapGets(); + + /** + * The total number of put requests to the off-heap memory. + * + * @return The number of puts. + */ + public long getOffHeapPuts(); + + /** + * The total number of removals from the off-heap memory. This does not include evictions. + * + * @return The number of removals. + */ + public long getOffHeapRemovals(); + + /** + * The total number of evictions from the off-heap memory. + * + * @return The number of evictions. + */ + public long getOffHeapEvictions(); + + /** + * The number of get requests that were satisfied by the off-heap memory. + * + * @return The off-heap hits number. + */ + public long getOffHeapHits(); + + /** + * Gets the percentage of hits on off-heap memory. + * + * @return The percentage of hits on off-heap memory. + */ + public float getOffHeapHitPercentage(); + + /** + * A miss is a get request that is not satisfied by off-heap memory. + * + * @return The off-heap misses number. + */ + public long getOffHeapMisses(); + + /** + * Gets the percentage of misses on off-heap memory. + * + * @return The percentage of misses on off-heap memory. + */ + public float getOffHeapMissPercentage(); + + /** * Gets number of entries stored in off-heap memory. * * @return Number of entries stored in off-heap memory. @@ -161,6 +216,20 @@ public interface CacheMetrics { public long getOffHeapEntriesCount(); /** + * Gets number of primary entries stored in off-heap memory. + * + * @return Number of primary entries stored in off-heap memory. + */ + public long getOffHeapPrimaryEntriesCount(); + + /** + * Gets number of backup entries stored in off-heap memory. + * + * @return Number of backup entries stored in off-heap memory. + */ + public long getOffHeapBackupEntriesCount(); + + /** * Gets memory size allocated in off-heap. * * @return Memory size allocated in off-heap. @@ -168,6 +237,76 @@ public interface CacheMetrics { public long getOffHeapAllocatedSize(); /** + * Gets off-heap memory maximum size. + * + * @return Off-heap memory maximum size. + */ + public long getOffHeapMaxSize(); + + /** + * The total number of get requests to the swap. + * + * @return The number of gets. + */ + public long getSwapGets(); + + /** + * The total number of put requests to the swap. + * + * @return The number of puts. + */ + public long getSwapPuts(); + + /** + * The total number of removals from the swap. + * + * @return The number of removals. + */ + public long getSwapRemovals(); + + /** + * The number of get requests that were satisfied by the swap. + * + * @return The swap hits number. + */ + public long getSwapHits(); + + /** + * A miss is a get request that is not satisfied by swap. + * + * @return The swap misses number. + */ + public long getSwapMisses(); + + /** + * Gets number of entries stored in swap. + * + * @return Number of entries stored in swap. + */ + public long getSwapEntriesCount(); + + /** + * Gets size of swap. + * + * @return Size of swap. + */ + public long getSwapSize(); + + /** + * Gets the percentage of hits on swap. + * + * @return The percentage of hits on swap. + */ + public float getSwapHitPercentage(); + + /** + * Gets the percentage of misses on swap. + * + * @return The percentage of misses on swap. + */ + public float getSwapMissPercentage(); + + /** * Gets number of non-{@code null} values in the cache. * * @return Number of non-{@code null} values in the cache. @@ -184,7 +323,7 @@ public interface CacheMetrics { /** * Returns {@code true} if this cache is empty. * - * @return {@code true} if this cache is empty. + * @return {@code True} if this cache is empty. */ public boolean isEmpty(); @@ -294,7 +433,7 @@ public interface CacheMetrics { public int getTxDhtRolledbackVersionsSize(); /** - * Returns {@code True} if write-behind is enabled. + * Returns {@code true} if write-behind is enabled. * * @return {@code True} if write-behind is enabled. */ @@ -372,16 +511,16 @@ public interface CacheMetrics { /** * Determines the required type of keys for this {@link Cache}, if any. * - * @return the fully qualified class name of the key type, - * or "java.lang.Object" if the type is undefined. + * @return The fully qualified class name of the key type, + * or {@code "java.lang.Object"} if the type is undefined. */ public String getKeyType(); /** * Determines the required type of values for this {@link Cache}, if any. * - * @return the fully qualified class name of the value type, - * or "java.lang.Object" if the type is undefined. + * @return The fully qualified class name of the value type, + * or {@code "java.lang.Object"} if the type is undefined. */ public String getValueType(); @@ -407,7 +546,7 @@ public interface CacheMetrics { * <p> * The default value is {@code true}. * - * @return true if the cache is store by value + * @return {@code True} if the cache is store by value. */ public boolean isStoreByValue(); @@ -416,7 +555,7 @@ public interface CacheMetrics { * <p> * The default value is {@code false}. * - * @return true if statistics collection is enabled + * @return {@code True} if statistics collection is enabled. */ public boolean isStatisticsEnabled(); @@ -425,7 +564,7 @@ public interface CacheMetrics { * <p> * The default value is {@code false}. * - * @return true if management is enabled + * @return {@code true} if management is enabled. */ public boolean isManagementEnabled(); @@ -434,7 +573,7 @@ public interface CacheMetrics { * <p> * The default value is {@code false} * - * @return {@code true} when a {@link Cache} is in + * @return {@code True} when a {@link Cache} is in * "read-through" mode. * @see CacheLoader */ @@ -448,7 +587,7 @@ public interface CacheMetrics { * <p> * The default value is {@code false} * - * @return {@code true} when a {@link Cache} is in "write-through" mode. + * @return {@code True} when a {@link Cache} is in "write-through" mode. * @see CacheWriter */ public boolean isWriteThrough();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java index d87109f..9f1889a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java @@ -46,6 +46,13 @@ public interface EvictableEntry<K, V> extends Cache.Entry<K, V> { public boolean isCached(); /** + * Returns entry size in bytes. + * + * @return entry size in bytes. + */ + public int size(); + + /** * Gets metadata added by eviction policy. * * @return Metadata value or {@code null}. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java index f409e9b..07c269d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java @@ -20,6 +20,7 @@ package org.apache.ignite.cache.eviction; import org.apache.ignite.cache.eviction.fifo.*; import org.apache.ignite.cache.eviction.lru.*; import org.apache.ignite.cache.eviction.random.*; +import org.apache.ignite.cache.eviction.sorted.*; /** * Pluggable cache eviction policy. Usually, implementations will internally order @@ -32,6 +33,7 @@ import org.apache.ignite.cache.eviction.random.*; * <li>{@link LruEvictionPolicy}</li> * <li>{@link RandomEvictionPolicy}</li> * <li>{@link FifoEvictionPolicy}</li> + * <li>{@link SortedEvictionPolicy}</li> * </ul> * <p> * The eviction policy thread-safety is ensured by Ignition. Implementations of this interface should http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java index bf8cf0d..221bc39 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java @@ -18,18 +18,28 @@ package org.apache.ignite.cache.eviction.fifo; import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.jsr166.*; import org.jsr166.ConcurrentLinkedDeque8.*; import java.io.*; import java.util.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + /** * Eviction policy based on {@code First In First Out (FIFO)} algorithm and supports batch eviction. * <p> - * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size. + * The eviction starts in the following cases: + * <ul> + * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li> + * <li> + * The size of cache entries in bytes becomes greater than the maximum memory size. + * The size of cache entry calculates as sum of key size and value size. + * </li> + * </ul> + * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}). * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. * <p> * This implementation is very efficient since it does not create any additional @@ -41,11 +51,17 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + private volatile int max = DFLT_CACHE_SIZE; /** Batch size. */ private volatile int batchSize = 1; + /** Max memory size. */ + private volatile long maxMemSize; + + /** Memory size. */ + private final LongAdder8 memSize = new LongAdder8(); + /** FIFO queue. */ private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue = new ConcurrentLinkedDeque8<>(); @@ -63,7 +79,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * @param max Maximum allowed size of cache before entry will start getting evicted. */ public FifoEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -75,7 +91,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * @param batchSize Batch size. */ public FifoEvictionPolicy(int max, int batchSize) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); A.ensure(batchSize > 0, "batchSize > 0"); this.max = max; @@ -97,7 +113,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * @param max Maximum allowed size of cache before entry will start getting evicted. */ @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -119,6 +135,23 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict return queue.size(); } + /** {@inheritDoc} */ + @Override public long getMaxMemorySize() { + return maxMemSize; + } + + /** {@inheritDoc} */ + @Override public void setMaxMemorySize(long maxMemSize) { + A.ensure(maxMemSize >= 0, "maxMemSize >= 0"); + + this.maxMemSize = maxMemSize; + } + + /** {@inheritDoc} */ + @Override public long getCurrentMemorySize() { + return memSize.longValue(); + } + /** * Gets read-only view on internal {@code FIFO} queue in proper order. * @@ -141,8 +174,11 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict else { Node<EvictableEntry<K, V>> node = entry.removeMeta(); - if (node != null) + if (node != null) { queue.unlinkx(node); + + memSize.add(-entry.size()); + } } } @@ -173,11 +209,18 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict return false; } + memSize.add(entry.size()); + return true; } // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. else if (!entry.removeMeta(node)) return false; + else { + memSize.add(-entry.size()); + + return true; + } } } @@ -189,38 +232,74 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict * Shrinks FIFO queue to maximum allowed size. */ private void shrink() { + long maxMem = this.maxMemSize; + + if (maxMem > 0) { + long startMemSize = memSize.longValue(); + + if (startMemSize >= maxMem) + for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) { + int size = shrink0(); + + if (size == -1) + break; + + i += size; + } + } + int max = this.max; - int batchSize = this.batchSize; + if (max > 0) { + int startSize = queue.sizex(); + + // Shrink only if queue is full. + if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) + for (int i = max; i < startSize && queue.sizex() > max; i++) + if (shrink0() == -1) + break; + } + } - int startSize = queue.sizex(); + /** + * Tries to remove one item from queue. + * + * @return number of bytes that was free. {@code -1} if queue is empty. + */ + private int shrink0() { + EvictableEntry<K, V> entry = queue.poll(); - // Shrink only if queue is full. - if (startSize >= max + batchSize) { - for (int i = max; i < startSize && queue.sizex() > max; i++) { - EvictableEntry<K, V> entry = queue.poll(); + if (entry == null) + return -1; - if (entry == null) - break; + int size = 0; - Node<EvictableEntry<K, V>> meta = entry.removeMeta(); + Node<EvictableEntry<K, V>> meta = entry.removeMeta(); - if (meta != null && !entry.evict()) - touch(entry); - } + if (meta != null) { + size = entry.size(); + + memSize.add(-size); + + if (!entry.evict()) + touch(entry); } + + return size; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); out.writeInt(batchSize); + out.writeLong(maxMemSize); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); batchSize = in.readInt(); + maxMemSize = in.readLong(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java index 63a413e..793aa66 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java @@ -63,4 +63,26 @@ public interface FifoEvictionPolicyMBean { */ @MXBeanDescription("Current FIFO queue size.") public int getCurrentSize(); + + /** + * Gets maximum allowed cache size in bytes. + * + * @return maximum allowed cache size in bytes. + */ + @MXBeanDescription("Maximum allowed cache size in bytes.") + public long getMaxMemorySize(); + + /** + * Sets maximum allowed cache size in bytes. + */ + @MXBeanDescription("Set maximum allowed cache size in bytes.") + public void setMaxMemorySize(long maxMemSize); + + /** + * Gets current queue size in bytes. + * + * @return current queue size in bytes. + */ + @MXBeanDescription("Current FIFO queue size in bytes.") + public long getCurrentMemorySize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java index 309d577..0be26c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java @@ -18,26 +18,48 @@ package org.apache.ignite.cache.eviction.lru; import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.jsr166.*; import org.jsr166.ConcurrentLinkedDeque8.*; import java.io.*; import java.util.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + /** - * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. This - * implementation is very efficient since it is lock-free and does not - * create any additional table-like data structures. The {@code LRU} ordering - * information is maintained by attaching ordering metadata to cache entries. + * Eviction policy based on {@code Least Recently Used (LRU)} algorithm and supports batch eviction. + * <p> + * The eviction starts in the following cases: + * <ul> + * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li> + * <li> + * The size of cache entries in bytes becomes greater than the maximum memory size. + * The size of cache entry calculates as sum of key size and value size. + * </li> + * </ul> + * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}). + * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. + + * This implementation is very efficient since it is lock-free and does not create any additional table-like + * data structures. The {@code LRU} ordering information is maintained by attaching ordering metadata to cache entries. */ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictionPolicyMBean, Externalizable { /** */ private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + private volatile int max = DFLT_CACHE_SIZE; + + /** Batch size. */ + private volatile int batchSize = 1; + + /** Max memory size. */ + private volatile long maxMemSize; + + /** Memory size. */ + private final LongAdder8 memSize = new LongAdder8(); /** Queue. */ private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue = @@ -56,7 +78,7 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio * @param max Maximum allowed size of cache before entry will start getting evicted. */ public LruEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -76,16 +98,45 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio * @param max Maximum allowed size of cache before entry will start getting evicted. */ @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } /** {@inheritDoc} */ + @Override public int getBatchSize() { + return batchSize; + } + + /** {@inheritDoc} */ + @Override public void setBatchSize(int batchSize) { + A.ensure(batchSize > 0, "batchSize > 0"); + + this.batchSize = batchSize; + } + + /** {@inheritDoc} */ @Override public int getCurrentSize() { return queue.size(); } + /** {@inheritDoc} */ + @Override public long getMaxMemorySize() { + return maxMemSize; + } + + /** {@inheritDoc} */ + @Override public void setMaxMemorySize(long maxMemSize) { + A.ensure(maxMemSize >= 0, "maxMemSize >= 0"); + + this.maxMemSize = maxMemSize; + } + + /** {@inheritDoc} */ + @Override public long getCurrentMemorySize() { + return memSize.longValue(); + } + /** * Gets read-only view on internal {@code FIFO} queue in proper order. * @@ -107,8 +158,11 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio else { Node<EvictableEntry<K, V>> node = entry.removeMeta(); - if (node != null) + if (node != null) { queue.unlinkx(node); + + memSize.add(-entry.size()); + } } } @@ -139,11 +193,18 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio return false; } + memSize.add(entry.size()); + return true; } // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. else if (!entry.removeMeta(node)) return false; + else { + memSize.add(-entry.size()); + + return true; + } } } else if (queue.unlinkx(node)) { @@ -163,31 +224,73 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio * Shrinks queue to maximum allowed size. */ private void shrink() { + long maxMem = this.maxMemSize; + + if (maxMem > 0) { + long startMemSize = memSize.longValue(); + + if (startMemSize >= maxMem) + for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) { + int size = shrink0(); + + if (size == -1) + break; + + i += size; + } + } + int max = this.max; - int startSize = queue.sizex(); + if (max > 0) { + int startSize = queue.sizex(); - for (int i = 0; i < startSize && queue.sizex() > max; i++) { - EvictableEntry<K, V> entry = queue.poll(); + if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) + for (int i = max; i < startSize && queue.sizex() > max; i++) + if (shrink0() == -1) + break; + } + } - if (entry == null) - break; + /** + * Tries to remove one item from queue. + * + * @return number of bytes that was free. {@code -1} if queue is empty. + */ + private int shrink0() { + EvictableEntry<K, V> entry = queue.poll(); - Node<EvictableEntry<K, V>> meta = entry.removeMeta(); + if (entry == null) + return -1; - if (meta != null && !entry.evict()) + int size = 0; + + Node<EvictableEntry<K, V>> meta = entry.removeMeta(); + + if (meta != null) { + size = entry.size(); + + memSize.add(-size); + + if (!entry.evict()) touch(entry); } + + return size; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); + out.writeInt(batchSize); + out.writeLong(maxMemSize); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); + batchSize = in.readInt(); + maxMemSize = in.readLong(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java index c243374..e17c057 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java @@ -41,10 +41,48 @@ public interface LruEvictionPolicyMBean { public void setMaxSize(int max); /** + * Gets batch size. + * + * @return batch size. + */ + @MXBeanDescription("Batch size.") + public int getBatchSize(); + + /** + * Sets batch size. + * + * @param batchSize Batch size. + */ + @MXBeanDescription("Set batch size.") + public void setBatchSize(int batchSize); + + /** * Gets current queue size. * * @return Current queue size. */ @MXBeanDescription("Current queue size.") public int getCurrentSize(); + + /** + * Gets maximum allowed cache size in bytes. + * + * @return maximum allowed cache size in bytes. + */ + @MXBeanDescription("Maximum allowed cache size in bytes.") + public long getMaxMemorySize(); + + /** + * Sets maximum allowed cache size in bytes. + */ + @MXBeanDescription("Set maximum allowed cache size in bytes.") + public void setMaxMemorySize(long maxMemSize); + + /** + * Gets current queue size in bytes. + * + * @return current queue size in bytes. + */ + @MXBeanDescription("Current queue size in bytes.") + public long getCurrentMemorySize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java index c88b31d..00a912f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java @@ -18,20 +18,22 @@ package org.apache.ignite.cache.eviction.random; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.internal.*; import javax.cache.*; import java.io.*; +import static org.apache.ignite.configuration.CacheConfiguration.*; + /** * Cache eviction policy which will select random cache entry for eviction if cache * size exceeds the {@link #getMaxSize()} parameter. This implementation is * extremely light weight, lock-free, and does not create any data structures to maintain * any order for eviction. * <p> - * Random eviction will provide the best performance over any key set in which every + * Random eviction will provide the best performance over any key queue in which every * key has the same probability of being accessed. */ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomEvictionPolicyMBean, Externalizable { @@ -39,7 +41,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + private volatile int max = DFLT_CACHE_SIZE; /** * Constructs random eviction policy with all defaults. @@ -87,7 +89,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE IgniteCache<K, V> cache = entry.unwrap(IgniteCache.class); - int size = cache.size(); + int size = cache.localSize(CachePeekMode.ONHEAP); for (int i = max; i < size; i++) { Cache.Entry<K, V> e = cache.randomEntry(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java index 7965c97..b8b82fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java @@ -34,7 +34,15 @@ import static org.apache.ignite.configuration.CacheConfiguration.*; /** * Cache eviction policy which will select the minimum cache entry for eviction. * <p> - * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size. + * The eviction starts in the following cases: + * <ul> + * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li> + * <li> + * The size of cache entries in bytes becomes greater than the maximum memory size. + * The size of cache entry calculates as sum of key size and value size. + * </li> + * </ul> + * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}). * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. * <p> * Entries comparison based on {@link Comparator} instance if provided. @@ -48,18 +56,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE private static final long serialVersionUID = 0L; /** Maximum size. */ - private volatile int max; + private volatile int max = DFLT_CACHE_SIZE; /** Batch size. */ private volatile int batchSize = 1; + /** Max memory size. */ + private volatile long maxMemSize; + + /** Memory size. */ + private final LongAdder8 memSize = new LongAdder8(); + /** Comparator. */ private Comparator<Holder<K, V>> comp; /** Order. */ private final AtomicLong orderCnt = new AtomicLong(); - /** Backed sorted set. */ + /** Backed sorted queue. */ private final GridConcurrentSkipListSetEx<K, V> set; /** @@ -96,7 +110,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE * @param comp Entries comparator. */ public SortedEvictionPolicy(int max, int batchSize, @Nullable Comparator<EvictableEntry<K, V>> comp) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); A.ensure(batchSize > 0, "batchSize > 0"); this.max = max; @@ -106,6 +120,16 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE } /** + * Constructs sorted eviction policy with given maximum size and given entry comparator. + * + * @param comp Entries comparator. + */ + public SortedEvictionPolicy(@Nullable Comparator<EvictableEntry<K, V>> comp) { + this.comp = comp == null ? new DefaultHolderComparator<K, V>() : new HolderComparator<>(comp); + this.set = new GridConcurrentSkipListSetEx<>(this.comp); + } + + /** * Gets maximum allowed size of cache before entry will start getting evicted. * * @return Maximum allowed size of cache before entry will start getting evicted. @@ -120,7 +144,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE * @param max Maximum allowed size of cache before entry will start getting evicted. */ @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); + A.ensure(max >= 0, "max >= 0"); this.max = max; } @@ -142,12 +166,29 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE return set.sizex(); } + /** {@inheritDoc} */ + @Override public long getMaxMemorySize() { + return maxMemSize; + } + + /** {@inheritDoc} */ + @Override public void setMaxMemorySize(long maxMemSize) { + A.ensure(maxMemSize >= 0, "maxMemSize >= 0"); + + this.maxMemSize = maxMemSize; + } + + /** {@inheritDoc} */ + @Override public long getCurrentMemorySize() { + return memSize.longValue(); + } + /** - * Gets read-only view of backed set in proper order. + * Gets read-only view of backed queue in proper order. * - * @return Read-only view of backed set. + * @return Read-only view of backed queue. */ - public Collection<EvictableEntry<K, V>> set() { + public Collection<EvictableEntry<K, V>> queue() { Set<EvictableEntry<K, V>> cp = new LinkedHashSet<>(); for (Holder<K, V> holder : set) @@ -168,19 +209,22 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE else { Holder<K, V> holder = entry.removeMeta(); - if (holder != null) + if (holder != null) { removeHolder(holder); + + memSize.add(-entry.size()); + } } } /** * @param entry Entry to touch. - * @return {@code True} if backed set has been changed by this call. + * @return {@code True} if backed queue has been changed by this call. */ private boolean touch(EvictableEntry<K, V> entry) { Holder<K, V> holder = entry.meta(); - // Entry has not been add yet to backed set.. + // Entry has not been add yet to backed queue.. if (holder == null) { while (true) { holder = new Holder<>(entry, orderCnt.incrementAndGet()); @@ -188,7 +232,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE set.add(holder); if (entry.putMetaIfAbsent(holder) != null) { - // Was concurrently added, need to remove it from set. + // Was concurrently added, need to remove it from queue. removeHolder(holder); // Set has not been changed. @@ -196,17 +240,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE } else if (holder.order > 0) { if (!entry.isCached()) { - // Was concurrently evicted, need to remove it from set. + // Was concurrently evicted, need to remove it from queue. removeHolder(holder); return false; } + memSize.add(entry.size()); + return true; } // If holder was removed by concurrent shrink() call, we must repeat the whole cycle. else if (!entry.removeMeta(holder)) return false; + else { + memSize.add(-entry.size()); + + return true; + } } } @@ -215,34 +266,71 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE } /** - * Shrinks backed set to maximum allowed size. + * Shrinks backed queue to maximum allowed size. */ private void shrink() { - int max = this.max; + long maxMem = this.maxMemSize; + + if (maxMem > 0) { + long startMemSize = memSize.longValue(); - int batchSize = this.batchSize; + if (startMemSize >= maxMem) + for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) { + int size = shrink0(); - int startSize = set.sizex(); + if (size == -1) + break; - if (startSize >= max + batchSize) { - for (int i = max; i < startSize && set.sizex() > max; i++) { - Holder<K, V> h = set.pollFirst(); + i += size; + } + } - if (h == null) - break; + int max = this.max; - EvictableEntry<K, V> entry = h.entry; + if (max > 0) { + int startSize = set.sizex(); - if (h.order > 0 && entry.removeMeta(h) && !entry.evict()) - touch(entry); + if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) { + for (int i = max; i < startSize && set.sizex() > max; i++) { + if (shrink0() == -1) + break; + } } } } + /** + * Tries to remove one item from queue. + * + * @return number of bytes that was free. {@code -1} if queue is empty. + */ + private int shrink0() { + Holder<K, V> h = set.pollFirst(); + + if (h == null) + return -1; + + int size = 0; + + EvictableEntry<K, V> entry = h.entry; + + if (h.order > 0 && entry.removeMeta(h)) { + size = entry.size(); + + memSize.add(-size); + + if (!entry.evict()) + touch(entry); + } + + return size; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); out.writeInt(batchSize); + out.writeLong(maxMemSize); out.writeObject(comp); } @@ -251,11 +339,12 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); batchSize = in.readInt(); + maxMemSize = in.readLong(); comp = (Comparator<Holder<K, V>>)in.readObject(); } /** - * Removes holder from backed set and marks holder as removed. + * Removes holder from backed queue and marks holder as removed. * * @param holder Holder. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java index bc696ff..7283453 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java @@ -63,4 +63,26 @@ public interface SortedEvictionPolicyMBean { */ @MXBeanDescription("Current sorted key set size.") public int getCurrentSize(); + + /** + * Gets maximum allowed cache size in bytes. + * + * @return maximum allowed cache size in bytes. + */ + @MXBeanDescription("Maximum allowed cache size in bytes.") + public long getMaxMemorySize(); + + /** + * Sets maximum allowed cache size in bytes. + */ + @MXBeanDescription("Set maximum allowed cache size in bytes.") + public void setMaxMemorySize(long maxMemSize); + + /** + * Gets current sorted entries queue size in bytes. + * + * @return current sorted entries queue size in bytes. + */ + @MXBeanDescription("Current sorted entries set size in bytes.") + public long getCurrentMemorySize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java index e66b32d..ef8fc49 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java @@ -20,9 +20,9 @@ package org.apache.ignite.cache.query; import org.apache.ignite.internal.processors.cache.query.*; /** - * Cache query metrics used to obtain statistics on query. You can get metrics for - * particular query via {@link CacheQuery#metrics()} method or accumulated metrics - * for all queries via {@link GridCacheQueryManager#metrics()}. + * Cache query metrics used to obtain statistics on query. Metrics for particular query + * can be get via {@link CacheQuery#metrics()} method or aggregated metrics for all queries + * via {@link CacheQuery#metrics()}. */ public interface QueryMetrics { /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java index d018298..5bfdda1 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> { * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases * may bring cache transaction into {@link TransactionState#UNKNOWN} which will * consequently cause all transacted entries to be invalidated. + * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details). */ + @Deprecated public void sessionEnd(boolean commit) throws CacheWriterException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java index 640d4a3..329e994 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java @@ -19,6 +19,7 @@ package org.apache.ignite.cache.store; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -52,6 +53,27 @@ public interface CacheStoreSession { public boolean isWithinTransaction(); /** + * Attaches the given object to this session. + * <p> + * An attached object may later be retrieved via the {@link #attachment()} + * method. Invoking this method causes any previous attachment to be + * discarded. To attach additional objects use {@link #properties()} map. + * <p> + * The current attachment may be discarded by attaching {@code null}. + * + * @param attachment The object to be attached (or {@code null} to discard current attachment). + * @return Previously attached object, if any. + */ + @Nullable public <T> T attach(@Nullable Object attachment); + + /** + * Retrieves the current attachment or {@code null} if there is no attachment. + * + * @return Currently attached object, if any. + */ + @Nullable public <T> T attachment(); + + /** * Gets current session properties. You can add properties directly to the * returned map. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java new file mode 100644 index 0000000..1543bf9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.configuration.*; + +import javax.cache.configuration.*; +import javax.sql.*; + +/** + * Cache store session listener that allows to implement callbacks + * for session lifecycle. + * <p> + * The most common use case for session listeners is database + * connection and transaction management. Store can be invoked one + * or several times during one session, depending on whether it's + * executed within cache transaction or not. In any case, you have + * to create a connection when session is started and commit it or + * rollback when session is finished. + * <p> + * Cache store session listener allows to implement this and other + * scenarios providing to callback methods: + * <ul> + * <li> + * {@link #onSessionStart(CacheStoreSession)} - called + * before any store operation within a session is invoked. + * </li> + * <li> + * {@link #onSessionEnd(CacheStoreSession, boolean)} - called + * after all operations within a session are invoked. + * </li> + * </ul> + * <h2>Implementations</h2> + * Ignites provides several out-of-the-box implementations + * of session listener (refer to individual JavaDocs for more + * details): + * <ul> + * <li> + * {@link CacheJdbcStoreSessionListener} - JDBC-based session + * listener. For each session it gets a new JDBC connection from + * provided {@link DataSource} and commits (or rolls back) it + * when session ends. + * </li> + * <li> + * {@ignitelink org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} - + * session listener based on Spring transaction management. + * It starts a new DB transaction for each session and commits + * (or rolls back) it when session ends. If there is no ongoing + * cache transaction, this listener is no-op. + * </li> + * <li> + * {@ignitelink org.apache.ignite.cache.store.hibernate.CacheHibernateStoreSessionListener} - + * Hibernate-based session listener. It creates a new Hibernate + * session for each Ignite session. If there is an ongoing cache + * transaction, a corresponding Hibernate transaction is created + * as well. + * </li> + * </ul> + * <h2>Configuration</h2> + * There are two ways to configure a session listener: + * <ul> + * <li> + * Provide a global listener for all caches via + * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. This will we called for any store + * session, not depending on what caches participate in + * transaction. + * </li> + * <li> + * Provide a listener for a particular cache via + * {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. This will be called only if the + * cache participates in transaction. + * </li> + * </ul> + * For example, here is how global {@link CacheJdbcStoreSessionListener} + * can be configured in Spring XML configuration file: + * <pre name="code" class="xml"> + * <bean class="org.apache.ignite.configuration.IgniteConfiguration"> + * ... + * + * <property name="CacheStoreSessionListenerFactories"> + * <list> + * <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory"> + * <constructor-arg> + * <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener"> + * <!-- Inject external data source. --> + * <property name="dataSource" ref="jdbc-data-source"/> + * </bean> + * </constructor-arg> + * </bean> + * </list> + * </property> + * </bean> + * </pre> + */ +public interface CacheStoreSessionListener { + /** + * On session start callback. + * <p> + * Called before any store operation within a session is invoked. + * + * @param ses Current session. + */ + public void onSessionStart(CacheStoreSession ses); + + /** + * On session end callback. + * <p> + * Called after all operations within a session are invoked. + * + * @param ses Current session. + * @param commit {@code True} if persistence store transaction + * should commit, {@code false} for rollback. + */ + public void onSessionEnd(CacheStoreSession ses, boolean commit); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java new file mode 100644 index 0000000..a20e535 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; + +import javax.cache.*; +import javax.cache.integration.*; +import javax.sql.*; +import java.sql.*; + +/** + * Cache store session listener based on JDBC connection. + * <p> + * For each session this listener gets a new JDBC connection + * from provided {@link DataSource} and commits (or rolls + * back) it when session ends. + * <p> + * The connection is saved as a store session + * {@link CacheStoreSession#attachment() attachment}. + * The listener guarantees that the connection will be + * available for any store operation. If there is an + * ongoing cache transaction, all operations within this + * transaction will be committed or rolled back only when + * session ends. + * <p> + * As an example, here is how the {@link CacheStore#write(Cache.Entry)} + * method can be implemented if {@link CacheJdbcStoreSessionListener} + * is configured: + * <pre name="code" class="java"> + * private static class Store extends CacheStoreAdapter<Integer, Integer> { + * @CacheStoreSessionResource + * private CacheStoreSession ses; + * + * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + * // Get connection from the current session. + * Connection conn = ses.attachment(); + * + * // Execute update SQL query. + * try { + * conn.createStatement().executeUpdate("..."); + * } + * catch (SQLException e) { + * throw new CacheWriterException("Failed to update the store.", e); + * } + * } + * } + * </pre> + * JDBC connection will be automatically created by the listener + * at the start of the session and closed when it ends. + */ +public class CacheJdbcStoreSessionListener implements CacheStoreSessionListener, LifecycleAware { + /** Data source. */ + private DataSource dataSrc; + + /** + * Sets data source. + * <p> + * This is a required parameter. If data source is not set, + * exception will be thrown on startup. + * + * @param dataSrc Data source. + */ + public void setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + } + + /** + * Gets data source. + * + * @return Data source. + */ + public DataSource getDataSource() { + return dataSrc; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (dataSrc == null) + throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.'); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + if (ses.attachment() == null) { + try { + Connection conn = dataSrc.getConnection(); + + conn.setAutoCommit(false); + + ses.attach(conn); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Connection conn = ses.attach(null); + + if (conn != null) { + try { + if (commit) + conn.commit(); + else + conn.rollback(); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + finally { + U.closeQuiet(conn); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index 9cb5d3d..85fd08a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -18,7 +18,9 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.*; import org.jetbrains.annotations.*; import java.util.*; @@ -33,7 +35,7 @@ import java.util.*; * You can use cluster node attributes to provide static information about a node. * This information is initialized once within a cluster, during the node startup, and * remains the same throughout the lifetime of a node. Use - * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method to initialize your custom + * {@link IgniteConfiguration#getUserAttributes()} method to initialize your custom * node attributes at startup. Here is an example of how to assign an attribute to a node at startup: * <pre name="code" class="xml"> * <bean class="org.apache.ignite.configuration.IgniteConfiguration"> @@ -114,7 +116,7 @@ public interface ClusterNode { /** * Gets a node attribute. Attributes are assigned to nodes at startup - * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method. + * via {@link IgniteConfiguration#getUserAttributes()} method. * <p> * The system adds the following attributes automatically: * <ul> @@ -149,7 +151,7 @@ public interface ClusterNode { /** * Gets all node attributes. Attributes are assigned to nodes at startup - * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method. + * via {@link IgniteConfiguration#getUserAttributes()} method. * <p> * The system adds the following attributes automatically: * <ul> @@ -167,7 +169,7 @@ public interface ClusterNode { /** * Gets collection of addresses this node is known by. * <p> - * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that + * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that * address for all communications and returned collection will contain only that address. * If it is {@code null} then local wildcard address will be used, and Ignite * will make the best effort to supply all addresses of that node in returned collection. @@ -179,12 +181,12 @@ public interface ClusterNode { /** * Gets collection of host names this node is known by. * <p> - * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use + * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use * the host name of that resolved address for all communications and * returned collection will contain only that host name. * If that host name can not be resolved then ip address returned by method {@link #addresses()} is used. * <p> - * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used, + * If {@link IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used, * and this method returns host names of all addresses of that node. * * @return Collection of host names. @@ -238,9 +240,17 @@ public interface ClusterNode { public boolean isDaemon(); /** - * Tests whether or not this node is a client node. + * Tests whether or not this node is connected to cluster as a client. + * <p> + * Do not confuse client in terms of + * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache + * {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data, + * while topology clients connect to topology in a different way. * * @return {@code True} if this node is a client node, {@code false} otherwise. + * @see IgniteConfiguration#isClientMode() + * @see Ignition#isClientMode() + * @see DiscoverySpi#isClientMode() */ public boolean isClient(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index df6b2ee..1aa4fd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.*; @@ -145,9 +144,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default value for 'readFromBackup' flag. */ public static final boolean DFLT_READ_FROM_BACKUP = true; - /** Filter that accepts only server nodes. */ - public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgniteServerNodePredicate(); - /** Filter that accepts all nodes. */ public static final IgnitePredicate<ClusterNode> ALL_NODES = new IgniteAllNodesPredicate(); @@ -316,6 +312,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Cache topology validator. */ private TopologyValidator topValidator; + /** Cache store session listeners. */ + private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. */ @@ -389,6 +388,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize(); startSize = cc.getStartSize(); storeFactory = cc.getCacheStoreFactory(); + storeSesLsnrs = cc.getCacheStoreSessionListenerFactories(); swapEnabled = cc.isSwapEnabled(); tmLookupClsName = cc.getTransactionManagerLookupClassName(); topValidator = cc.getTopologyValidator(); @@ -1664,7 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0, "Number of indexed types is expected to be even. Refer to method javadoc for details."); - this.indexedTypes = indexedTypes; + if (indexedTypes != null) { + int len = indexedTypes.length; + + Class<?>[] newIndexedTypes = new Class<?>[len]; + + for (int i = 0; i < len; i++) + newIndexedTypes[i] = U.box(indexedTypes[i]); + + this.indexedTypes = newIndexedTypes; + } + else + this.indexedTypes = null; return this; } @@ -1734,30 +1745,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { return this; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheConfiguration.class, this); + /** + * Gets cache store session listener factories. + * + * @return Cache store session listener factories. + * @see CacheStoreSessionListener + */ + public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { + return storeSesLsnrs; } /** - * Filter that accepts only server nodes. + * Cache store session listener factories. + * <p> + * These listeners override global listeners provided in + * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. + * + * @param storeSesLsnrs Cache store session listener factories. + * @return {@code this} for chaining. + * @see CacheStoreSessionListener */ - public static class IgniteServerNodePredicate implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - @Override public boolean apply(ClusterNode n) { - Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); - - return attr != null && !attr; - } + public CacheConfiguration setCacheStoreSessionListenerFactories( + Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) { + this.storeSesLsnrs = storeSesLsnrs; - @Override public boolean equals(Object obj) { - if (obj == null) - return false; + return this; + } - return obj.getClass().equals(this.getClass()); - } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheConfiguration.class, this); } /** @@ -1767,10 +1785,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** */ private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean apply(ClusterNode clusterNode) { return true; } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { if (obj == null) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index ebe2b8e..2d36c7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -18,6 +18,7 @@ package org.apache.ignite.configuration; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.events.*; @@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*; import org.apache.ignite.spi.swapspace.*; import org.apache.ignite.spi.swapspace.file.*; +import javax.cache.configuration.*; import javax.cache.event.*; import javax.cache.expiry.*; import javax.cache.integration.*; @@ -334,9 +336,6 @@ public class IgniteConfiguration { /** Cache configurations. */ private CacheConfiguration[] cacheCfg; - /** Client cache configurations. */ - private NearCacheConfiguration[] nearCacheCfg; - /** Client mode flag. */ private Boolean clientMode; @@ -398,6 +397,9 @@ public class IgniteConfiguration { /** User's class loader. */ private ClassLoader classLdr; + /** Cache store session listeners. */ + private Factory<CacheStoreSessionListener>[] storeSesLsnrs; + /** * Creates valid grid configuration with all default values. */ @@ -478,6 +480,7 @@ public class IgniteConfiguration { segResolvers = cfg.getSegmentationResolvers(); sndRetryCnt = cfg.getNetworkSendRetryCount(); sndRetryDelay = cfg.getNetworkSendRetryDelay(); + storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories(); svcCfgs = cfg.getServiceConfiguration(); sysPoolSize = cfg.getSystemThreadPoolSize(); timeSrvPortBase = cfg.getTimeServerPortBase(); @@ -1823,9 +1826,11 @@ public class IgniteConfiguration { } /** - * Gets client mode flag. + * Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use + * {@link DiscoverySpi} in client mode if this property is {@code true}. * * @return Client mode flag. + * @see TcpDiscoverySpi#setForceServerMode(boolean) */ public Boolean isClientMode() { return clientMode; @@ -2188,15 +2193,21 @@ public class IgniteConfiguration { } /** + * Gets plugin configurations. + * * @return Plugin configurations. + * @see PluginProvider */ public PluginConfiguration[] getPluginConfigurations() { return pluginCfgs; } /** + * Sets plugin configurations. + * * @param pluginCfgs Plugin configurations. * @return {@code this} for chaining. + * @see PluginProvider */ public IgniteConfiguration setPluginConfigurations(PluginConfiguration... pluginCfgs) { this.pluginCfgs = pluginCfgs; @@ -2242,6 +2253,35 @@ public class IgniteConfiguration { return classLdr; } + /** + * Gets cache store session listener factories. + * + * @return Cache store session listener factories. + * @see CacheStoreSessionListener + */ + public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() { + return storeSesLsnrs; + } + + /** + * Cache store session listener factories. + * <p> + * These are global store session listeners, so they are applied to + * all caches. If you need to override listeners for a + * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])} + * configuration property. + * + * @param storeSesLsnrs Cache store session listener factories. + * @return {@code this} for chaining. + * @see CacheStoreSessionListener + */ + public IgniteConfiguration setCacheStoreSessionListenerFactories( + Factory<CacheStoreSessionListener>... storeSesLsnrs) { + this.storeSesLsnrs = storeSesLsnrs; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java new file mode 100644 index 0000000..5a65bdb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; + +/** + * Provides ability to execute IGFS code in a context of a specific user. + */ +public abstract class IgfsUserContext { + /** Thread local to hold the current user context. */ + private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>(); + + /** + * Executes given callable in the given user context. + * The main contract of this method is that {@link #currentUser()} method invoked + * inside closure always returns 'user' this callable executed with. + * @param user the user name to invoke closure on behalf of. + * @param clo the closure to execute + * @param <T> The type of closure result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. + */ + public static <T> T doAs(String user, final IgniteOutClosure<T> clo) { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clo.apply(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clo.apply(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts + * callable that throws checked Exception. + * The Exception is not ever wrapped anyhow. + * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is: + * <pre name="code" class="java"> + * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 { + * try { + * return IgfsUserContext.doAs(user, new Callable<Foo>() { + * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 { + * return makeSomeFoo(); // do the job + * } + * }); + * } + * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) { + * throw e; + * } + * catch (Exception e) { + * throw new AssertionError("Must never go there."); + * } + * } + * </pre> + * @param user the user name to invoke closure on behalf of. + * @param clbl the Callable to execute + * @param <T> The type of callable result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. + */ + public static <T> T doAs(String user, final Callable<T> clbl) throws Exception { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clbl.call(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clbl.call(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Gets the current context user. + * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will + * return null. Otherwise it will return the user name set in the most lower + * {@link #doAs(String, IgniteOutClosure)} call on the call stack. + * @return The current user, may be null. + */ + @Nullable public static String currentUser() { + return userStackThreadLocal.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index 9026eac..cb69352 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem { * @return Map of properties. */ public Map<String,String> properties(); + + + /** + * Closes the secondary file system. + * @throws IgniteException in case of an error. + */ + public void close() throws IgniteException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java index 4d5d146..6da45ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java @@ -1247,6 +1247,20 @@ public class ClusterMetricsSnapshot implements ClusterMetrics { /** * Serializes node metrics into byte array. * + * @param metrics Node metrics to serialize. + * @return New offset. + */ + public static byte[] serialize(ClusterMetrics metrics) { + byte[] buf = new byte[METRICS_SIZE]; + + serialize(buf, 0, metrics); + + return buf; + } + + /** + * Serializes node metrics into byte array. + * * @param data Byte array. * @param off Offset into byte array. * @param metrics Node metrics to serialize.